From 4cd731a152476536a9ff7a987b743b9156dc0dbb Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Fri, 10 Apr 2026 22:12:01 +0200 Subject: [PATCH 1/8] perf(ingest): reuse sqlite handles for dashboard reads --- src/ingest/session-inclusion.ts | 9 ++-- src/ingest/sqlite-derive.ts | 88 +++++++++++++++++++++++++++++++-- src/ingest/storage-backend.ts | 37 +++++++++++--- 3 files changed, 121 insertions(+), 13 deletions(-) diff --git a/src/ingest/session-inclusion.ts b/src/ingest/session-inclusion.ts index 7cfb49d..09aff92 100644 --- a/src/ingest/session-inclusion.ts +++ b/src/ingest/session-inclusion.ts @@ -124,6 +124,7 @@ export function findIncludedSessionsSqlite( }> const sessions: SessionMetadata[] = [] + const statusCache = new Map() for (const row of sessionRows) { if (typeof row.id !== "string" || typeof row.directory !== "string") continue @@ -144,11 +145,13 @@ export function findIncludedSessionsSqlite( time: { created: timeCreated, updated: timeUpdated }, } + const status = deriveSessionStatus(db, meta, nowMs) if ( isSessionIncluded(meta, idleWindowMs, nowMs) || - isAttentionStatus(deriveSessionStatus(db, meta, nowMs)) + isAttentionStatus(status) ) { sessions.push(meta) + statusCache.set(meta.id, status) } } @@ -156,8 +159,8 @@ export function findIncludedSessionsSqlite( // Then recency: most recent activity first (time.updated DESC) // Finally stable tie-breaker: id ascending sessions.sort((a, b) => { - const aStatus = deriveSessionStatus(db, a, nowMs) - const bStatus = deriveSessionStatus(db, b, nowMs) + const aStatus = statusCache.get(a.id) ?? "unknown" + const bStatus = statusCache.get(b.id) ?? "unknown" const aSeverity = STATUS_SEVERITY[aStatus] ?? 6 const bSeverity = STATUS_SEVERITY[bStatus] ?? 6 diff --git a/src/ingest/sqlite-derive.ts b/src/ingest/sqlite-derive.ts index 1c769bc..0ed0e09 100644 --- a/src/ingest/sqlite-derive.ts +++ b/src/ingest/sqlite-derive.ts @@ -1,3 +1,4 @@ +import type { Database } from "bun:sqlite" import { ACTIVE_BUSY_WINDOW_MS, BACKGROUND_RUNNING_WINDOW_MS, @@ -167,17 +168,20 @@ function readSessionMessagesAndParts(opts: { sqlitePath: string sessionId: string limit: number + db?: Database }): SqliteDeriveResult<{ metas: StoredMessageMeta[]; partsByMessage: Map }> { const metasResult = readRecentMessageMetasSqlite({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, limit: opts.limit, + db: opts.db, }) if (!metasResult.ok) return metasResult const messageIds = metasResult.rows.map((meta) => meta.id) const partsResult = readToolPartsForMessagesSqlite({ sqlitePath: opts.sqlitePath, messageIds, + db: opts.db, }) if (!partsResult.ok) return partsResult @@ -317,10 +321,12 @@ export function pickActiveSessionIdSqlite(opts: { sqlitePath: string projectRoot: string boulderSessionIds?: string[] + db?: Database }): SqliteDeriveResult { const metasResult = readMainSessionMetasSqlite({ sqlitePath: opts.sqlitePath, directoryFilter: opts.projectRoot, + db: opts.db, }) if (!metasResult.ok) return metasResult @@ -354,7 +360,7 @@ export function pickActiveSessionIdSqlite(opts: { const ids = opts.boulderSessionIds ?? [] for (let i = ids.length - 1; i >= 0; i--) { const id = ids[i] - const messages = readRecentMessageMetasSqlite({ sqlitePath: opts.sqlitePath, sessionId: id, limit: 1 }) + const messages = readRecentMessageMetasSqlite({ sqlitePath: opts.sqlitePath, sessionId: id, limit: 1, db: opts.db }) if (!messages.ok) return messages if (messages.rows.length === 0) continue @@ -378,12 +384,14 @@ export function getMainSessionViewSqlite(opts: { sessionId: string sessionMeta?: SessionMetadata | null nowMs?: number + db?: Database }): SqliteDeriveResult { const nowMs = opts.nowMs ?? Date.now() const session = readSessionMessagesAndParts({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, limit: 200, + db: opts.db, }) if (!session.ok) return session @@ -473,16 +481,18 @@ export function deriveBackgroundTasksSqlite(opts: { sqlitePath: string mainSessionId: string nowMs?: number + db?: Database }): SqliteDeriveResult { const nowMs = opts.nowMs ?? Date.now() const main = readSessionMessagesAndParts({ sqlitePath: opts.sqlitePath, sessionId: opts.mainSessionId, limit: 200, + db: opts.db, }) if (!main.ok) return main - const allSessionMetasResult = readAllSessionMetasSqlite({ sqlitePath: opts.sqlitePath }) + const allSessionMetasResult = readAllSessionMetasSqlite({ sqlitePath: opts.sqlitePath, db: opts.db }) if (!allSessionMetasResult.ok) return allSessionMetasResult const allSessionMetas = allSessionMetasResult.rows const sessionMetaById = new Map(allSessionMetas.map((m) => [m.id, m] as const)) @@ -501,6 +511,7 @@ export function deriveBackgroundTasksSqlite(opts: { sqlitePath: opts.sqlitePath, sessionId, limit: 200, + db: opts.db, }) if (!loaded.ok) return loaded backgroundMessageCache.set(sessionId, loaded.value.metas) @@ -648,12 +659,44 @@ export function deriveBackgroundTasksSqlite(opts: { return { ok: true, value: rows } } +export function deriveBackgroundTasksSqliteForSessions(opts: { + sqlitePath: string + mainSessionIds?: Array + nowMs?: number + db?: Database +}): SqliteDeriveResult { + const sessionIds: string[] = [] + const seen = new Set() + for (const value of opts.mainSessionIds ?? []) { + if (typeof value !== "string") continue + const id = value.trim() + if (!id || seen.has(id)) continue + seen.add(id) + sessionIds.push(id) + } + const rows: BackgroundTaskRow[] = [] + + for (const sessionId of sessionIds) { + const result = deriveBackgroundTasksSqlite({ + sqlitePath: opts.sqlitePath, + mainSessionId: sessionId, + nowMs: opts.nowMs, + db: opts.db, + }) + if (!result.ok) return result + rows.push(...result.value) + } + + return { ok: true, value: rows } +} + export function deriveTimeSeriesActivitySqlite(opts: { sqlitePath: string mainSessionId: string | null nowMs?: number windowMs?: number bucketMs?: number + db?: Database }): SqliteDeriveResult { const windowMs = opts.windowMs ?? 300_000 const bucketMs = opts.bucketMs ?? 2_000 @@ -668,7 +711,7 @@ export function deriveTimeSeriesActivitySqlite(opts: { const atlas = zeroBuckets(buckets) const background = zeroBuckets(buckets) - const allSessionMetas = readAllSessionMetasSqlite({ sqlitePath: opts.sqlitePath }) + const allSessionMetas = readAllSessionMetasSqlite({ sqlitePath: opts.sqlitePath, db: opts.db }) if (!allSessionMetas.ok) return allSessionMetas const perSessionCache = new Map }>() @@ -679,6 +722,7 @@ export function deriveTimeSeriesActivitySqlite(opts: { sqlitePath: opts.sqlitePath, sessionId, limit: 200, + db: opts.db, }) if (!loaded.ok) return loaded perSessionCache.set(sessionId, loaded.value) @@ -761,6 +805,7 @@ export function deriveTokenUsageSqlite(opts: { sqlitePath: string mainSessionId: string | null backgroundSessionIds?: Array + db?: Database }): SqliteDeriveResult> { const sessionIds: string[] = [] const seen = new Set() @@ -781,6 +826,7 @@ export function deriveTokenUsageSqlite(opts: { sqlitePath: opts.sqlitePath, sessionId, limit: TOKEN_USAGE_MESSAGE_LIMIT, + db: opts.db, }) if (!result.ok) return result metas.push(...result.rows) @@ -791,15 +837,16 @@ export function deriveTokenUsageSqlite(opts: { value: aggregateTokenUsage(metas), } } - export function deriveToolCallsSqlite(opts: { sqlitePath: string sessionId: string + db?: Database }): SqliteDeriveResult { const metasResult = readRecentMessageMetasSqlite({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, limit: MAX_TOOL_CALL_MESSAGES, + db: opts.db, }) if (!metasResult.ok) return metasResult @@ -807,6 +854,7 @@ export function deriveToolCallsSqlite(opts: { const existsResult = readSessionExistsSqlite({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, + db: opts.db, }) if (!existsResult.ok) return existsResult return { @@ -822,6 +870,7 @@ export function deriveToolCallsSqlite(opts: { const partsResult = readToolPartsForMessagesSqlite({ sqlitePath: opts.sqlitePath, messageIds: metasResult.rows.map((meta) => meta.id), + db: opts.db, }) if (!partsResult.ok) return partsResult @@ -878,10 +927,12 @@ export function deriveToolCallsSqlite(opts: { export function deriveTodosSqlite(opts: { sqlitePath: string sessionId: string + db?: Database }): SqliteDeriveResult { const result = readTodosSqlite({ sqlitePath: opts.sqlitePath, sessionId: opts.sessionId, + db: opts.db, }) if (!result.ok) return result @@ -890,3 +941,32 @@ export function deriveTodosSqlite(opts: { value: result.rows, } } + +export function deriveTodosSqliteForSessions(opts: { + sqlitePath: string + sessionIds?: Array + db?: Database +}): SqliteDeriveResult { + const sessionIds: string[] = [] + const seen = new Set() + for (const value of opts.sessionIds ?? []) { + if (typeof value !== "string") continue + const id = value.trim() + if (!id || seen.has(id)) continue + seen.add(id) + sessionIds.push(id) + } + const rows: TodoItem[] = [] + + for (const sessionId of sessionIds) { + const result = deriveTodosSqlite({ + sqlitePath: opts.sqlitePath, + sessionId, + db: opts.db, + }) + if (!result.ok) return result + rows.push(...result.value) + } + + return { ok: true, value: rows } +} diff --git a/src/ingest/storage-backend.ts b/src/ingest/storage-backend.ts index 84cfdfc..a500a2b 100644 --- a/src/ingest/storage-backend.ts +++ b/src/ingest/storage-backend.ts @@ -62,6 +62,25 @@ function withReadonlyDb(sqlitePath: string, fn: (db: BunDatabase) => T): { ok } } +/** + * Run a query against a pre-opened DB (no open/close overhead) or fall back + * to opening a fresh readonly connection via `withReadonlyDb`. + */ +function withDbOrOpen( + db: BunDatabase | undefined, + sqlitePath: string, + fn: (db: BunDatabase) => T, +): { ok: true; value: T } | { ok: false; reason: SqliteReadFailureReason } { + if (db) { + try { + return { ok: true, value: fn(db) } + } catch (error) { + return { ok: false, reason: classifySqliteError(error) } + } + } + return withReadonlyDb(sqlitePath, fn) +} + function asFiniteNumber(value: unknown): number | null { if (typeof value !== "number") return null return Number.isFinite(value) ? value : null @@ -82,6 +101,7 @@ function isToolStatus(value: unknown): value is StoredToolPart["state"]["status" export function readMainSessionMetasSqlite(opts: { sqlitePath: string directoryFilter?: string + db?: BunDatabase }): SqliteReadResult { const directoryNeedle = typeof opts.directoryFilter === "string" && opts.directoryFilter.length > 0 ? (() => { @@ -91,7 +111,7 @@ export function readMainSessionMetasSqlite(opts: { })() : null - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db .query("SELECT id, project_id, parent_id, directory, title, time_created, time_updated FROM session WHERE parent_id IS NULL ORDER BY time_updated DESC, id DESC") .all() as Array<{ @@ -141,8 +161,9 @@ export function readMainSessionMetasSqlite(opts: { export function readAllSessionMetasSqlite(opts: { sqlitePath: string + db?: BunDatabase }): SqliteReadResult { - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db .query("SELECT id, project_id, parent_id, directory, title, time_created, time_updated FROM session ORDER BY time_updated DESC, id DESC") .all() as Array<{ @@ -187,8 +208,9 @@ export function readAllSessionMetasSqlite(opts: { export function readSessionExistsSqlite(opts: { sqlitePath: string sessionId: string + db?: BunDatabase }): SqliteReadResult<{ sessionId: string }> { - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db .query("SELECT id FROM session WHERE id = ? LIMIT 1") .get(opts.sessionId) as { id?: unknown } | null @@ -204,8 +226,9 @@ export function readRecentMessageMetasSqlite(opts: { sqlitePath: string sessionId: string limit: number + db?: BunDatabase }): SqliteReadResult { - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db .query("SELECT id, session_id, time_created, data FROM message WHERE session_id = ? ORDER BY time_created DESC, id DESC LIMIT ?") .all(opts.sessionId, opts.limit) as Array<{ @@ -267,12 +290,13 @@ export function readRecentMessageMetasSqlite(opts: { export function readToolPartsForMessagesSqlite(opts: { sqlitePath: string messageIds: string[] + db?: BunDatabase }): SqliteReadResult { if (opts.messageIds.length === 0) return { ok: true, rows: [] } const placeholders = opts.messageIds.map(() => "?").join(",") const sql = `SELECT id, message_id, session_id, time_created, data FROM part WHERE message_id IN (${placeholders}) ORDER BY message_id ASC, time_created ASC, id ASC` - const result = withReadonlyDb(opts.sqlitePath, (db) => + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => db.query(sql).all(...opts.messageIds) as Array<{ id: unknown message_id: unknown @@ -337,8 +361,9 @@ export type TodoItem = { export function readTodosSqlite(opts: { sqlitePath: string sessionId: string + db?: BunDatabase }): SqliteReadResult { - const result = withReadonlyDb(opts.sqlitePath, (db) => { + const result = withDbOrOpen(opts.db, opts.sqlitePath, (db) => { try { return db .query("SELECT content, status, priority, position FROM todo WHERE session_id = ? ORDER BY position ASC") From 37fb7c012a3f1cd15db695b4ce3fd24209285ae4 Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Fri, 10 Apr 2026 22:12:14 +0200 Subject: [PATCH 2/8] perf(server): cache sqlite-backed project aggregation --- src/server/dashboard.ts | 314 ++++++++++++++++++------------------ src/server/multi-project.ts | 96 +++++++---- 2 files changed, 218 insertions(+), 192 deletions(-) diff --git a/src/server/dashboard.ts b/src/server/dashboard.ts index 22e5711..a916e4b 100644 --- a/src/server/dashboard.ts +++ b/src/server/dashboard.ts @@ -1,3 +1,4 @@ +import { Database } from "bun:sqlite" import * as fs from "node:fs" import { deriveBackgroundTasks } from "../ingest/background-tasks" import * as boulderModule from "../ingest/boulder" @@ -338,184 +339,175 @@ export function buildDashboardPayload(opts: { const unintiatedPlans = scanUnintiatedPlans(opts.projectRoot, boulder?.active_plan ?? null) const planHistory = readBoulderHistorySafe(opts.projectRoot) - const active = pickActiveSessionIdSqlite({ - sqlitePath: backend.sqlitePath, - projectRoot: opts.projectRoot, - boulderSessionIds: boulder?.session_ids, - }) - if (!active.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } + let db: Database + try { + db = new Database(backend.sqlitePath, { readonly: true }) + } catch { return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) } - const sessionId = active.value - let sessionMeta: SessionMetadata | null = null - if (sessionId) { - const metas = readMainSessionMetasSqlite({ sqlitePath: backend.sqlitePath, directoryFilter: opts.projectRoot }) - if (!metas.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - sessionMeta = metas.rows.find((m) => m.id === sessionId) ?? null - } - - const main = sessionId - ? (() => { - const result = getMainSessionViewSqlite({ - sqlitePath: backend.sqlitePath, - sessionId, - sessionMeta, - nowMs, - }) - if (!result.ok) return null - return result.value - })() - : { agent: "unknown", currentTool: null, currentModel: null, lastUpdated: null, sessionLabel: "(no session)", status: "unknown" as const } - if (!main) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } + const fallback = (): DashboardPayload => { + try { db.close() } catch {} return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) } - const tasksResult = sessionId - ? deriveBackgroundTasksSqlite({ - sqlitePath: backend.sqlitePath, - mainSessionId: sessionId, - nowMs, - }) - : { ok: true as const, value: [] } - if (!tasksResult.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) + try { + const active = pickActiveSessionIdSqlite({ + sqlitePath: backend.sqlitePath, + projectRoot: opts.projectRoot, + boulderSessionIds: boulder?.session_ids, + db, + }) + if (!active.ok) return fallback() + + const sessionId = active.value + let sessionMeta: SessionMetadata | null = null + if (sessionId) { + const metas = readMainSessionMetasSqlite({ sqlitePath: backend.sqlitePath, directoryFilter: opts.projectRoot, db }) + if (!metas.ok) return fallback() + sessionMeta = metas.rows.find((m) => m.id === sessionId) ?? null } - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - const timeSeriesResult = deriveTimeSeriesActivitySqlite({ - sqlitePath: backend.sqlitePath, - mainSessionId: sessionId ?? null, - nowMs, - }) - if (!timeSeriesResult.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - - const mainCurrentModel = main.currentModel - const mainSessionTasks = (() => { - if (!sessionId) return [] + const main = sessionId + ? (() => { + const result = getMainSessionViewSqlite({ + sqlitePath: backend.sqlitePath, + sessionId, + sessionMeta, + nowMs, + db, + }) + if (!result.ok) return null + return result.value + })() + : { agent: "unknown", currentTool: null, currentModel: null, lastUpdated: null, sessionLabel: "(no session)", status: "unknown" as const } + if (!main) return fallback() + + const tasksResult = sessionId + ? deriveBackgroundTasksSqlite({ + sqlitePath: backend.sqlitePath, + mainSessionId: sessionId, + nowMs, + db, + }) + : { ok: true as const, value: [] } + if (!tasksResult.ok) return fallback() - const mainStatus = main.status - const status = mainStatus === "running_tool" || mainStatus === "thinking" || mainStatus === "busy" - ? "running" - : mainStatus === "idle" - ? "idle" - : "unknown" + const timeSeriesResult = deriveTimeSeriesActivitySqlite({ + sqlitePath: backend.sqlitePath, + mainSessionId: sessionId ?? null, + nowMs, + db, + }) + if (!timeSeriesResult.ok) return fallback() + + const mainCurrentModel = main.currentModel + const mainSessionTasks = (() => { + if (!sessionId) return [] + + const mainStatus = main.status + const status = mainStatus === "running_tool" || mainStatus === "thinking" || mainStatus === "busy" + ? "running" + : mainStatus === "idle" + ? "idle" + : "unknown" + + const callsResult = deriveToolCallsSqlite({ sqlitePath: backend.sqlitePath, sessionId, db }) + if (!callsResult.ok) { + return [] + } - const callsResult = deriveToolCallsSqlite({ sqlitePath: backend.sqlitePath, sessionId }) - if (!callsResult.ok) { - return [] - } + const startAt = sessionMeta?.time?.created ?? null + const endAtMs = status === "running" ? nowMs : (main.lastUpdated ?? nowMs) + + return [ + { + id: "main-session", + description: "Main session", + subline: sessionId, + agent: main.agent, + lastModel: mainCurrentModel, + status, + toolCalls: callsResult.value.toolCalls.length, + lastTool: callsResult.value.toolCalls[0]?.tool ?? "-", + timeline: formatTimeline(startAt, endAtMs), + sessionId, + }, + ] + })() + + const tokenUsageResult = deriveTokenUsageSqlite({ + sqlitePath: backend.sqlitePath, + mainSessionId: sessionId ?? null, + backgroundSessionIds: tasksResult.value.map((task) => task.sessionId ?? null), + db, + }) + if (!tokenUsageResult.ok) return fallback() - const startAt = sessionMeta?.time?.created ?? null - const endAtMs = status === "running" ? nowMs : (main.lastUpdated ?? nowMs) + const todosResult = sessionId + ? deriveTodosSqlite({ + sqlitePath: backend.sqlitePath, + sessionId, + db, + }) + : { ok: true as const, value: [] } + const todos = todosResult.ok ? todosResult.value : [] - return [ - { - id: "main-session", - description: "Main session", - subline: sessionId, + const payload: DashboardPayload = { + mainSession: { agent: main.agent, - lastModel: mainCurrentModel, - status, - toolCalls: callsResult.value.toolCalls.length, - lastTool: callsResult.value.toolCalls[0]?.tool ?? "-", - timeline: formatTimeline(startAt, endAtMs), - sessionId, + currentModel: mainCurrentModel, + currentTool: main.currentTool ?? "-", + lastUpdatedLabel: formatIso(main.lastUpdated), + session: main.sessionLabel, + sessionId: sessionId ?? null, + statusPill: plan.planComplete && main.status === "idle" + ? mainStatusPill("plan_complete") + : mainStatusPill(main.status), }, - ] - })() - - const tokenUsageResult = deriveTokenUsageSqlite({ - sqlitePath: backend.sqlitePath, - mainSessionId: sessionId ?? null, - backgroundSessionIds: tasksResult.value.map((task) => task.sessionId ?? null), - }) - if (!tokenUsageResult.ok) { - if (hasLegacyStorageRoots(opts.storage)) { - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) + planProgress: { + name: planName, + completed: plan.completed, + total: plan.total, + path: planPath, + statusPill: planStatusPill(plan), + steps: planSteps.missing ? [] : planSteps.steps, + planStale: plan.planStale, + planComplete: plan.planComplete, + boulderStatus: boulder?.status, + completedAt: boulder?.completed_at, + }, + unintiatedPlans, + planHistory, + backgroundTasks: tasksResult.value.map((t) => ({ + id: t.id, + description: t.description, + agent: t.agent, + lastModel: t.lastModel ?? null, + status: t.status, + toolCalls: t.toolCalls ?? 0, + lastTool: t.lastTool ?? "-", + timeline: typeof t.timeline === "string" ? t.timeline : "", + sessionId: t.sessionId ?? null, + })), + mainSessionTasks, + timeSeries: timeSeriesResult.value, + tokenUsage: tokenUsageResult.value, + todos, + raw: null, } - return buildDashboardPayloadFiles({ projectRoot: opts.projectRoot, storage: opts.storage, nowMs }) - } - - const todosResult = sessionId - ? deriveTodosSqlite({ - sqlitePath: backend.sqlitePath, - sessionId, - }) - : { ok: true as const, value: [] } - // Don't fail the entire payload if todos fail, just use empty array - const todos = todosResult.ok ? todosResult.value : [] - const payload: DashboardPayload = { - mainSession: { - agent: main.agent, - currentModel: mainCurrentModel, - currentTool: main.currentTool ?? "-", - lastUpdatedLabel: formatIso(main.lastUpdated), - session: main.sessionLabel, - sessionId: sessionId ?? null, - statusPill: plan.planComplete && main.status === "idle" - ? mainStatusPill("plan_complete") - : mainStatusPill(main.status), - }, - planProgress: { - name: planName, - completed: plan.completed, - total: plan.total, - path: planPath, - statusPill: planStatusPill(plan), - steps: planSteps.missing ? [] : planSteps.steps, - planStale: plan.planStale, - planComplete: plan.planComplete, - boulderStatus: boulder?.status, - completedAt: boulder?.completed_at, - }, - unintiatedPlans, - planHistory, - backgroundTasks: tasksResult.value.map((t) => ({ - id: t.id, - description: t.description, - agent: t.agent, - lastModel: t.lastModel ?? null, - status: t.status, - toolCalls: t.toolCalls ?? 0, - lastTool: t.lastTool ?? "-", - timeline: typeof t.timeline === "string" ? t.timeline : "", - sessionId: t.sessionId ?? null, - })), - mainSessionTasks, - timeSeries: timeSeriesResult.value, - tokenUsage: tokenUsageResult.value, - todos, - raw: null, - } - - payload.raw = { - mainSession: payload.mainSession, - planProgress: payload.planProgress, - backgroundTasks: payload.backgroundTasks, - mainSessionTasks: payload.mainSessionTasks, - timeSeries: payload.timeSeries, + payload.raw = { + mainSession: payload.mainSession, + planProgress: payload.planProgress, + backgroundTasks: payload.backgroundTasks, + mainSessionTasks: payload.mainSessionTasks, + timeSeries: payload.timeSeries, + } + return payload + } finally { + try { db.close() } catch {} } - return payload } // --------------------------------------------------------------------------- diff --git a/src/server/multi-project.ts b/src/server/multi-project.ts index f233a65..dad40b7 100644 --- a/src/server/multi-project.ts +++ b/src/server/multi-project.ts @@ -1,12 +1,12 @@ +import { Database } from "bun:sqlite" import { getGitUncommittedCount } from "../ingest/git-status" import { getWorktreeInfo } from "../ingest/git-worktrees" import { derivePerSessionTimeSeries } from "../ingest/per-session-timeseries" -import { isSessionIncluded } from "../ingest/session-inclusion" +import { findIncludedSessionsSqlite } from "../ingest/session-inclusion" import { getSourceById, listSources } from "../ingest/sources-registry" import { getMainSessionViewSqlite } from "../ingest/sqlite-derive" import { compareSessionsBySeverity, computeAggregateStatus, selectDisplaySession } from "../ingest/status-rollup" import { getLegacyStorageRootForBackend, type StorageBackend } from "../ingest/storage-backend" -import { readMainSessionMetasSqlite } from "../ingest/storage-backend" import type { BackgroundTaskSummary, DashboardMultiProjectPayload, @@ -75,23 +75,21 @@ function buildEmptySessionTimeSeries(nowMs: number): SessionTimeSeriesPayload { export const MULTI_PROJECT_PAYLOAD_CACHE_TTL_MS = 5_000 export const SESSION_TIMESERIES_CACHE_TTL_MS = 15_000 +export const SESSION_SUMMARY_CACHE_TTL_MS = 10_000 const INCLUDED_SESSION_IDLE_WINDOW_MS = 300_000 -function buildSessionSummary(projectRoot: string, sqlitePath: string, nowMs: number): SessionSummary[] { +function buildSessionSummary(projectRoot: string, db: Database, sqlitePath: string, nowMs: number): SessionSummary[] { try { - if (typeof readMainSessionMetasSqlite !== "function" || typeof getMainSessionViewSqlite !== "function") { - return [] - } - - const metas = readMainSessionMetasSqlite({ sqlitePath, directoryFilter: projectRoot }) - if (!metas.ok) return [] + const includedMetas = findIncludedSessionsSqlite(db, projectRoot, INCLUDED_SESSION_IDLE_WINDOW_MS) + if (includedMetas.length === 0) return [] - const summaries = metas.rows.flatMap((meta) => { + const summaries = includedMetas.flatMap((meta) => { const result = getMainSessionViewSqlite({ sqlitePath, sessionId: meta.id, sessionMeta: meta, nowMs, + db, }) if (!result.ok) return [] @@ -106,10 +104,7 @@ function buildSessionSummary(projectRoot: string, sqlitePath: string, nowMs: num lastUpdatedMs: result.value.lastUpdated ?? 0, } - const included = isSessionIncluded(meta, INCLUDED_SESSION_IDLE_WINDOW_MS, nowMs) - || summary.status === "question" - || summary.status === "error" - return included ? [summary] : [] + return [summary] }) return summaries.sort(compareSessionsBySeverity) @@ -201,6 +196,7 @@ export function createMultiProjectService(opts: { const storeBySourceId = new Map() const storeByProjectRoot = new Map() const sessionTimeSeriesByProjectRoot = new Map() + const sessionSummaryByProjectRoot = new Map() let cachedPayload: DashboardMultiProjectPayload | null = null let cachedPayloadAt = 0 @@ -233,6 +229,16 @@ export function createMultiProjectService(opts: { return empty } + function getCachedSessionSummary(projectRoot: string, db: Database, sqlitePath: string, nowMs: number): SessionSummary[] { + const cached = sessionSummaryByProjectRoot.get(projectRoot) + if (cached && nowMs - cached.fetchedAt < SESSION_SUMMARY_CACHE_TTL_MS) { + return cached.value + } + const value = buildSessionSummary(projectRoot, db, sqlitePath, nowMs) + sessionSummaryByProjectRoot.set(projectRoot, { value, fetchedAt: nowMs }) + return value + } + function getOrCreateStore(sourceId: string, projectRoot: string): DashboardStore { const existing = storeBySourceId.get(sourceId) if (existing) return existing @@ -264,28 +270,55 @@ export function createMultiProjectService(opts: { } const sources = listSources(opts.storageRoot) - const projects: ProjectSnapshot[] = [] + const snapshots: Array<{ snapshot: ProjectSnapshot; projectRoot: string }> = [] + const sqlitePath = opts.storageBackend.kind === "sqlite" ? opts.storageBackend.sqlitePath : undefined - for (const source of sources) { + let sharedDb: Database | null = null + if (sqlitePath) { try { - const entry = getSourceById(opts.storageRoot, source.id) - if (!entry) continue - - const store = getOrCreateStore(source.id, entry.projectRoot) - const payload = store.getSnapshot() - const label = source.label ?? entry.projectRoot - const sqlitePath = opts.storageBackend.kind === "sqlite" ? opts.storageBackend.sqlitePath : undefined - const sessionTimeSeries = getCachedSessionTimeSeries(entry.projectRoot, sqlitePath, nowMs) - const sessions = sqlitePath ? buildSessionSummary(entry.projectRoot, sqlitePath, nowMs) : [] - const snapshot = transformPayloadToSnapshot(source.id, label, entry.projectRoot, payload, sessions, nowMs, sessionTimeSeries) - snapshot.gitUncommittedCount = await getGitUncommittedCount(entry.projectRoot) - snapshot.worktrees = await getWorktreeInfo(entry.projectRoot) - projects.push(snapshot) + sharedDb = new Database(sqlitePath, { readonly: true }) } catch { - // Per-source error isolation: if one source fails, others still return } } + try { + for (const source of sources) { + try { + const entry = getSourceById(opts.storageRoot, source.id) + if (!entry) continue + + const store = getOrCreateStore(source.id, entry.projectRoot) + const payload = store.getSnapshot() + const label = source.label ?? entry.projectRoot + const sessionTimeSeries = getCachedSessionTimeSeries(entry.projectRoot, sqlitePath, nowMs) + const sessions = sharedDb && sqlitePath + ? getCachedSessionSummary(entry.projectRoot, sharedDb, sqlitePath, nowMs) + : [] + const snapshot = transformPayloadToSnapshot(source.id, label, entry.projectRoot, payload, sessions, nowMs, sessionTimeSeries) + snapshots.push({ snapshot, projectRoot: entry.projectRoot }) + } catch { + // Per-source error isolation: if one source fails, others still return + } + } + } finally { + try { sharedDb?.close() } catch {} + } + + // Phase 2: Parallel async git operations across all sources + await Promise.all(snapshots.map(async ({ snapshot, projectRoot }) => { + try { + const [gitCount, worktrees] = await Promise.all([ + getGitUncommittedCount(projectRoot), + getWorktreeInfo(projectRoot), + ]) + snapshot.gitUncommittedCount = gitCount + snapshot.worktrees = worktrees + } catch { + // Git failures are isolated per-source + } + })) + + const projects = snapshots.map((s) => s.snapshot) const payload = { projects, serverNowMs: nowMs, @@ -293,7 +326,7 @@ export function createMultiProjectService(opts: { } cachedPayload = payload - cachedPayloadAt = nowMs + cachedPayloadAt = Date.now() return payload } @@ -301,6 +334,7 @@ export function createMultiProjectService(opts: { cachedPayload = null cachedPayloadAt = 0 sessionTimeSeriesByProjectRoot.clear() + sessionSummaryByProjectRoot.clear() } return { getMultiProjectPayload, invalidate } From 0381c1f0139af543dcb9dd57bb0a6fb4402c3212 Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Fri, 10 Apr 2026 22:12:27 +0200 Subject: [PATCH 3/8] feat(server): expose source management and telegram status --- src/__tests__/api.test.ts | 17 +++++------------ src/server/api.ts | 27 +++++++++++++++++++-------- src/types.ts | 19 +++++++++++++++++++ 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/src/__tests__/api.test.ts b/src/__tests__/api.test.ts index cc507b3..fa9243e 100644 --- a/src/__tests__/api.test.ts +++ b/src/__tests__/api.test.ts @@ -22,16 +22,6 @@ vi.mock("../ingest/sources-registry", () => ({ getSourceById: vi.fn(() => null), })) -vi.mock("../server/multi-project", () => ({ - createMultiProjectService: vi.fn(() => ({ - getMultiProjectPayload: vi.fn(async (): Promise => ({ - projects: [], - serverNowMs: Date.now(), - pollIntervalMs: 2000, - })), - })), -})) - vi.mock("../ingest/session", () => ({ getStorageRoots: vi.fn(() => ({ session: "/tmp/session", @@ -63,7 +53,6 @@ vi.mock("../ingest/sqlite-derive", () => ({ // Import AFTER mocking // --------------------------------------------------------------------------- import { createApi } from "../server/api" -import { createMultiProjectService } from "../server/multi-project" import type { ProjectSnapshot } from "../types" // --------------------------------------------------------------------------- @@ -83,6 +72,8 @@ function makeProjectSnapshot(overrides: Partial = {}): ProjectS sessionId: "ses_abc", status: "idle", }, + sessions: [], + aggregateStatus: "idle", planProgress: { name: "plan-1", completed: 3, @@ -93,6 +84,7 @@ function makeProjectSnapshot(overrides: Partial = {}): ProjectS planStale: false, planComplete: false, }, + unintiatedPlans: [], timeSeries: { windowMs: 300000, bucketMs: 2000, @@ -127,12 +119,13 @@ describe("API routes", () => { serverNowMs: Date.now(), pollIntervalMs: 2000, })), + invalidate: vi.fn(), } - vi.mocked(createMultiProjectService).mockReturnValue(mockService) app = createApi({ storageRoot: "/tmp/test-storage", storageBackend: { kind: "sqlite", dataDir: "/tmp", sqlitePath: "/tmp/test.db" }, + multiProjectService: mockService, version: "1.0.0-test", }) }) diff --git a/src/server/api.ts b/src/server/api.ts index 949a0d0..4f13f40 100644 --- a/src/server/api.ts +++ b/src/server/api.ts @@ -14,24 +14,25 @@ import { assertAllowedPath } from "../ingest/paths" import { deriveToolCalls, MAX_TOOL_CALL_MESSAGES, MAX_TOOL_CALLS } from "../ingest/tool-calls" import { deriveToolCallsSqlite } from "../ingest/sqlite-derive" import type { StorageBackend } from "../ingest/storage-backend" -import { createMultiProjectService } from "./multi-project" +import type { DashboardMultiProjectPayload, TelegramServiceStatus } from "../types" const SESSION_ID_PATTERN = /^[A-Za-z0-9_-]{1,128}$/ +export type MultiProjectService = { + getMultiProjectPayload: () => Promise + invalidate: () => void +} + export function createApi(opts: { storageRoot: string storageBackend: StorageBackend - pollIntervalMs?: number + multiProjectService: MultiProjectService + telegramStatus?: () => TelegramServiceStatus version?: string }): Hono { const api = new Hono() const version = opts.version ?? "0.0.0" - - const multiProjectService = createMultiProjectService({ - storageRoot: opts.storageRoot, - storageBackend: opts.storageBackend, - pollIntervalMs: opts.pollIntervalMs, - }) + const multiProjectService = opts.multiProjectService const invalidateProjects = (): void => { multiProjectService.invalidate() } @@ -267,5 +268,15 @@ export function createApi(opts: { } }) + // --------------------------------------------------------------------------- + // GET /telegram/status — Telegram notification service status + // --------------------------------------------------------------------------- + api.get("/telegram/status", (c) => { + if (!opts.telegramStatus) { + return c.json({ ok: true, telegram: { enabled: false } }) + } + return c.json({ ok: true, telegram: opts.telegramStatus() }) + }) + return api } diff --git a/src/types.ts b/src/types.ts index 69182a3..039504e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -223,3 +223,22 @@ export type ProjectOrderState = { /** Per-project visibility configuration */ export type VisibilityConfig = Record + +/** Telegram notification service configuration */ +export type TelegramServiceConfig = { + botToken: string + chatId: string + /** Polling interval in ms (default: 5000) */ + pollIntervalMs?: number + /** Debounce interval for edits in ms (default: 3000) */ + debounceMs?: number +} + +/** Telegram notification service runtime status */ +export type TelegramServiceStatus = { + enabled: boolean + pinnedMessageId: number | null + lastUpdateMs: number | null + lastError: string | null + alertsSent: number +} From b0f653ba150c8eb327135473800fdc35db3a0ca7 Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Fri, 10 Apr 2026 22:12:40 +0200 Subject: [PATCH 4/8] feat(server): add telegram notification service bootstrap --- src/server/dev.ts | 29 +++- src/server/start.ts | 29 +++- src/server/telegram.ts | 332 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 383 insertions(+), 7 deletions(-) create mode 100644 src/server/telegram.ts diff --git a/src/server/dev.ts b/src/server/dev.ts index 0a3ae5f..03a70be 100644 --- a/src/server/dev.ts +++ b/src/server/dev.ts @@ -3,6 +3,8 @@ import { Hono } from "hono"; import { readFileSync } from "node:fs"; import { dirname, resolve } from "node:path"; import { createApi } from "./api"; +import { createMultiProjectService } from "./multi-project"; +import { createTelegramService } from "./telegram"; import { selectStorageBackend, getLegacyStorageRootForBackend } from "../ingest/storage-backend"; const here = dirname(new URL(import.meta.url).pathname); @@ -13,18 +15,39 @@ const port = parseInt(process.env.OMO_PULSE_API_PORT || "51244", 10); const app = new Hono(); -// Initialize storage backend and create API router const storageBackend = selectStorageBackend(); const storageRoot = getLegacyStorageRootForBackend(storageBackend); -const apiRouter = createApi({ storageRoot, storageBackend, version: APP_VERSION }); +const multiProjectService = createMultiProjectService({ storageRoot, storageBackend }); + +const telegramBotToken = process.env.TELEGRAM_BOT_TOKEN +const telegramChatId = process.env.TELEGRAM_CHAT_ID +const telegramService = telegramBotToken && telegramChatId + ? createTelegramService( + { botToken: telegramBotToken, chatId: telegramChatId }, + () => multiProjectService.getMultiProjectPayload(), + ) + : null + +const apiRouter = createApi({ + storageRoot, + storageBackend, + multiProjectService, + telegramStatus: telegramService ? () => telegramService.getStatus() : undefined, + version: APP_VERSION, +}); -// Mount the API router app.route("/api", apiRouter); Bun.serve({ fetch: app.fetch, hostname: "127.0.0.1", port, + idleTimeout: 60, }); +if (telegramService) { + telegramService.start() + console.log("Telegram notifications enabled") +} + console.log(`Server running at http://127.0.0.1:${port}`); diff --git a/src/server/start.ts b/src/server/start.ts index 2adc68e..90440ae 100644 --- a/src/server/start.ts +++ b/src/server/start.ts @@ -3,6 +3,8 @@ import { Hono } from "hono"; import { readFileSync } from "node:fs"; import { dirname, join, resolve } from "node:path"; import { createApi } from "./api"; +import { createMultiProjectService } from "./multi-project"; +import { createTelegramService } from "./telegram"; import { selectStorageBackend, getLegacyStorageRootForBackend } from "../ingest/storage-backend"; const here = dirname(new URL(import.meta.url).pathname); @@ -14,13 +16,27 @@ const app = new Hono(); const port = parseInt(process.env.OMO_PULSE_PORT || "4300", 10); const distRoot = join(import.meta.dir, "../../dist"); - -// Initialize storage backend and create API router const storageBackend = selectStorageBackend(); const storageRoot = getLegacyStorageRootForBackend(storageBackend); -const apiRouter = createApi({ storageRoot, storageBackend, version: APP_VERSION }); +const multiProjectService = createMultiProjectService({ storageRoot, storageBackend }); + +const telegramBotToken = process.env.TELEGRAM_BOT_TOKEN +const telegramChatId = process.env.TELEGRAM_CHAT_ID +const telegramService = telegramBotToken && telegramChatId + ? createTelegramService( + { botToken: telegramBotToken, chatId: telegramChatId }, + () => multiProjectService.getMultiProjectPayload(), + ) + : null + +const apiRouter = createApi({ + storageRoot, + storageBackend, + multiProjectService, + telegramStatus: telegramService ? () => telegramService.getStatus() : undefined, + version: APP_VERSION, +}); -// Mount the API router BEFORE the SPA fallback middleware app.route("/api", apiRouter); // SPA fallback middleware app.use("*", async (c, next) => { @@ -79,3 +95,8 @@ Bun.serve({ hostname: "127.0.0.1", port, }); + +if (telegramService) { + telegramService.start() + console.log("Telegram notifications enabled") +} diff --git a/src/server/telegram.ts b/src/server/telegram.ts new file mode 100644 index 0000000..e63726f --- /dev/null +++ b/src/server/telegram.ts @@ -0,0 +1,332 @@ +import type { + DashboardMultiProjectPayload, + ProjectSnapshot, + SessionStatus, + TelegramServiceConfig, + TelegramServiceStatus, +} from "../types" + +const STATUS_EMOJI: Record = { + busy: "🟢", + running_tool: "🟢", + thinking: "🟡", + idle: "⚪", + question: "❓", + plan_complete: "✅", + error: "🔴", + unknown: "⚫", +} + +const ALERT_STATUSES: ReadonlySet = new Set(["question", "error", "plan_complete"]) + +type TelegramApiResponse = { + ok: boolean + result?: { message_id: number } + description?: string + parameters?: { retry_after?: number } +} + +type TelegramState = { + pinnedMessageId: number | null + prevSessionStatuses: Map + alertedSessions: Set + lastEditMs: number + lastMessageText: string + alertsSent: number + lastError: string | null + isFirstPoll: boolean +} + +function escapeHtml(text: string): string { + return text + .replace(/&/g, "&") + .replace(//g, ">") +} + +function formatTime(ms: number): string { + const d = new Date(ms) + return `${d.getHours().toString().padStart(2, "0")}:${d.getMinutes().toString().padStart(2, "0")}` +} + +function formatRelativeTime(diffMs: number): string { + if (diffMs < 60_000) return `${Math.round(diffMs / 1000)}s ago` + if (diffMs < 3_600_000) return `${Math.round(diffMs / 60_000)}m ago` + return `${Math.round(diffMs / 3_600_000)}h ago` +} + +export function formatStatusMessage(payload: DashboardMultiProjectPayload): string { + const { projects, serverNowMs } = payload + const totalSessions = projects.reduce((sum, p) => sum + Math.max(p.sessions.length, 1), 0) + + const lines: string[] = [] + lines.push(`omo-pulse · ${projects.length} project${projects.length !== 1 ? "s" : ""} · ${totalSessions} session${totalSessions !== 1 ? "s" : ""}`) + lines.push("") + + for (const project of projects) { + if (project.sessions.length === 0) { + const emoji = STATUS_EMOJI[project.aggregateStatus] + const label = escapeHtml(project.label) + const tool = + project.mainSession.currentTool && project.mainSession.currentTool !== "-" + ? ` (${escapeHtml(project.mainSession.currentTool)})` + : "" + lines.push(`${emoji} ${label} ${project.aggregateStatus}${tool}`) + } else { + for (const session of project.sessions) { + const emoji = STATUS_EMOJI[session.status] + const label = escapeHtml(project.label) + const sessionLabel = escapeHtml(session.sessionLabel) + const tool = + session.currentTool && session.currentTool !== "-" + ? ` (${escapeHtml(session.currentTool)})` + : "" + const diffMs = serverNowMs - session.lastUpdatedMs + const relative = + session.status === "idle" && session.lastUpdatedMs > 0 && diffMs > 60_000 + ? ` ${formatRelativeTime(diffMs)}` + : "" + lines.push(`${emoji} ${label} ${sessionLabel}: ${session.status}${tool}${relative}`) + } + } + } + + if (projects.length === 0) { + lines.push("No projects registered") + } + + lines.push("") + lines.push(`🕐 ${formatTime(serverNowMs)} · updated just now`) + + return lines.join("\n") +} + +function formatAlertMessage(project: ProjectSnapshot, sessionId: string, status: SessionStatus): string { + const emoji = STATUS_EMOJI[status] + const label = escapeHtml(project.label) + const session = project.sessions.find((s) => s.sessionId === sessionId) + const sessionLabel = session ? escapeHtml(session.sessionLabel) : sessionId + + if (status === "question") { + return `${emoji} ${label} · ${sessionLabel}\nAgent is waiting for your input` + } + if (status === "error") { + return `${emoji} ${label} · ${sessionLabel}\nSession encountered an error` + } + if (status === "plan_complete") { + return `${emoji} ${label} · ${sessionLabel}\nPlan completed` + } + return `${emoji} ${label} · ${sessionLabel}: ${status}` +} + +async function telegramApi( + botToken: string, + method: string, + params: Record, +): Promise { + const url = `https://api.telegram.org/bot${botToken}/${method}` + const response = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(params), + }) + return (await response.json()) as TelegramApiResponse +} + +export type TelegramService = { + start: () => void + stop: () => void + getStatus: () => TelegramServiceStatus +} + +export function createTelegramService( + config: TelegramServiceConfig, + getPayload: () => Promise, +): TelegramService { + const pollIntervalMs = config.pollIntervalMs ?? 5000 + const debounceMs = config.debounceMs ?? 3000 + + const state: TelegramState = { + pinnedMessageId: null, + prevSessionStatuses: new Map(), + alertedSessions: new Set(), + lastEditMs: 0, + lastMessageText: "", + alertsSent: 0, + lastError: null, + isFirstPoll: true, + } + + let timer: ReturnType | null = null + + async function sendMessage(text: string): Promise { + try { + const result = await telegramApi(config.botToken, "sendMessage", { + chat_id: config.chatId, + text, + parse_mode: "HTML", + disable_notification: true, + }) + if (!result.ok) { + state.lastError = result.description ?? "sendMessage failed" + return null + } + state.lastError = null + return result.result?.message_id ?? null + } catch (err) { + state.lastError = err instanceof Error ? err.message : String(err) + return null + } + } + + async function editMessage(messageId: number, text: string): Promise { + try { + const result = await telegramApi(config.botToken, "editMessageText", { + chat_id: config.chatId, + message_id: messageId, + text, + parse_mode: "HTML", + }) + if (!result.ok) { + if (result.description?.includes("message is not modified")) { + return true + } + if (result.parameters?.retry_after) { + state.lastError = `Rate limited, retry after ${result.parameters.retry_after}s` + return false + } + state.lastError = result.description ?? "editMessageText failed" + return false + } + state.lastError = null + return true + } catch (err) { + state.lastError = err instanceof Error ? err.message : String(err) + return false + } + } + + async function pinMessage(messageId: number): Promise { + try { + const result = await telegramApi(config.botToken, "pinChatMessage", { + chat_id: config.chatId, + message_id: messageId, + disable_notification: true, + }) + if (!result.ok) { + state.lastError = result.description ?? "pinChatMessage failed" + } + } catch (err) { + state.lastError = err instanceof Error ? err.message : String(err) + } + } + + async function sendAlert(text: string): Promise { + try { + const result = await telegramApi(config.botToken, "sendMessage", { + chat_id: config.chatId, + text, + parse_mode: "HTML", + }) + if (result.ok) { + state.alertsSent++ + } + } catch { + // Alert failures are non-critical + } + } + + function detectAlerts(payload: DashboardMultiProjectPayload): void { + if (state.isFirstPoll) return + + for (const project of payload.projects) { + for (const session of project.sessions) { + const alertKey = `${session.sessionId}:${session.status}` + + if (ALERT_STATUSES.has(session.status)) { + if (!state.alertedSessions.has(alertKey)) { + state.alertedSessions.add(alertKey) + const alertText = formatAlertMessage(project, session.sessionId, session.status) + sendAlert(alertText) + } + } else { + for (const s of ALERT_STATUSES) { + state.alertedSessions.delete(`${session.sessionId}:${s}`) + } + } + } + } + } + + function updatePrevStatuses(payload: DashboardMultiProjectPayload): void { + state.prevSessionStatuses.clear() + for (const project of payload.projects) { + for (const session of project.sessions) { + state.prevSessionStatuses.set(session.sessionId, session.status) + } + } + } + + async function poll(): Promise { + try { + const payload = await getPayload() + const text = formatStatusMessage(payload) + + detectAlerts(payload) + updatePrevStatuses(payload) + + const nowMs = Date.now() + + if (state.pinnedMessageId === null) { + const messageId = await sendMessage(text) + if (messageId !== null) { + state.pinnedMessageId = messageId + state.lastMessageText = text + state.lastEditMs = nowMs + await pinMessage(messageId) + } + state.isFirstPoll = false + return + } + + if (nowMs - state.lastEditMs < debounceMs) return + + if (text === state.lastMessageText) return + + const success = await editMessage(state.pinnedMessageId, text) + if (success) { + state.lastMessageText = text + state.lastEditMs = nowMs + } + + state.isFirstPoll = false + } catch (err) { + state.lastError = err instanceof Error ? err.message : String(err) + } + } + + function start(): void { + if (timer !== null) return + poll() + timer = setInterval(poll, pollIntervalMs) + } + + function stop(): void { + if (timer !== null) { + clearInterval(timer) + timer = null + } + } + + function getStatus(): TelegramServiceStatus { + return { + enabled: true, + pinnedMessageId: state.pinnedMessageId, + lastUpdateMs: state.lastEditMs || null, + lastError: state.lastError, + alertsSent: state.alertsSent, + } + } + + return { start, stop, getStatus } +} From c9b952351fed68d76dad3062931eb0b2dae3267a Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Fri, 10 Apr 2026 22:12:56 +0200 Subject: [PATCH 5/8] fix(ui): refine question status indicator styling --- src/ui/components/ProjectStrip.css | 34 +++++++----------------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/src/ui/components/ProjectStrip.css b/src/ui/components/ProjectStrip.css index 599e45d..0aaf369 100644 --- a/src/ui/components/ProjectStrip.css +++ b/src/ui/components/ProjectStrip.css @@ -57,11 +57,6 @@ 100% { box-shadow: 0 0 5px 0px color-mix(in srgb, var(--status-question-border) 50%, transparent), inset 0 1px 0 color-mix(in srgb, var(--status-question-border) 50%, transparent); } } -@keyframes status-dash-rotate { - from { transform: rotate(0deg); } - to { transform: rotate(360deg); } -} - @keyframes status-pulse { 0% { box-shadow: 0 0 0 0 color-mix(in srgb, var(--status-question-border) 0%, transparent); } 50% { box-shadow: 0 0 0 4px color-mix(in srgb, var(--status-question-border) 28%, transparent); } @@ -280,27 +275,14 @@ .strip-status-dot[data-status="question"] { color: var(--status-question); - background: var(--status-question); - border-style: dashed; - border-width: 2px; - border-color: var(--status-question-border); - box-shadow: 0 0 6px var(--status-question); - display: flex; - align-items: center; - justify-content: center; -} - -.strip-status-dot[data-status="question"]::after { - content: '?'; - position: absolute; - font-family: var(--font-mono); - font-size: 10px; - font-weight: 800; - color: #ffffff; -} - -.project-strip[data-expanded="true"] .strip-status-dot[data-status="question"] { - animation: status-dash-rotate 2s ease-in-out infinite; + background: + radial-gradient(ellipse 3px 2px at 35% 25%, rgba(255, 255, 255, 0.6) 0%, transparent 100%), + radial-gradient(circle at 35% 35%, rgba(0, 0, 0, 0.4) 0%, var(--status-question) 80%); + border-color: color-mix(in srgb, var(--status-question) 80%, #000 20%); + box-shadow: + inset 0 1px 1px rgba(255, 255, 255, 0.4), + inset 0 -2px 4px rgba(0, 0, 0, 0.6), + 0 0 10px 3px color-mix(in srgb, var(--status-question) 60%, transparent); } /* error — red, pulse + intense glow, PROMINENT */ From 0e2a6cc60a37b1d48db1fd25d640893327a301d6 Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Fri, 10 Apr 2026 22:13:09 +0200 Subject: [PATCH 6/8] docs(env): document telegram notification variables --- .env.example | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.env.example b/.env.example index 570f6c6..bf9d3ff 100644 --- a/.env.example +++ b/.env.example @@ -13,3 +13,10 @@ OMO_PULSE_API_PORT=4301 # Set to "true" in CI/CD environments to prevent Playwright from reusing existing servers # In local development, leave unset (defaults to false) CI= + +# Telegram Notifications (optional) +# Get token from @BotFather, chat ID from @userinfobot or the getUpdates API +# When both are set, omo-pulse pushes a pinned status message to the chat +# and sends alert notifications for question/error/plan_complete transitions. +TELEGRAM_BOT_TOKEN= +TELEGRAM_CHAT_ID= From e7976a93ae6c9efd4b89ea7eaba55aa76089ac39 Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Fri, 10 Apr 2026 22:12:56 +0200 Subject: [PATCH 7/8] docs(readme): add Ko-fi badge --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 7daf633..623501b 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ [![Runtime: Bun](https://img.shields.io/badge/Runtime-Bun-%23f9f1e1?logo=bun)](https://bun.sh) [![TypeScript](https://img.shields.io/badge/TypeScript-5.5-blue?logo=typescript)](https://www.typescriptlang.org/) [![Sponsor](https://img.shields.io/badge/sponsor-%E2%9D%A4-lightgrey)](https://github.com/sponsors/EZotoff) +[![Ko-fi](https://img.shields.io/badge/Ko--fi-Support-ff5e5b?logo=ko-fi&logoColor=white)](https://ko-fi.com/ezotoff) ![Dashboard — multi-project view with session activity and token usage](docs/screenshots/details-collapsed.png) From 899871bf806ea7bcc7f9418ed64fbb303215cb8a Mon Sep 17 00:00:00 2001 From: Evgeny Zotov Date: Tue, 14 Apr 2026 22:27:57 +0200 Subject: [PATCH 8/8] fix(ingest): use last-terminal-event logic for error status instead of any-error-in-history Error status was triggered by any tool part with status 'error' in recent history, causing recoverable refusals (e.g. 'read file before editing') to persist as Error state. Now only the most recent terminal tool event (error or completed) determines error status. Also fixes session-inclusion.ts SQL queries that referenced non-existent columns (state_status, tool) instead of using json_extract on the data JSON column. --- src/__tests__/session-inclusion.test.ts | 27 +++++++++-------- src/ingest/session-inclusion.ts | 39 ++++++++++++------------- src/ingest/session.ts | 24 +++++++++------ src/ingest/sqlite-derive.ts | 16 +++++----- 4 files changed, 55 insertions(+), 51 deletions(-) diff --git a/src/__tests__/session-inclusion.test.ts b/src/__tests__/session-inclusion.test.ts index b002378..19f886e 100644 --- a/src/__tests__/session-inclusion.test.ts +++ b/src/__tests__/session-inclusion.test.ts @@ -19,19 +19,17 @@ type SessionRow = { type ActivePartRow = { tool: string - status: string } -type ErrorCountRow = { - cnt: number +type TerminalPartRow = { + status: string } type AssistantMessageRow = { - role: string - time_completed?: number + time_completed: number | null } -type QueryRows = SessionRow[] | ActivePartRow[] | ErrorCountRow[] | AssistantMessageRow[] +type QueryRows = SessionRow[] | ActivePartRow[] | TerminalPartRow[] | AssistantMessageRow[] type MockStatement = { all: (...params: unknown[]) => QueryRows @@ -44,7 +42,7 @@ type MockDatabase = { type MockDbConfig = { sessionRows?: SessionRow[] activePartsBySession?: Record - errorCountsBySession?: Record + terminalStatusBySession?: Record assistantMessagesBySession?: Record throwOnQuery?: boolean } @@ -64,12 +62,13 @@ function createMockDb(config: MockDbConfig = {}): MockDatabase { return config.sessionRows ?? [] } - if (sql.includes("state_status = 'pending' OR state_status = 'running'")) { + if (sql.includes("'pending', 'running'")) { return sessionId ? (config.activePartsBySession?.[sessionId] ?? []) : [] } - if (sql.includes("state_status = 'error'")) { - return [{ cnt: sessionId ? (config.errorCountsBySession?.[sessionId] ?? 0) : 0 }] + if (sql.includes("'error', 'completed'")) { + const status = sessionId ? (config.terminalStatusBySession?.[sessionId] ?? null) : null + return status ? [{ status }] : [] } if (sql.includes("FROM message")) { @@ -293,8 +292,8 @@ describe("findIncludedSessionsSqlite", () => { time_updated: now - 120000, }, ], - errorCountsBySession: { - "stale-error": 1, + terminalStatusBySession: { + "stale-error": "error", }, }), "/home/user/project", @@ -511,8 +510,8 @@ describe("findIncludedSessionsSqlite", () => { activePartsBySession: { "question-session": [{ tool: "mcp_question", status: "pending" }], }, - errorCountsBySession: { - "error-session": 1, + terminalStatusBySession: { + "error-session": "error", }, }), "/home/user/project", diff --git a/src/ingest/session-inclusion.ts b/src/ingest/session-inclusion.ts index 09aff92..3a82c16 100644 --- a/src/ingest/session-inclusion.ts +++ b/src/ingest/session-inclusion.ts @@ -34,14 +34,14 @@ function normalizePath(dir: string): string { */ function deriveSessionStatus(db: Database, session: SessionMetadata, nowMs: number): string { try { - // Check for active tool (pending or running) const activeParts = db .query( - `SELECT tool, status FROM part - WHERE session_id = ? AND (state_status = 'pending' OR state_status = 'running') - ORDER BY created DESC LIMIT 1` + `SELECT json_extract(data, '$.tool') as tool + FROM part + WHERE session_id = ? AND json_extract(data, '$.state.status') IN ('pending', 'running') + ORDER BY time_created DESC LIMIT 1` ) - .all(session.id) as Array<{ tool: string; status: string }> + .all(session.id) as Array<{ tool: string }> if (activeParts.length > 0) { if (QUESTION_TOOL_NAMES.has(activeParts[0].tool)) { @@ -50,42 +50,39 @@ function deriveSessionStatus(db: Database, session: SessionMetadata, nowMs: numb return "running_tool" } - // Check for error tool - const errorParts = db + const lastTerminal = db .query( - `SELECT COUNT(*) as cnt FROM part - WHERE session_id = ? AND state_status = 'error' - LIMIT 1` + `SELECT json_extract(data, '$.state.status') as status + FROM part + WHERE session_id = ? AND json_extract(data, '$.state.status') IN ('error', 'completed') + ORDER BY time_created DESC LIMIT 1` ) - .all(session.id) as Array<{ cnt: number }> + .all(session.id) as Array<{ status: string }> - if (errorParts.length > 0 && errorParts[0].cnt > 0) { + if (lastTerminal.length > 0 && lastTerminal[0].status === "error") { return "error" } - // Check for recent assistant message (thinking) const recentMessages = db .query( - `SELECT role, time_completed FROM message - WHERE session_id = ? AND role = 'assistant' - ORDER BY created DESC LIMIT 1` + `SELECT json_extract(data, '$.time.completed') as time_completed + FROM message + WHERE session_id = ? AND json_extract(data, '$.role') = 'assistant' + ORDER BY time_created DESC LIMIT 1` ) - .all(session.id) as Array<{ role: string; time_completed?: number }> + .all(session.id) as Array<{ time_completed: number | null }> if ( recentMessages.length > 0 && - recentMessages[0].role === "assistant" && - recentMessages[0].time_completed === undefined + recentMessages[0].time_completed === null ) { return "thinking" } - // Default: distinguish busy vs idle based on canonical ACTIVE_BUSY_WINDOW_MS threshold const lastUpdated = session.time.updated ?? session.time.created ?? 0 const ageMs = nowMs - lastUpdated return ageMs <= ACTIVE_BUSY_WINDOW_MS ? "busy" : "idle" } catch { - // On any error, return unknown return "unknown" } } diff --git a/src/ingest/session.ts b/src/ingest/session.ts index 2194e7b..db22359 100644 --- a/src/ingest/session.ts +++ b/src/ingest/session.ts @@ -283,23 +283,28 @@ function readLastToolPart(partStorage: string, messageID: string): { tool: strin return null } -function hasErrorToolPart(partStorage: string, messageID: string): boolean { +function messageTerminalToolStatus(partStorage: string, messageID: string): "error" | "completed" | null { const partDir = path.join(partStorage, messageID) - if (!fs.existsSync(partDir)) return false + if (!fs.existsSync(partDir)) return null + let hasError = false + let hasCompleted = false const files = fs.readdirSync(partDir).filter((f) => f.endsWith(".json")) for (const file of files) { try { const content = fs.readFileSync(path.join(partDir, file), "utf8") const part = JSON.parse(content) as Partial - if (part.type === "tool" && part.state?.status === "error") { - return true + if (part.type === "tool") { + if (part.state?.status === "error") hasError = true + else if (part.state?.status === "completed") hasCompleted = true } } catch { continue } } - return false + if (hasError) return "error" + if (hasCompleted) return "completed" + return null } export function getMainSessionView(opts: { @@ -332,11 +337,12 @@ export function getMainSessionView(opts: { } } - let hasErrorTool = false + let lastToolErrored = false if (!activeTool) { for (const meta of recentMetas) { - if (hasErrorToolPart(opts.storage.part, meta.id)) { - hasErrorTool = true + const terminal = messageTerminalToolStatus(opts.storage.part, meta.id) + if (terminal !== null) { + lastToolErrored = terminal === "error" break } } @@ -354,7 +360,7 @@ export function getMainSessionView(opts: { } } - if (status === "unknown" && !isStaleActivity && hasErrorTool) { + if (status === "unknown" && !isStaleActivity && lastToolErrored) { status = "error" } else if (status === "unknown" && !isStaleActivity && recent?.role === "assistant" && typeof recent?.time?.created === "number" && typeof recent?.time?.completed !== "number") { status = "thinking" diff --git a/src/ingest/sqlite-derive.ts b/src/ingest/sqlite-derive.ts index 0ed0e09..58fe024 100644 --- a/src/ingest/sqlite-derive.ts +++ b/src/ingest/sqlite-derive.ts @@ -414,14 +414,16 @@ export function getMainSessionViewSqlite(opts: { if (activeTool) break } - let hasErrorTool = false + let lastToolErrored = false if (!activeTool) { - for (const meta of session.value.metas) { + findLastTerminal: for (const meta of session.value.metas) { const parts = session.value.partsByMessage.get(meta.id) ?? [] - const errorPart = parts.find((part) => part.state.status === "error") - if (errorPart) { - hasErrorTool = true - break + for (let i = parts.length - 1; i >= 0; i--) { + const s = parts[i].state.status + if (s === "error" || s === "completed") { + lastToolErrored = s === "error" + break findLastTerminal + } } } } @@ -438,7 +440,7 @@ export function getMainSessionViewSqlite(opts: { } } - if (status === "unknown" && !isStaleActivity && hasErrorTool) { + if (status === "unknown" && !isStaleActivity && lastToolErrored) { status = "error" } else if (status === "unknown" && !isStaleActivity && recent?.role === "assistant" && typeof recent.time?.created === "number" && typeof recent.time?.completed !== "number") { status = "thinking"