diff --git a/.env.example b/.env.example index 570f6c6..bf9d3ff 100644 --- a/.env.example +++ b/.env.example @@ -13,3 +13,10 @@ OMO_PULSE_API_PORT=4301 # Set to "true" in CI/CD environments to prevent Playwright from reusing existing servers # In local development, leave unset (defaults to false) CI= + +# Telegram Notifications (optional) +# Get token from @BotFather, chat ID from @userinfobot or the getUpdates API +# When both are set, omo-pulse pushes a pinned status message to the chat +# and sends alert notifications for question/error/plan_complete transitions. +TELEGRAM_BOT_TOKEN= +TELEGRAM_CHAT_ID= diff --git a/README.md b/README.md index 7daf633..623501b 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ [![Runtime: Bun](https://img.shields.io/badge/Runtime-Bun-%23f9f1e1?logo=bun)](https://bun.sh) [![TypeScript](https://img.shields.io/badge/TypeScript-5.5-blue?logo=typescript)](https://www.typescriptlang.org/) [![Sponsor](https://img.shields.io/badge/sponsor-%E2%9D%A4-lightgrey)](https://github.com/sponsors/EZotoff) +[![Ko-fi](https://img.shields.io/badge/Ko--fi-Support-ff5e5b?logo=ko-fi&logoColor=white)](https://ko-fi.com/ezotoff) ![Dashboard — multi-project view with session activity and token usage](docs/screenshots/details-collapsed.png) diff --git a/src/__tests__/api.test.ts b/src/__tests__/api.test.ts index cc507b3..fa9243e 100644 --- a/src/__tests__/api.test.ts +++ b/src/__tests__/api.test.ts @@ -22,16 +22,6 @@ vi.mock("../ingest/sources-registry", () => ({ getSourceById: vi.fn(() => null), })) -vi.mock("../server/multi-project", () => ({ - createMultiProjectService: vi.fn(() => ({ - getMultiProjectPayload: vi.fn(async (): Promise => ({ - projects: [], - serverNowMs: Date.now(), - pollIntervalMs: 2000, - })), - })), -})) - vi.mock("../ingest/session", () => ({ getStorageRoots: vi.fn(() => ({ session: "/tmp/session", @@ -63,7 +53,6 @@ vi.mock("../ingest/sqlite-derive", () => ({ // Import AFTER mocking // --------------------------------------------------------------------------- import { createApi } from "../server/api" -import { createMultiProjectService } from "../server/multi-project" import type { ProjectSnapshot } from "../types" // --------------------------------------------------------------------------- @@ -83,6 +72,8 @@ function makeProjectSnapshot(overrides: Partial = {}): ProjectS sessionId: "ses_abc", status: "idle", }, + sessions: [], + aggregateStatus: "idle", planProgress: { name: "plan-1", completed: 3, @@ -93,6 +84,7 @@ function makeProjectSnapshot(overrides: Partial = {}): ProjectS planStale: false, planComplete: false, }, + unintiatedPlans: [], timeSeries: { windowMs: 300000, bucketMs: 2000, @@ -127,12 +119,13 @@ describe("API routes", () => { serverNowMs: Date.now(), pollIntervalMs: 2000, })), + invalidate: vi.fn(), } - vi.mocked(createMultiProjectService).mockReturnValue(mockService) app = createApi({ storageRoot: "/tmp/test-storage", storageBackend: { kind: "sqlite", dataDir: "/tmp", sqlitePath: "/tmp/test.db" }, + multiProjectService: mockService, version: "1.0.0-test", }) }) diff --git a/src/__tests__/session-inclusion.test.ts b/src/__tests__/session-inclusion.test.ts index b002378..19f886e 100644 --- a/src/__tests__/session-inclusion.test.ts +++ b/src/__tests__/session-inclusion.test.ts @@ -19,19 +19,17 @@ type SessionRow = { type ActivePartRow = { tool: string - status: string } -type ErrorCountRow = { - cnt: number +type TerminalPartRow = { + status: string } type AssistantMessageRow = { - role: string - time_completed?: number + time_completed: number | null } -type QueryRows = SessionRow[] | ActivePartRow[] | ErrorCountRow[] | AssistantMessageRow[] +type QueryRows = SessionRow[] | ActivePartRow[] | TerminalPartRow[] | AssistantMessageRow[] type MockStatement = { all: (...params: unknown[]) => QueryRows @@ -44,7 +42,7 @@ type MockDatabase = { type MockDbConfig = { sessionRows?: SessionRow[] activePartsBySession?: Record - errorCountsBySession?: Record + terminalStatusBySession?: Record assistantMessagesBySession?: Record throwOnQuery?: boolean } @@ -64,12 +62,13 @@ function createMockDb(config: MockDbConfig = {}): MockDatabase { return config.sessionRows ?? [] } - if (sql.includes("state_status = 'pending' OR state_status = 'running'")) { + if (sql.includes("'pending', 'running'")) { return sessionId ? (config.activePartsBySession?.[sessionId] ?? []) : [] } - if (sql.includes("state_status = 'error'")) { - return [{ cnt: sessionId ? (config.errorCountsBySession?.[sessionId] ?? 0) : 0 }] + if (sql.includes("'error', 'completed'")) { + const status = sessionId ? (config.terminalStatusBySession?.[sessionId] ?? null) : null + return status ? [{ status }] : [] } if (sql.includes("FROM message")) { @@ -293,8 +292,8 @@ describe("findIncludedSessionsSqlite", () => { time_updated: now - 120000, }, ], - errorCountsBySession: { - "stale-error": 1, + terminalStatusBySession: { + "stale-error": "error", }, }), "/home/user/project", @@ -511,8 +510,8 @@ describe("findIncludedSessionsSqlite", () => { activePartsBySession: { "question-session": [{ tool: "mcp_question", status: "pending" }], }, - errorCountsBySession: { - "error-session": 1, + terminalStatusBySession: { + "error-session": "error", }, }), "/home/user/project", diff --git a/src/ingest/session-inclusion.ts b/src/ingest/session-inclusion.ts index 7cfb49d..3a82c16 100644 --- a/src/ingest/session-inclusion.ts +++ b/src/ingest/session-inclusion.ts @@ -34,14 +34,14 @@ function normalizePath(dir: string): string { */ function deriveSessionStatus(db: Database, session: SessionMetadata, nowMs: number): string { try { - // Check for active tool (pending or running) const activeParts = db .query( - `SELECT tool, status FROM part - WHERE session_id = ? AND (state_status = 'pending' OR state_status = 'running') - ORDER BY created DESC LIMIT 1` + `SELECT json_extract(data, '$.tool') as tool + FROM part + WHERE session_id = ? AND json_extract(data, '$.state.status') IN ('pending', 'running') + ORDER BY time_created DESC LIMIT 1` ) - .all(session.id) as Array<{ tool: string; status: string }> + .all(session.id) as Array<{ tool: string }> if (activeParts.length > 0) { if (QUESTION_TOOL_NAMES.has(activeParts[0].tool)) { @@ -50,42 +50,39 @@ function deriveSessionStatus(db: Database, session: SessionMetadata, nowMs: numb return "running_tool" } - // Check for error tool - const errorParts = db + const lastTerminal = db .query( - `SELECT COUNT(*) as cnt FROM part - WHERE session_id = ? AND state_status = 'error' - LIMIT 1` + `SELECT json_extract(data, '$.state.status') as status + FROM part + WHERE session_id = ? AND json_extract(data, '$.state.status') IN ('error', 'completed') + ORDER BY time_created DESC LIMIT 1` ) - .all(session.id) as Array<{ cnt: number }> + .all(session.id) as Array<{ status: string }> - if (errorParts.length > 0 && errorParts[0].cnt > 0) { + if (lastTerminal.length > 0 && lastTerminal[0].status === "error") { return "error" } - // Check for recent assistant message (thinking) const recentMessages = db .query( - `SELECT role, time_completed FROM message - WHERE session_id = ? AND role = 'assistant' - ORDER BY created DESC LIMIT 1` + `SELECT json_extract(data, '$.time.completed') as time_completed + FROM message + WHERE session_id = ? AND json_extract(data, '$.role') = 'assistant' + ORDER BY time_created DESC LIMIT 1` ) - .all(session.id) as Array<{ role: string; time_completed?: number }> + .all(session.id) as Array<{ time_completed: number | null }> if ( recentMessages.length > 0 && - recentMessages[0].role === "assistant" && - recentMessages[0].time_completed === undefined + recentMessages[0].time_completed === null ) { return "thinking" } - // Default: distinguish busy vs idle based on canonical ACTIVE_BUSY_WINDOW_MS threshold const lastUpdated = session.time.updated ?? session.time.created ?? 0 const ageMs = nowMs - lastUpdated return ageMs <= ACTIVE_BUSY_WINDOW_MS ? "busy" : "idle" } catch { - // On any error, return unknown return "unknown" } } @@ -124,6 +121,7 @@ export function findIncludedSessionsSqlite( }> const sessions: SessionMetadata[] = [] + const statusCache = new Map() for (const row of sessionRows) { if (typeof row.id !== "string" || typeof row.directory !== "string") continue @@ -144,11 +142,13 @@ export function findIncludedSessionsSqlite( time: { created: timeCreated, updated: timeUpdated }, } + const status = deriveSessionStatus(db, meta, nowMs) if ( isSessionIncluded(meta, idleWindowMs, nowMs) || - isAttentionStatus(deriveSessionStatus(db, meta, nowMs)) + isAttentionStatus(status) ) { sessions.push(meta) + statusCache.set(meta.id, status) } } @@ -156,8 +156,8 @@ export function findIncludedSessionsSqlite( // Then recency: most recent activity first (time.updated DESC) // Finally stable tie-breaker: id ascending sessions.sort((a, b) => { - const aStatus = deriveSessionStatus(db, a, nowMs) - const bStatus = deriveSessionStatus(db, b, nowMs) + const aStatus = statusCache.get(a.id) ?? "unknown" + const bStatus = statusCache.get(b.id) ?? "unknown" const aSeverity = STATUS_SEVERITY[aStatus] ?? 6 const bSeverity = STATUS_SEVERITY[bStatus] ?? 6 diff --git a/src/ingest/session.ts b/src/ingest/session.ts index 2194e7b..db22359 100644 --- a/src/ingest/session.ts +++ b/src/ingest/session.ts @@ -283,23 +283,28 @@ function readLastToolPart(partStorage: string, messageID: string): { tool: strin return null } -function hasErrorToolPart(partStorage: string, messageID: string): boolean { +function messageTerminalToolStatus(partStorage: string, messageID: string): "error" | "completed" | null { const partDir = path.join(partStorage, messageID) - if (!fs.existsSync(partDir)) return false + if (!fs.existsSync(partDir)) return null + let hasError = false + let hasCompleted = false const files = fs.readdirSync(partDir).filter((f) => f.endsWith(".json")) for (const file of files) { try { const content = fs.readFileSync(path.join(partDir, file), "utf8") const part = JSON.parse(content) as Partial - if (part.type === "tool" && part.state?.status === "error") { - return true + if (part.type === "tool") { + if (part.state?.status === "error") hasError = true + else if (part.state?.status === "completed") hasCompleted = true } } catch { continue } } - return false + if (hasError) return "error" + if (hasCompleted) return "completed" + return null } export function getMainSessionView(opts: { @@ -332,11 +337,12 @@ export function getMainSessionView(opts: { } } - let hasErrorTool = false + let lastToolErrored = false if (!activeTool) { for (const meta of recentMetas) { - if (hasErrorToolPart(opts.storage.part, meta.id)) { - hasErrorTool = true + const terminal = messageTerminalToolStatus(opts.storage.part, meta.id) + if (terminal !== null) { + lastToolErrored = terminal === "error" break } } @@ -354,7 +360,7 @@ export function getMainSessionView(opts: { } } - if (status === "unknown" && !isStaleActivity && hasErrorTool) { + if (status === "unknown" && !isStaleActivity && lastToolErrored) { status = "error" } else if (status === "unknown" && !isStaleActivity && recent?.role === "assistant" && typeof recent?.time?.created === "number" && typeof recent?.time?.completed !== "number") { status = "thinking" diff --git a/src/ingest/sqlite-derive.ts b/src/ingest/sqlite-derive.ts index 1c769bc..58fe024 100644 --- a/src/ingest/sqlite-derive.ts +++ b/src/ingest/sqlite-derive.ts @@ -1,3 +1,4 @@ +import type { Database } from "bun:sqlite" import { ACTIVE_BUSY_WINDOW_MS, BACKGROUND_RUNNING_WINDOW_MS, @@ -167,17 +168,20 @@ function readSessionMessagesAndParts(opts: { sqlitePath: string sessionId: string limit: number + db?: Database }): SqliteDeriveResult<{ metas: StoredMessageMeta[]; partsByMessage: Map }> { const metasResult = readRecentMessageMetasSqlite({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, limit: opts.limit, + db: opts.db, }) if (!metasResult.ok) return metasResult const messageIds = metasResult.rows.map((meta) => meta.id) const partsResult = readToolPartsForMessagesSqlite({ sqlitePath: opts.sqlitePath, messageIds, + db: opts.db, }) if (!partsResult.ok) return partsResult @@ -317,10 +321,12 @@ export function pickActiveSessionIdSqlite(opts: { sqlitePath: string projectRoot: string boulderSessionIds?: string[] + db?: Database }): SqliteDeriveResult { const metasResult = readMainSessionMetasSqlite({ sqlitePath: opts.sqlitePath, directoryFilter: opts.projectRoot, + db: opts.db, }) if (!metasResult.ok) return metasResult @@ -354,7 +360,7 @@ export function pickActiveSessionIdSqlite(opts: { const ids = opts.boulderSessionIds ?? [] for (let i = ids.length - 1; i >= 0; i--) { const id = ids[i] - const messages = readRecentMessageMetasSqlite({ sqlitePath: opts.sqlitePath, sessionId: id, limit: 1 }) + const messages = readRecentMessageMetasSqlite({ sqlitePath: opts.sqlitePath, sessionId: id, limit: 1, db: opts.db }) if (!messages.ok) return messages if (messages.rows.length === 0) continue @@ -378,12 +384,14 @@ export function getMainSessionViewSqlite(opts: { sessionId: string sessionMeta?: SessionMetadata | null nowMs?: number + db?: Database }): SqliteDeriveResult { const nowMs = opts.nowMs ?? Date.now() const session = readSessionMessagesAndParts({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, limit: 200, + db: opts.db, }) if (!session.ok) return session @@ -406,14 +414,16 @@ export function getMainSessionViewSqlite(opts: { if (activeTool) break } - let hasErrorTool = false + let lastToolErrored = false if (!activeTool) { - for (const meta of session.value.metas) { + findLastTerminal: for (const meta of session.value.metas) { const parts = session.value.partsByMessage.get(meta.id) ?? [] - const errorPart = parts.find((part) => part.state.status === "error") - if (errorPart) { - hasErrorTool = true - break + for (let i = parts.length - 1; i >= 0; i--) { + const s = parts[i].state.status + if (s === "error" || s === "completed") { + lastToolErrored = s === "error" + break findLastTerminal + } } } } @@ -430,7 +440,7 @@ export function getMainSessionViewSqlite(opts: { } } - if (status === "unknown" && !isStaleActivity && hasErrorTool) { + if (status === "unknown" && !isStaleActivity && lastToolErrored) { status = "error" } else if (status === "unknown" && !isStaleActivity && recent?.role === "assistant" && typeof recent.time?.created === "number" && typeof recent.time?.completed !== "number") { status = "thinking" @@ -473,16 +483,18 @@ export function deriveBackgroundTasksSqlite(opts: { sqlitePath: string mainSessionId: string nowMs?: number + db?: Database }): SqliteDeriveResult { const nowMs = opts.nowMs ?? Date.now() const main = readSessionMessagesAndParts({ sqlitePath: opts.sqlitePath, sessionId: opts.mainSessionId, limit: 200, + db: opts.db, }) if (!main.ok) return main - const allSessionMetasResult = readAllSessionMetasSqlite({ sqlitePath: opts.sqlitePath }) + const allSessionMetasResult = readAllSessionMetasSqlite({ sqlitePath: opts.sqlitePath, db: opts.db }) if (!allSessionMetasResult.ok) return allSessionMetasResult const allSessionMetas = allSessionMetasResult.rows const sessionMetaById = new Map(allSessionMetas.map((m) => [m.id, m] as const)) @@ -501,6 +513,7 @@ export function deriveBackgroundTasksSqlite(opts: { sqlitePath: opts.sqlitePath, sessionId, limit: 200, + db: opts.db, }) if (!loaded.ok) return loaded backgroundMessageCache.set(sessionId, loaded.value.metas) @@ -648,12 +661,44 @@ export function deriveBackgroundTasksSqlite(opts: { return { ok: true, value: rows } } +export function deriveBackgroundTasksSqliteForSessions(opts: { + sqlitePath: string + mainSessionIds?: Array + nowMs?: number + db?: Database +}): SqliteDeriveResult { + const sessionIds: string[] = [] + const seen = new Set() + for (const value of opts.mainSessionIds ?? []) { + if (typeof value !== "string") continue + const id = value.trim() + if (!id || seen.has(id)) continue + seen.add(id) + sessionIds.push(id) + } + const rows: BackgroundTaskRow[] = [] + + for (const sessionId of sessionIds) { + const result = deriveBackgroundTasksSqlite({ + sqlitePath: opts.sqlitePath, + mainSessionId: sessionId, + nowMs: opts.nowMs, + db: opts.db, + }) + if (!result.ok) return result + rows.push(...result.value) + } + + return { ok: true, value: rows } +} + export function deriveTimeSeriesActivitySqlite(opts: { sqlitePath: string mainSessionId: string | null nowMs?: number windowMs?: number bucketMs?: number + db?: Database }): SqliteDeriveResult { const windowMs = opts.windowMs ?? 300_000 const bucketMs = opts.bucketMs ?? 2_000 @@ -668,7 +713,7 @@ export function deriveTimeSeriesActivitySqlite(opts: { const atlas = zeroBuckets(buckets) const background = zeroBuckets(buckets) - const allSessionMetas = readAllSessionMetasSqlite({ sqlitePath: opts.sqlitePath }) + const allSessionMetas = readAllSessionMetasSqlite({ sqlitePath: opts.sqlitePath, db: opts.db }) if (!allSessionMetas.ok) return allSessionMetas const perSessionCache = new Map }>() @@ -679,6 +724,7 @@ export function deriveTimeSeriesActivitySqlite(opts: { sqlitePath: opts.sqlitePath, sessionId, limit: 200, + db: opts.db, }) if (!loaded.ok) return loaded perSessionCache.set(sessionId, loaded.value) @@ -761,6 +807,7 @@ export function deriveTokenUsageSqlite(opts: { sqlitePath: string mainSessionId: string | null backgroundSessionIds?: Array + db?: Database }): SqliteDeriveResult> { const sessionIds: string[] = [] const seen = new Set() @@ -781,6 +828,7 @@ export function deriveTokenUsageSqlite(opts: { sqlitePath: opts.sqlitePath, sessionId, limit: TOKEN_USAGE_MESSAGE_LIMIT, + db: opts.db, }) if (!result.ok) return result metas.push(...result.rows) @@ -791,15 +839,16 @@ export function deriveTokenUsageSqlite(opts: { value: aggregateTokenUsage(metas), } } - export function deriveToolCallsSqlite(opts: { sqlitePath: string sessionId: string + db?: Database }): SqliteDeriveResult { const metasResult = readRecentMessageMetasSqlite({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, limit: MAX_TOOL_CALL_MESSAGES, + db: opts.db, }) if (!metasResult.ok) return metasResult @@ -807,6 +856,7 @@ export function deriveToolCallsSqlite(opts: { const existsResult = readSessionExistsSqlite({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, + db: opts.db, }) if (!existsResult.ok) return existsResult return { @@ -822,6 +872,7 @@ export function deriveToolCallsSqlite(opts: { const partsResult = readToolPartsForMessagesSqlite({ sqlitePath: opts.sqlitePath, messageIds: metasResult.rows.map((meta) => meta.id), + db: opts.db, }) if (!partsResult.ok) return partsResult @@ -878,10 +929,12 @@ export function deriveToolCallsSqlite(opts: { export function deriveTodosSqlite(opts: { sqlitePath: string sessionId: string + db?: Database }): SqliteDeriveResult { const result = readTodosSqlite({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, + db: opts.db, }) if (!result.ok) return result @@ -890,3 +943,32 @@ export function deriveTodosSqlite(opts: { value: result.rows, } } + +export function deriveTodosSqliteForSessions(opts: { + sqlitePath: string + sessionIds?: Array + db?: Database +}): SqliteDeriveResult { + const sessionIds: string[] = [] + const seen = new Set() + for (const value of opts.sessionIds ?? []) { + if (typeof value !== "string") continue + const id = value.trim() + if (!id || seen.has(id)) continue + seen.add(id) + sessionIds.push(id) + } + const rows: TodoItem[] = [] + + for (const sessionId of sessionIds) { + const result = deriveTodosSqlite({ + sqlitePath: opts.sqlitePath, + sessionId, + db: opts.db, + }) + if (!result.ok) return result + rows.push(...result.value) + } + + return { ok: true, value: rows } +} diff --git a/src/ingest/storage-backend.ts b/src/ingest/storage-backend.ts index 84cfdfc..a500a2b 100644 --- a/src/ingest/storage-backend.ts +++ b/src/ingest/storage-backend.ts @@ -62,6 +62,25 @@ function withReadonlyDb(sqlitePath: string, fn: (db: BunDatabase) => T): { ok } } +/** + * Run a query against a pre-opened DB (no open/close overhead) or fall back + * to opening a fresh readonly connection via `withReadonlyDb`. + */ +function withDbOrOpen( + db: BunDatabase | undefined, + sqlitePath: string, + fn: (db: BunDatabase) => T, +): { ok: true; value: T } | { ok: false; reason: SqliteReadFailureReason } { + if (db) { + try { + return { ok: true, value: fn(db) } + } catch (error) { + return { ok: false, reason: classifySqliteError(error) } + } + } + return withReadonlyDb(sqlitePath, fn) +} + function asFiniteNumber(value: unknown): number | null { if (typeof value !== "number") return null return Number.isFinite(value) ? value : null @@ -82,6 +101,7 @@ function isToolStatus(value: unknown): value is StoredToolPart["state"]["status" export function readMainSessionMetasSqlite(opts: { sqlitePath: string directoryFilter?: string + db?: BunDatabase }): SqliteReadResult { const directoryNeedle = typeof opts.directoryFilter === "string" && opts.directoryFilter.length > 0 ? (() => { @@ -91,7 +111,7 @@ export function readMainSessionMetasSqlite(opts: { })() : null - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db .query("SELECT id, project_id, parent_id, directory, title, time_created, time_updated FROM session WHERE parent_id IS NULL ORDER BY time_updated DESC, id DESC") .all() as Array<{ @@ -141,8 +161,9 @@ export function readMainSessionMetasSqlite(opts: { export function readAllSessionMetasSqlite(opts: { sqlitePath: string + db?: BunDatabase }): SqliteReadResult { - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db .query("SELECT id, project_id, parent_id, directory, title, time_created, time_updated FROM session ORDER BY time_updated DESC, id DESC") .all() as Array<{ @@ -187,8 +208,9 @@ export function readAllSessionMetasSqlite(opts: { export function readSessionExistsSqlite(opts: { sqlitePath: string sessionId: string + db?: BunDatabase }): SqliteReadResult<{ sessionId: string }> { - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db .query("SELECT id FROM session WHERE id = ? LIMIT 1") .get(opts.sessionId) as { id?: unknown } | null @@ -204,8 +226,9 @@ export function readRecentMessageMetasSqlite(opts: { sqlitePath: string sessionId: string limit: number + db?: BunDatabase }): SqliteReadResult { - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db .query("SELECT id, session_id, time_created, data FROM message WHERE session_id = ? ORDER BY time_created DESC, id DESC LIMIT ?") .all(opts.sessionId, opts.limit) as Array<{ @@ -267,12 +290,13 @@ export function readRecentMessageMetasSqlite(opts: { export function readToolPartsForMessagesSqlite(opts: { sqlitePath: string messageIds: string[] + db?: BunDatabase }): SqliteReadResult { if (opts.messageIds.length === 0) return { ok: true, rows: [] } const placeholders = opts.messageIds.map(() => "?").join(",") const sql = `SELECT id, message_id, session_id, time_created, data FROM part WHERE message_id IN (${placeholders}) ORDER BY message_id ASC, time_created ASC, id ASC` - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db.query(sql).all(...opts.messageIds) as Array<{ id: unknown message_id: unknown @@ -337,8 +361,9 @@ export type TodoItem = { export function readTodosSqlite(opts: { sqlitePath: string sessionId: string + db?: BunDatabase }): SqliteReadResult { - const result = withReadonlyDb(opts.sqlitePath, (db) => { + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => { try { return db .query("SELECT content, status, priority, position FROM todo WHERE session_id = ? ORDER BY position ASC") diff --git a/src/server/api.ts b/src/server/api.ts index 949a0d0..4f13f40 100644 --- a/src/server/api.ts +++ b/src/server/api.ts @@ -14,24 +14,25 @@ import { assertAllowedPath } from "../ingest/paths" import { deriveToolCalls, MAX_TOOL_CALL_MESSAGES, MAX_TOOL_CALLS } from "../ingest/tool-calls" import { deriveToolCallsSqlite } from "../ingest/sqlite-derive" import type { StorageBackend } from "../ingest/storage-backend" -import { createMultiProjectService } from "./multi-project" +import type { DashboardMultiProjectPayload, TelegramServiceStatus } from "../types" const SESSION_ID_PATTERN = /^[A-Za-z0-9_-]{1,128}$/ +export type MultiProjectService = { + getMultiProjectPayload: () => Promise + invalidate: () => void +} + export function createApi(opts: { storageRoot: string storageBackend: StorageBackend - pollIntervalMs?: number + multiProjectService: MultiProjectService + telegramStatus?: () => TelegramServiceStatus version?: string }): Hono { const api = new Hono() const version = opts.version ?? "0.0.0" - - const multiProjectService = createMultiProjectService({ - storageRoot: opts.storageRoot, - storageBackend: opts.storageBackend, - pollIntervalMs: opts.pollIntervalMs, - }) + const multiProjectService = opts.multiProjectService const invalidateProjects = (): void => { multiProjectService.invalidate() } @@ -267,5 +268,15 @@ export function createApi(opts: { } }) + // --------------------------------------------------------------------------- + // GET /telegram/status — Telegram notification service status + // --------------------------------------------------------------------------- + api.get("/telegram/status", (c) => { + if (!opts.telegramStatus) { + return c.json({ ok: true, telegram: { enabled: false } }) + } + return c.json({ ok: true, telegram: opts.telegramStatus() }) + }) + return api } diff --git a/src/server/dashboard.ts b/src/server/dashboard.ts index 22e5711..a916e4b 100644 --- a/src/server/dashboard.ts +++ b/src/server/dashboard.ts @@ -1,3 +1,4 @@ +import { Database } from "bun:sqlite" import * as fs from "node:fs" import { deriveBackgroundTasks } from "../ingest/background-tasks" import * as boulderModule from "../ingest/boulder" @@ -338,184 +339,175 @@ export function buildDashboardPayload(opts: { const unintiatedPlans = scanUnintiatedPlans(opts.projectRoot, boulder?.active_plan ?? null) const planHistory = readBoulderHistorySafe(opts.projectRoot) - const active = pickActiveSessionIdSqlite({ - sqlitePath: backend.sqlitePath, - projectRoot: opts.projectRoot, - boulderSessionIds: boulder?.session_ids, - }) - if (!active.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } + let db: Database + try { + db = new Database(backend.sqlitePath, { readonly: true }) + } catch { return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) } - const sessionId = active.value - let sessionMeta: SessionMetadata | null = null - if (sessionId) { - const metas = readMainSessionMetasSqlite({ sqlitePath: backend.sqlitePath, directoryFilter: opts.projectRoot }) - if (!metas.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - sessionMeta = metas.rows.find((m) => m.id === sessionId) ?? null - } - - const main = sessionId - ? (() => { - const result = getMainSessionViewSqlite({ - sqlitePath: backend.sqlitePath, - sessionId, - sessionMeta, - nowMs, - }) - if (!result.ok) return null - return result.value - })() - : { agent: "unknown", currentTool: null, currentModel: null, lastUpdated: null, sessionLabel: "(no session)", status: "unknown" as const } - if (!main) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } + const fallback = (): DashboardPayload => { + try { db.close() } catch {} return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) } - const tasksResult = sessionId - ? deriveBackgroundTasksSqlite({ - sqlitePath: backend.sqlitePath, - mainSessionId: sessionId, - nowMs, - }) - : { ok: true as const, value: [] } - if (!tasksResult.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) + try { + const active = pickActiveSessionIdSqlite({ + sqlitePath: backend.sqlitePath, + projectRoot: opts.projectRoot, + boulderSessionIds: boulder?.session_ids, + db, + }) + if (!active.ok) return fallback() + + const sessionId = active.value + let sessionMeta: SessionMetadata | null = null + if (sessionId) { + const metas = readMainSessionMetasSqlite({ sqlitePath: backend.sqlitePath, directoryFilter: opts.projectRoot, db }) + if (!metas.ok) return fallback() + sessionMeta = metas.rows.find((m) => m.id === sessionId) ?? null } - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - const timeSeriesResult = deriveTimeSeriesActivitySqlite({ - sqlitePath: backend.sqlitePath, - mainSessionId: sessionId ?? null, - nowMs, - }) - if (!timeSeriesResult.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - - const mainCurrentModel = main.currentModel - const mainSessionTasks = (() => { - if (!sessionId) return [] + const main = sessionId + ? (() => { + const result = getMainSessionViewSqlite({ + sqlitePath: backend.sqlitePath, + sessionId, + sessionMeta, + nowMs, + db, + }) + if (!result.ok) return null + return result.value + })() + : { agent: "unknown", currentTool: null, currentModel: null, lastUpdated: null, sessionLabel: "(no session)", status: "unknown" as const } + if (!main) return fallback() + + const tasksResult = sessionId + ? deriveBackgroundTasksSqlite({ + sqlitePath: backend.sqlitePath, + mainSessionId: sessionId, + nowMs, + db, + }) + : { ok: true as const, value: [] } + if (!tasksResult.ok) return fallback() - const mainStatus = main.status - const status = mainStatus === "running_tool" || mainStatus === "thinking" || mainStatus === "busy" - ? "running" - : mainStatus === "idle" - ? "idle" - : "unknown" + const timeSeriesResult = deriveTimeSeriesActivitySqlite({ + sqlitePath: backend.sqlitePath, + mainSessionId: sessionId ?? null, + nowMs, + db, + }) + if (!timeSeriesResult.ok) return fallback() + + const mainCurrentModel = main.currentModel + const mainSessionTasks = (() => { + if (!sessionId) return [] + + const mainStatus = main.status + const status = mainStatus === "running_tool" || mainStatus === "thinking" || mainStatus === "busy" + ? "running" + : mainStatus === "idle" + ? "idle" + : "unknown" + + const callsResult = deriveToolCallsSqlite({ sqlitePath: backend.sqlitePath, sessionId, db }) + if (!callsResult.ok) { + return [] + } - const callsResult = deriveToolCallsSqlite({ sqlitePath: backend.sqlitePath, sessionId }) - if (!callsResult.ok) { - return [] - } + const startAt = sessionMeta?.time?.created ?? null + const endAtMs = status === "running" ? nowMs : (main.lastUpdated ?? nowMs) + + return [ + { + id: "main-session", + description: "Main session", + subline: sessionId, + agent: main.agent, + lastModel: mainCurrentModel, + status, + toolCalls: callsResult.value.toolCalls.length, + lastTool: callsResult.value.toolCalls[0]?.tool ?? "-", + timeline: formatTimeline(startAt, endAtMs), + sessionId, + }, + ] + })() + + const tokenUsageResult = deriveTokenUsageSqlite({ + sqlitePath: backend.sqlitePath, + mainSessionId: sessionId ?? null, + backgroundSessionIds: tasksResult.value.map((task) => task.sessionId ?? null), + db, + }) + if (!tokenUsageResult.ok) return fallback() - const startAt = sessionMeta?.time?.created ?? null - const endAtMs = status === "running" ? nowMs : (main.lastUpdated ?? nowMs) + const todosResult = sessionId + ? deriveTodosSqlite({ + sqlitePath: backend.sqlitePath, + sessionId, + db, + }) + : { ok: true as const, value: [] } + const todos = todosResult.ok ? todosResult.value : [] - return [ - { - id: "main-session", - description: "Main session", - subline: sessionId, + const payload: DashboardPayload = { + mainSession: { agent: main.agent, - lastModel: mainCurrentModel, - status, - toolCalls: callsResult.value.toolCalls.length, - lastTool: callsResult.value.toolCalls[0]?.tool ?? "-", - timeline: formatTimeline(startAt, endAtMs), - sessionId, + currentModel: mainCurrentModel, + currentTool: main.currentTool ?? "-", + lastUpdatedLabel: formatIso(main.lastUpdated), + session: main.sessionLabel, + sessionId: sessionId ?? null, + statusPill: plan.planComplete && main.status === "idle" + ? mainStatusPill("plan_complete") + : mainStatusPill(main.status), }, - ] - })() - - const tokenUsageResult = deriveTokenUsageSqlite({ - sqlitePath: backend.sqlitePath, - mainSessionId: sessionId ?? null, - backgroundSessionIds: tasksResult.value.map((task) => task.sessionId ?? null), - }) - if (!tokenUsageResult.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) + planProgress: { + name: planName, + completed: plan.completed, + total: plan.total, + path: planPath, + statusPill: planStatusPill(plan), + steps: planSteps.missing ? [] : planSteps.steps, + planStale: plan.planStale, + planComplete: plan.planComplete, + boulderStatus: boulder?.status, + completedAt: boulder?.completed_at, + }, + unintiatedPlans, + planHistory, + backgroundTasks: tasksResult.value.map((t) => ({ + id: t.id, + description: t.description, + agent: t.agent, + lastModel: t.lastModel ?? null, + status: t.status, + toolCalls: t.toolCalls ?? 0, + lastTool: t.lastTool ?? "-", + timeline: typeof t.timeline === "string" ? t.timeline : "", + sessionId: t.sessionId ?? null, + })), + mainSessionTasks, + timeSeries: timeSeriesResult.value, + tokenUsage: tokenUsageResult.value, + todos, + raw: null, } - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - - const todosResult = sessionId - ? deriveTodosSqlite({ - sqlitePath: backend.sqlitePath, - sessionId, - }) - : { ok: true as const, value: [] } - // Don't fail the entire payload if todos fail, just use empty array - const todos = todosResult.ok ? todosResult.value : [] - const payload: DashboardPayload = { - mainSession: { - agent: main.agent, - currentModel: mainCurrentModel, - currentTool: main.currentTool ?? "-", - lastUpdatedLabel: formatIso(main.lastUpdated), - session: main.sessionLabel, - sessionId: sessionId ?? null, - statusPill: plan.planComplete && main.status === "idle" - ? mainStatusPill("plan_complete") - : mainStatusPill(main.status), - }, - planProgress: { - name: planName, - completed: plan.completed, - total: plan.total, - path: planPath, - statusPill: planStatusPill(plan), - steps: planSteps.missing ? [] : planSteps.steps, - planStale: plan.planStale, - planComplete: plan.planComplete, - boulderStatus: boulder?.status, - completedAt: boulder?.completed_at, - }, - unintiatedPlans, - planHistory, - backgroundTasks: tasksResult.value.map((t) => ({ - id: t.id, - description: t.description, - agent: t.agent, - lastModel: t.lastModel ?? null, - status: t.status, - toolCalls: t.toolCalls ?? 0, - lastTool: t.lastTool ?? "-", - timeline: typeof t.timeline === "string" ? t.timeline : "", - sessionId: t.sessionId ?? null, - })), - mainSessionTasks, - timeSeries: timeSeriesResult.value, - tokenUsage: tokenUsageResult.value, - todos, - raw: null, - } - - payload.raw = { - mainSession: payload.mainSession, - planProgress: payload.planProgress, - backgroundTasks: payload.backgroundTasks, - mainSessionTasks: payload.mainSessionTasks, - timeSeries: payload.timeSeries, + payload.raw = { + mainSession: payload.mainSession, + planProgress: payload.planProgress, + backgroundTasks: payload.backgroundTasks, + mainSessionTasks: payload.mainSessionTasks, + timeSeries: payload.timeSeries, + } + return payload + } finally { + try { db.close() } catch {} } - return payload } // --------------------------------------------------------------------------- diff --git a/src/server/dev.ts b/src/server/dev.ts index 0a3ae5f..03a70be 100644 --- a/src/server/dev.ts +++ b/src/server/dev.ts @@ -3,6 +3,8 @@ import { Hono } from "hono"; import { readFileSync } from "node:fs"; import { dirname, resolve } from "node:path"; import { createApi } from "./api"; +import { createMultiProjectService } from "./multi-project"; +import { createTelegramService } from "./telegram"; import { selectStorageBackend, getLegacyStorageRootForBackend } from "../ingest/storage-backend"; const here = dirname(new URL(import.meta.url).pathname); @@ -13,18 +15,39 @@ const port = parseInt(process.env.OMO_PULSE_API_PORT || "51244", 10); const app = new Hono(); -// Initialize storage backend and create API router const storageBackend = selectStorageBackend(); const storageRoot = getLegacyStorageRootForBackend(storageBackend); -const apiRouter = createApi({ storageRoot, storageBackend, version: APP_VERSION }); +const multiProjectService = createMultiProjectService({ storageRoot, storageBackend }); + +const telegramBotToken = process.env.TELEGRAM_BOT_TOKEN +const telegramChatId = process.env.TELEGRAM_CHAT_ID +const telegramService = telegramBotToken && telegramChatId + ? createTelegramService( + { botToken: telegramBotToken, chatId: telegramChatId }, + () => multiProjectService.getMultiProjectPayload(), + ) + : null + +const apiRouter = createApi({ + storageRoot, + storageBackend, + multiProjectService, + telegramStatus: telegramService ? () => telegramService.getStatus() : undefined, + version: APP_VERSION, +}); -// Mount the API router app.route("/api", apiRouter); Bun.serve({ fetch: app.fetch, hostname: "127.0.0.1", port, + idleTimeout: 60, }); +if (telegramService) { + telegramService.start() + console.log("Telegram notifications enabled") +} + console.log(`Server running at http://127.0.0.1:${port}`); diff --git a/src/server/multi-project.ts b/src/server/multi-project.ts index f233a65..dad40b7 100644 --- a/src/server/multi-project.ts +++ b/src/server/multi-project.ts @@ -1,12 +1,12 @@ +import { Database } from "bun:sqlite" import { getGitUncommittedCount } from "../ingest/git-status" import { getWorktreeInfo } from "../ingest/git-worktrees" import { derivePerSessionTimeSeries } from "../ingest/per-session-timeseries" -import { isSessionIncluded } from "../ingest/session-inclusion" +import { findIncludedSessionsSqlite } from "../ingest/session-inclusion" import { getSourceById, listSources } from "../ingest/sources-registry" import { getMainSessionViewSqlite } from "../ingest/sqlite-derive" import { compareSessionsBySeverity, computeAggregateStatus, selectDisplaySession } from "../ingest/status-rollup" import { getLegacyStorageRootForBackend, type StorageBackend } from "../ingest/storage-backend" -import { readMainSessionMetasSqlite } from "../ingest/storage-backend" import type { BackgroundTaskSummary, DashboardMultiProjectPayload, @@ -75,23 +75,21 @@ function buildEmptySessionTimeSeries(nowMs: number): SessionTimeSeriesPayload { export const MULTI_PROJECT_PAYLOAD_CACHE_TTL_MS = 5_000 export const SESSION_TIMESERIES_CACHE_TTL_MS = 15_000 +export const SESSION_SUMMARY_CACHE_TTL_MS = 10_000 const INCLUDED_SESSION_IDLE_WINDOW_MS = 300_000 -function buildSessionSummary(projectRoot: string, sqlitePath: string, nowMs: number): SessionSummary[] { +function buildSessionSummary(projectRoot: string, db: Database, sqlitePath: string, nowMs: number): SessionSummary[] { try { - if (typeof readMainSessionMetasSqlite !== "function" || typeof getMainSessionViewSqlite !== "function") { - return [] - } - - const metas = readMainSessionMetasSqlite({ sqlitePath, directoryFilter: projectRoot }) - if (!metas.ok) return [] + const includedMetas = findIncludedSessionsSqlite(db, projectRoot, INCLUDED_SESSION_IDLE_WINDOW_MS) + if (includedMetas.length === 0) return [] - const summaries = metas.rows.flatMap((meta) => { + const summaries = includedMetas.flatMap((meta) => { const result = getMainSessionViewSqlite({ sqlitePath, sessionId: meta.id, sessionMeta: meta, nowMs, + db, }) if (!result.ok) return [] @@ -106,10 +104,7 @@ function buildSessionSummary(projectRoot: string, sqlitePath: string, nowMs: num lastUpdatedMs: result.value.lastUpdated ?? 0, } - const included = isSessionIncluded(meta, INCLUDED_SESSION_IDLE_WINDOW_MS, nowMs) - || summary.status === "question" - || summary.status === "error" - return included ? [summary] : [] + return [summary] }) return summaries.sort(compareSessionsBySeverity) @@ -201,6 +196,7 @@ export function createMultiProjectService(opts: { const storeBySourceId = new Map() const storeByProjectRoot = new Map() const sessionTimeSeriesByProjectRoot = new Map() + const sessionSummaryByProjectRoot = new Map() let cachedPayload: DashboardMultiProjectPayload | null = null let cachedPayloadAt = 0 @@ -233,6 +229,16 @@ export function createMultiProjectService(opts: { return empty } + function getCachedSessionSummary(projectRoot: string, db: Database, sqlitePath: string, nowMs: number): SessionSummary[] { + const cached = sessionSummaryByProjectRoot.get(projectRoot) + if (cached && nowMs - cached.fetchedAt < SESSION_SUMMARY_CACHE_TTL_MS) { + return cached.value + } + const value = buildSessionSummary(projectRoot, db, sqlitePath, nowMs) + sessionSummaryByProjectRoot.set(projectRoot, { value, fetchedAt: nowMs }) + return value + } + function getOrCreateStore(sourceId: string, projectRoot: string): DashboardStore { const existing = storeBySourceId.get(sourceId) if (existing) return existing @@ -264,28 +270,55 @@ export function createMultiProjectService(opts: { } const sources = listSources(opts.storageRoot) - const projects: ProjectSnapshot[] = [] + const snapshots: Array<{ snapshot: ProjectSnapshot; projectRoot: string }> = [] + const sqlitePath = opts.storageBackend.kind === "sqlite" ? opts.storageBackend.sqlitePath : undefined - for (const source of sources) { + let sharedDb: Database | null = null + if (sqlitePath) { try { - const entry = getSourceById(opts.storageRoot, source.id) - if (!entry) continue - - const store = getOrCreateStore(source.id, entry.projectRoot) - const payload = store.getSnapshot() - const label = source.label ?? entry.projectRoot - const sqlitePath = opts.storageBackend.kind === "sqlite" ? opts.storageBackend.sqlitePath : undefined - const sessionTimeSeries = getCachedSessionTimeSeries(entry.projectRoot, sqlitePath, nowMs) - const sessions = sqlitePath ? buildSessionSummary(entry.projectRoot, sqlitePath, nowMs) : [] - const snapshot = transformPayloadToSnapshot(source.id, label, entry.projectRoot, payload, sessions, nowMs, sessionTimeSeries) - snapshot.gitUncommittedCount = await getGitUncommittedCount(entry.projectRoot) - snapshot.worktrees = await getWorktreeInfo(entry.projectRoot) - projects.push(snapshot) + sharedDb = new Database(sqlitePath, { readonly: true }) } catch { - // Per-source error isolation: if one source fails, others still return } } + try { + for (const source of sources) { + try { + const entry = getSourceById(opts.storageRoot, source.id) + if (!entry) continue + + const store = getOrCreateStore(source.id, entry.projectRoot) + const payload = store.getSnapshot() + const label = source.label ?? entry.projectRoot + const sessionTimeSeries = getCachedSessionTimeSeries(entry.projectRoot, sqlitePath, nowMs) + const sessions = sharedDb && sqlitePath + ? getCachedSessionSummary(entry.projectRoot, sharedDb, sqlitePath, nowMs) + : [] + const snapshot = transformPayloadToSnapshot(source.id, label, entry.projectRoot, payload, sessions, nowMs, sessionTimeSeries) + snapshots.push({ snapshot, projectRoot: entry.projectRoot }) + } catch { + // Per-source error isolation: if one source fails, others still return + } + } + } finally { + try { sharedDb?.close() } catch {} + } + + // Phase 2: Parallel async git operations across all sources + await Promise.all(snapshots.map(async ({ snapshot, projectRoot }) => { + try { + const [gitCount, worktrees] = await Promise.all([ + getGitUncommittedCount(projectRoot), + getWorktreeInfo(projectRoot), + ]) + snapshot.gitUncommittedCount = gitCount + snapshot.worktrees = worktrees + } catch { + // Git failures are isolated per-source + } + })) + + const projects = snapshots.map((s) => s.snapshot) const payload = { projects, serverNowMs: nowMs, @@ -293,7 +326,7 @@ export function createMultiProjectService(opts: { } cachedPayload = payload - cachedPayloadAt = nowMs + cachedPayloadAt = Date.now() return payload } @@ -301,6 +334,7 @@ export function createMultiProjectService(opts: { cachedPayload = null cachedPayloadAt = 0 sessionTimeSeriesByProjectRoot.clear() + sessionSummaryByProjectRoot.clear() } return { getMultiProjectPayload, invalidate } diff --git a/src/server/start.ts b/src/server/start.ts index 2adc68e..90440ae 100644 --- a/src/server/start.ts +++ b/src/server/start.ts @@ -3,6 +3,8 @@ import { Hono } from "hono"; import { readFileSync } from "node:fs"; import { dirname, join, resolve } from "node:path"; import { createApi } from "./api"; +import { createMultiProjectService } from "./multi-project"; +import { createTelegramService } from "./telegram"; import { selectStorageBackend, getLegacyStorageRootForBackend } from "../ingest/storage-backend"; const here = dirname(new URL(import.meta.url).pathname); @@ -14,13 +16,27 @@ const app = new Hono(); const port = parseInt(process.env.OMO_PULSE_PORT || "4300", 10); const distRoot = join(import.meta.dir, "../../dist"); - -// Initialize storage backend and create API router const storageBackend = selectStorageBackend(); const storageRoot = getLegacyStorageRootForBackend(storageBackend); -const apiRouter = createApi({ storageRoot, storageBackend, version: APP_VERSION }); +const multiProjectService = createMultiProjectService({ storageRoot, storageBackend }); + +const telegramBotToken = process.env.TELEGRAM_BOT_TOKEN +const telegramChatId = process.env.TELEGRAM_CHAT_ID +const telegramService = telegramBotToken && telegramChatId + ? createTelegramService( + { botToken: telegramBotToken, chatId: telegramChatId }, + () => multiProjectService.getMultiProjectPayload(), + ) + : null + +const apiRouter = createApi({ + storageRoot, + storageBackend, + multiProjectService, + telegramStatus: telegramService ? () => telegramService.getStatus() : undefined, + version: APP_VERSION, +}); -// Mount the API router BEFORE the SPA fallback middleware app.route("/api", apiRouter); // SPA fallback middleware app.use("*", async (c, next) => { @@ -79,3 +95,8 @@ Bun.serve({ hostname: "127.0.0.1", port, }); + +if (telegramService) { + telegramService.start() + console.log("Telegram notifications enabled") +} diff --git a/src/server/telegram.ts b/src/server/telegram.ts new file mode 100644 index 0000000..e63726f --- /dev/null +++ b/src/server/telegram.ts @@ -0,0 +1,332 @@ +import type { + DashboardMultiProjectPayload, + ProjectSnapshot, + SessionStatus, + TelegramServiceConfig, + TelegramServiceStatus, +} from "../types" + +const STATUS_EMOJI: Record = { + busy: "🟢", + running_tool: "🟢", + thinking: "🟡", + idle: "⚪", + question: "❓", + plan_complete: "✅", + error: "🔴", + unknown: "⚫", +} + +const ALERT_STATUSES: ReadonlySet = new Set(["question", "error", "plan_complete"]) + +type TelegramApiResponse = { + ok: boolean + result?: { message_id: number } + description?: string + parameters?: { retry_after?: number } +} + +type TelegramState = { + pinnedMessageId: number | null + prevSessionStatuses: Map + alertedSessions: Set + lastEditMs: number + lastMessageText: string + alertsSent: number + lastError: string | null + isFirstPoll: boolean +} + +function escapeHtml(text: string): string { + return text + .replace(/&/g, "&") + .replace(//g, ">") +} + +function formatTime(ms: number): string { + const d = new Date(ms) + return `${d.getHours().toString().padStart(2, "0")}:${d.getMinutes().toString().padStart(2, "0")}` +} + +function formatRelativeTime(diffMs: number): string { + if (diffMs < 60_000) return `${Math.round(diffMs / 1000)}s ago` + if (diffMs < 3_600_000) return `${Math.round(diffMs / 60_000)}m ago` + return `${Math.round(diffMs / 3_600_000)}h ago` +} + +export function formatStatusMessage(payload: DashboardMultiProjectPayload): string { + const { projects, serverNowMs } = payload + const totalSessions = projects.reduce((sum, p) => sum + Math.max(p.sessions.length, 1), 0) + + const lines: string[] = [] + lines.push(`omo-pulse · ${projects.length} project${projects.length !== 1 ? "s" : ""} · ${totalSessions} session${totalSessions !== 1 ? "s" : ""}`) + lines.push("") + + for (const project of projects) { + if (project.sessions.length === 0) { + const emoji = STATUS_EMOJI[project.aggregateStatus] + const label = escapeHtml(project.label) + const tool = + project.mainSession.currentTool && project.mainSession.currentTool !== "-" + ? ` (${escapeHtml(project.mainSession.currentTool)})` + : "" + lines.push(`${emoji} ${label} ${project.aggregateStatus}${tool}`) + } else { + for (const session of project.sessions) { + const emoji = STATUS_EMOJI[session.status] + const label = escapeHtml(project.label) + const sessionLabel = escapeHtml(session.sessionLabel) + const tool = + session.currentTool && session.currentTool !== "-" + ? ` (${escapeHtml(session.currentTool)})` + : "" + const diffMs = serverNowMs - session.lastUpdatedMs + const relative = + session.status === "idle" && session.lastUpdatedMs > 0 && diffMs > 60_000 + ? ` ${formatRelativeTime(diffMs)}` + : "" + lines.push(`${emoji} ${label} ${sessionLabel}: ${session.status}${tool}${relative}`) + } + } + } + + if (projects.length === 0) { + lines.push("No projects registered") + } + + lines.push("") + lines.push(`🕐 ${formatTime(serverNowMs)} · updated just now`) + + return lines.join("\n") +} + +function formatAlertMessage(project: ProjectSnapshot, sessionId: string, status: SessionStatus): string { + const emoji = STATUS_EMOJI[status] + const label = escapeHtml(project.label) + const session = project.sessions.find((s) => s.sessionId === sessionId) + const sessionLabel = session ? escapeHtml(session.sessionLabel) : sessionId + + if (status === "question") { + return `${emoji} ${label} · ${sessionLabel}\nAgent is waiting for your input` + } + if (status === "error") { + return `${emoji} ${label} · ${sessionLabel}\nSession encountered an error` + } + if (status === "plan_complete") { + return `${emoji} ${label} · ${sessionLabel}\nPlan completed` + } + return `${emoji} ${label} · ${sessionLabel}: ${status}` +} + +async function telegramApi( + botToken: string, + method: string, + params: Record, +): Promise { + const url = `https://api.telegram.org/bot${botToken}/${method}` + const response = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(params), + }) + return (await response.json()) as TelegramApiResponse +} + +export type TelegramService = { + start: () => void + stop: () => void + getStatus: () => TelegramServiceStatus +} + +export function createTelegramService( + config: TelegramServiceConfig, + getPayload: () => Promise, +): TelegramService { + const pollIntervalMs = config.pollIntervalMs ?? 5000 + const debounceMs = config.debounceMs ?? 3000 + + const state: TelegramState = { + pinnedMessageId: null, + prevSessionStatuses: new Map(), + alertedSessions: new Set(), + lastEditMs: 0, + lastMessageText: "", + alertsSent: 0, + lastError: null, + isFirstPoll: true, + } + + let timer: ReturnType | null = null + + async function sendMessage(text: string): Promise { + try { + const result = await telegramApi(config.botToken, "sendMessage", { + chat_id: config.chatId, + text, + parse_mode: "HTML", + disable_notification: true, + }) + if (!result.ok) { + state.lastError = result.description ?? "sendMessage failed" + return null + } + state.lastError = null + return result.result?.message_id ?? null + } catch (err) { + state.lastError = err instanceof Error ? err.message : String(err) + return null + } + } + + async function editMessage(messageId: number, text: string): Promise { + try { + const result = await telegramApi(config.botToken, "editMessageText", { + chat_id: config.chatId, + message_id: messageId, + text, + parse_mode: "HTML", + }) + if (!result.ok) { + if (result.description?.includes("message is not modified")) { + return true + } + if (result.parameters?.retry_after) { + state.lastError = `Rate limited, retry after ${result.parameters.retry_after}s` + return false + } + state.lastError = result.description ?? "editMessageText failed" + return false + } + state.lastError = null + return true + } catch (err) { + state.lastError = err instanceof Error ? err.message : String(err) + return false + } + } + + async function pinMessage(messageId: number): Promise { + try { + const result = await telegramApi(config.botToken, "pinChatMessage", { + chat_id: config.chatId, + message_id: messageId, + disable_notification: true, + }) + if (!result.ok) { + state.lastError = result.description ?? "pinChatMessage failed" + } + } catch (err) { + state.lastError = err instanceof Error ? err.message : String(err) + } + } + + async function sendAlert(text: string): Promise { + try { + const result = await telegramApi(config.botToken, "sendMessage", { + chat_id: config.chatId, + text, + parse_mode: "HTML", + }) + if (result.ok) { + state.alertsSent++ + } + } catch { + // Alert failures are non-critical + } + } + + function detectAlerts(payload: DashboardMultiProjectPayload): void { + if (state.isFirstPoll) return + + for (const project of payload.projects) { + for (const session of project.sessions) { + const alertKey = `${session.sessionId}:${session.status}` + + if (ALERT_STATUSES.has(session.status)) { + if (!state.alertedSessions.has(alertKey)) { + state.alertedSessions.add(alertKey) + const alertText = formatAlertMessage(project, session.sessionId, session.status) + sendAlert(alertText) + } + } else { + for (const s of ALERT_STATUSES) { + state.alertedSessions.delete(`${session.sessionId}:${s}`) + } + } + } + } + } + + function updatePrevStatuses(payload: DashboardMultiProjectPayload): void { + state.prevSessionStatuses.clear() + for (const project of payload.projects) { + for (const session of project.sessions) { + state.prevSessionStatuses.set(session.sessionId, session.status) + } + } + } + + async function poll(): Promise { + try { + const payload = await getPayload() + const text = formatStatusMessage(payload) + + detectAlerts(payload) + updatePrevStatuses(payload) + + const nowMs = Date.now() + + if (state.pinnedMessageId === null) { + const messageId = await sendMessage(text) + if (messageId !== null) { + state.pinnedMessageId = messageId + state.lastMessageText = text + state.lastEditMs = nowMs + await pinMessage(messageId) + } + state.isFirstPoll = false + return + } + + if (nowMs - state.lastEditMs < debounceMs) return + + if (text === state.lastMessageText) return + + const success = await editMessage(state.pinnedMessageId, text) + if (success) { + state.lastMessageText = text + state.lastEditMs = nowMs + } + + state.isFirstPoll = false + } catch (err) { + state.lastError = err instanceof Error ? err.message : String(err) + } + } + + function start(): void { + if (timer !== null) return + poll() + timer = setInterval(poll, pollIntervalMs) + } + + function stop(): void { + if (timer !== null) { + clearInterval(timer) + timer = null + } + } + + function getStatus(): TelegramServiceStatus { + return { + enabled: true, + pinnedMessageId: state.pinnedMessageId, + lastUpdateMs: state.lastEditMs || null, + lastError: state.lastError, + alertsSent: state.alertsSent, + } + } + + return { start, stop, getStatus } +} diff --git a/src/types.ts b/src/types.ts index 69182a3..039504e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -223,3 +223,22 @@ export type ProjectOrderState = { /** Per-project visibility configuration */ export type VisibilityConfig = Record + +/** Telegram notification service configuration */ +export type TelegramServiceConfig = { + botToken: string + chatId: string + /** Polling interval in ms (default: 5000) */ + pollIntervalMs?: number + /** Debounce interval for edits in ms (default: 3000) */ + debounceMs?: number +} + +/** Telegram notification service runtime status */ +export type TelegramServiceStatus = { + enabled: boolean + pinnedMessageId: number | null + lastUpdateMs: number | null + lastError: string | null + alertsSent: number +} diff --git a/src/ui/components/ProjectStrip.css b/src/ui/components/ProjectStrip.css index 599e45d..0aaf369 100644 --- a/src/ui/components/ProjectStrip.css +++ b/src/ui/components/ProjectStrip.css @@ -57,11 +57,6 @@ 100% { box-shadow: 0 0 5px 0px color-mix(in srgb, var(--status-question-border) 50%, transparent), inset 0 1px 0 color-mix(in srgb, var(--status-question-border) 50%, transparent); } } -@keyframes status-dash-rotate { - from { transform: rotate(0deg); } - to { transform: rotate(360deg); } -} - @keyframes status-pulse { 0% { box-shadow: 0 0 0 0 color-mix(in srgb, var(--status-question-border) 0%, transparent); } 50% { box-shadow: 0 0 0 4px color-mix(in srgb, var(--status-question-border) 28%, transparent); } @@ -280,27 +275,14 @@ .strip-status-dot[data-status="question"] { color: var(--status-question); - background: var(--status-question); - border-style: dashed; - border-width: 2px; - border-color: var(--status-question-border); - box-shadow: 0 0 6px var(--status-question); - display: flex; - align-items: center; - justify-content: center; -} - -.strip-status-dot[data-status="question"]::after { - content: '?'; - position: absolute; - font-family: var(--font-mono); - font-size: 10px; - font-weight: 800; - color: #ffffff; -} - -.project-strip[data-expanded="true"] .strip-status-dot[data-status="question"] { - animation: status-dash-rotate 2s ease-in-out infinite; + background: + radial-gradient(ellipse 3px 2px at 35% 25%, rgba(255, 255, 255, 0.6) 0%, transparent 100%), + radial-gradient(circle at 35% 35%, rgba(0, 0, 0, 0.4) 0%, var(--status-question) 80%); + border-color: color-mix(in srgb, var(--status-question) 80%, #000 20%); + box-shadow: + inset 0 1px 1px rgba(255, 255, 255, 0.4), + inset 0 -2px 4px rgba(0, 0, 0, 0.6), + 0 0 10px 3px color-mix(in srgb, var(--status-question) 60%, transparent); } /* error — red, pulse + intense glow, PROMINENT */