Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,10 @@ OMO_PULSE_API_PORT=4301
# Set to "true" in CI/CD environments to prevent Playwright from reusing existing servers
# In local development, leave unset (defaults to false)
CI=

# Telegram Notifications (optional)
# Get token from @BotFather, chat ID from @userinfobot or the getUpdates API
# When both are set, omo-pulse pushes a pinned status message to the chat
# and sends alert notifications for question/error/plan_complete transitions.
TELEGRAM_BOT_TOKEN=
TELEGRAM_CHAT_ID=
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[![Runtime: Bun](https://img.shields.io/badge/Runtime-Bun-%23f9f1e1?logo=bun)](https://bun.sh)
[![TypeScript](https://img.shields.io/badge/TypeScript-5.5-blue?logo=typescript)](https://www.typescriptlang.org/)
[![Sponsor](https://img.shields.io/badge/sponsor-%E2%9D%A4-lightgrey)](https://github.com/sponsors/EZotoff)
[![Ko-fi](https://img.shields.io/badge/Ko--fi-Support-ff5e5b?logo=ko-fi&logoColor=white)](https://ko-fi.com/ezotoff)

![Dashboard — multi-project view with session activity and token usage](docs/screenshots/details-collapsed.png)

Expand Down
17 changes: 5 additions & 12 deletions src/__tests__/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@ vi.mock("../ingest/sources-registry", () => ({
getSourceById: vi.fn(() => null),
}))

vi.mock("../server/multi-project", () => ({
createMultiProjectService: vi.fn(() => ({
getMultiProjectPayload: vi.fn(async (): Promise<DashboardMultiProjectPayload> => ({
projects: [],
serverNowMs: Date.now(),
pollIntervalMs: 2000,
})),
})),
}))

vi.mock("../ingest/session", () => ({
getStorageRoots: vi.fn(() => ({
session: "/tmp/session",
Expand Down Expand Up @@ -63,7 +53,6 @@ vi.mock("../ingest/sqlite-derive", () => ({
// Import AFTER mocking
// ---------------------------------------------------------------------------
import { createApi } from "../server/api"
import { createMultiProjectService } from "../server/multi-project"
import type { ProjectSnapshot } from "../types"

// ---------------------------------------------------------------------------
Expand All @@ -83,6 +72,8 @@ function makeProjectSnapshot(overrides: Partial<ProjectSnapshot> = {}): ProjectS
sessionId: "ses_abc",
status: "idle",
},
sessions: [],
aggregateStatus: "idle",
planProgress: {
name: "plan-1",
completed: 3,
Expand All @@ -93,6 +84,7 @@ function makeProjectSnapshot(overrides: Partial<ProjectSnapshot> = {}): ProjectS
planStale: false,
planComplete: false,
},
unintiatedPlans: [],
timeSeries: {
windowMs: 300000,
bucketMs: 2000,
Expand Down Expand Up @@ -127,12 +119,13 @@ describe("API routes", () => {
serverNowMs: Date.now(),
pollIntervalMs: 2000,
})),
invalidate: vi.fn(),
}
vi.mocked(createMultiProjectService).mockReturnValue(mockService)

app = createApi({
storageRoot: "/tmp/test-storage",
storageBackend: { kind: "sqlite", dataDir: "/tmp", sqlitePath: "/tmp/test.db" },
multiProjectService: mockService,
version: "1.0.0-test",
})
})
Expand Down
27 changes: 13 additions & 14 deletions src/__tests__/session-inclusion.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@ type SessionRow = {

type ActivePartRow = {
tool: string
status: string
}

type ErrorCountRow = {
cnt: number
type TerminalPartRow = {
status: string
}

type AssistantMessageRow = {
role: string
time_completed?: number
time_completed: number | null
}

type QueryRows = SessionRow[] | ActivePartRow[] | ErrorCountRow[] | AssistantMessageRow[]
type QueryRows = SessionRow[] | ActivePartRow[] | TerminalPartRow[] | AssistantMessageRow[]

type MockStatement = {
all: (...params: unknown[]) => QueryRows
Expand All @@ -44,7 +42,7 @@ type MockDatabase = {
type MockDbConfig = {
sessionRows?: SessionRow[]
activePartsBySession?: Record<string, ActivePartRow[]>
errorCountsBySession?: Record<string, number>
terminalStatusBySession?: Record<string, string>
assistantMessagesBySession?: Record<string, AssistantMessageRow[]>
throwOnQuery?: boolean
}
Expand All @@ -64,12 +62,13 @@ function createMockDb(config: MockDbConfig = {}): MockDatabase {
return config.sessionRows ?? []
}

if (sql.includes("state_status = 'pending' OR state_status = 'running'")) {
if (sql.includes("'pending', 'running'")) {
return sessionId ? (config.activePartsBySession?.[sessionId] ?? []) : []
}

if (sql.includes("state_status = 'error'")) {
return [{ cnt: sessionId ? (config.errorCountsBySession?.[sessionId] ?? 0) : 0 }]
if (sql.includes("'error', 'completed'")) {
const status = sessionId ? (config.terminalStatusBySession?.[sessionId] ?? null) : null
return status ? [{ status }] : []
}

if (sql.includes("FROM message")) {
Expand Down Expand Up @@ -293,8 +292,8 @@ describe("findIncludedSessionsSqlite", () => {
time_updated: now - 120000,
},
],
errorCountsBySession: {
"stale-error": 1,
terminalStatusBySession: {
"stale-error": "error",
},
}),
"/home/user/project",
Expand Down Expand Up @@ -511,8 +510,8 @@ describe("findIncludedSessionsSqlite", () => {
activePartsBySession: {
"question-session": [{ tool: "mcp_question", status: "pending" }],
},
errorCountsBySession: {
"error-session": 1,
terminalStatusBySession: {
"error-session": "error",
},
}),
"/home/user/project",
Expand Down
48 changes: 24 additions & 24 deletions src/ingest/session-inclusion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ function normalizePath(dir: string): string {
*/
function deriveSessionStatus(db: Database, session: SessionMetadata, nowMs: number): string {
try {
// Check for active tool (pending or running)
const activeParts = db
.query(
`SELECT tool, status FROM part
WHERE session_id = ? AND (state_status = 'pending' OR state_status = 'running')
ORDER BY created DESC LIMIT 1`
`SELECT json_extract(data, '$.tool') as tool
FROM part
WHERE session_id = ? AND json_extract(data, '$.state.status') IN ('pending', 'running')
ORDER BY time_created DESC LIMIT 1`
)
.all(session.id) as Array<{ tool: string; status: string }>
.all(session.id) as Array<{ tool: string }>

if (activeParts.length > 0) {
if (QUESTION_TOOL_NAMES.has(activeParts[0].tool)) {
Expand All @@ -50,42 +50,39 @@ function deriveSessionStatus(db: Database, session: SessionMetadata, nowMs: numb
return "running_tool"
}

// Check for error tool
const errorParts = db
const lastTerminal = db
.query(
`SELECT COUNT(*) as cnt FROM part
WHERE session_id = ? AND state_status = 'error'
LIMIT 1`
`SELECT json_extract(data, '$.state.status') as status
FROM part
WHERE session_id = ? AND json_extract(data, '$.state.status') IN ('error', 'completed')
ORDER BY time_created DESC LIMIT 1`
)
.all(session.id) as Array<{ cnt: number }>
.all(session.id) as Array<{ status: string }>

if (errorParts.length > 0 && errorParts[0].cnt > 0) {
if (lastTerminal.length > 0 && lastTerminal[0].status === "error") {
return "error"
}

// Check for recent assistant message (thinking)
const recentMessages = db
.query(
`SELECT role, time_completed FROM message
WHERE session_id = ? AND role = 'assistant'
ORDER BY created DESC LIMIT 1`
`SELECT json_extract(data, '$.time.completed') as time_completed
FROM message
WHERE session_id = ? AND json_extract(data, '$.role') = 'assistant'
ORDER BY time_created DESC LIMIT 1`
)
.all(session.id) as Array<{ role: string; time_completed?: number }>
.all(session.id) as Array<{ time_completed: number | null }>

if (
recentMessages.length > 0 &&
recentMessages[0].role === "assistant" &&
recentMessages[0].time_completed === undefined
recentMessages[0].time_completed === null
) {
return "thinking"
}

// Default: distinguish busy vs idle based on canonical ACTIVE_BUSY_WINDOW_MS threshold
const lastUpdated = session.time.updated ?? session.time.created ?? 0
const ageMs = nowMs - lastUpdated
return ageMs <= ACTIVE_BUSY_WINDOW_MS ? "busy" : "idle"
} catch {
// On any error, return unknown
return "unknown"
}
}
Expand Down Expand Up @@ -124,6 +121,7 @@ export function findIncludedSessionsSqlite(
}>

const sessions: SessionMetadata[] = []
const statusCache = new Map<string, string>()
for (const row of sessionRows) {
if (typeof row.id !== "string" || typeof row.directory !== "string") continue

Expand All @@ -144,20 +142,22 @@ export function findIncludedSessionsSqlite(
time: { created: timeCreated, updated: timeUpdated },
}

const status = deriveSessionStatus(db, meta, nowMs)
if (
isSessionIncluded(meta, idleWindowMs, nowMs) ||
isAttentionStatus(deriveSessionStatus(db, meta, nowMs))
isAttentionStatus(status)
) {
sessions.push(meta)
statusCache.set(meta.id, status)
}
}

// Severity-first ordering: error (0) > question (1) > running_tool (2) > thinking (3) > busy (4) > idle (5) > unknown (6)
// Then recency: most recent activity first (time.updated DESC)
// Finally stable tie-breaker: id ascending
sessions.sort((a, b) => {
const aStatus = deriveSessionStatus(db, a, nowMs)
const bStatus = deriveSessionStatus(db, b, nowMs)
const aStatus = statusCache.get(a.id) ?? "unknown"
const bStatus = statusCache.get(b.id) ?? "unknown"

const aSeverity = STATUS_SEVERITY[aStatus] ?? 6
const bSeverity = STATUS_SEVERITY[bStatus] ?? 6
Expand Down
24 changes: 15 additions & 9 deletions src/ingest/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,23 +283,28 @@ function readLastToolPart(partStorage: string, messageID: string): { tool: strin
return null
}

function hasErrorToolPart(partStorage: string, messageID: string): boolean {
function messageTerminalToolStatus(partStorage: string, messageID: string): "error" | "completed" | null {
const partDir = path.join(partStorage, messageID)
if (!fs.existsSync(partDir)) return false
if (!fs.existsSync(partDir)) return null

let hasError = false
let hasCompleted = false
const files = fs.readdirSync(partDir).filter((f) => f.endsWith(".json"))
for (const file of files) {
try {
const content = fs.readFileSync(path.join(partDir, file), "utf8")
const part = JSON.parse(content) as Partial<StoredToolPart>
if (part.type === "tool" && part.state?.status === "error") {
return true
if (part.type === "tool") {
if (part.state?.status === "error") hasError = true
else if (part.state?.status === "completed") hasCompleted = true
}
} catch {
continue
}
}
return false
if (hasError) return "error"
if (hasCompleted) return "completed"
return null
}

export function getMainSessionView(opts: {
Expand Down Expand Up @@ -332,11 +337,12 @@ export function getMainSessionView(opts: {
}
}

let hasErrorTool = false
let lastToolErrored = false
if (!activeTool) {
for (const meta of recentMetas) {
if (hasErrorToolPart(opts.storage.part, meta.id)) {
hasErrorTool = true
const terminal = messageTerminalToolStatus(opts.storage.part, meta.id)
if (terminal !== null) {
lastToolErrored = terminal === "error"
break
}
}
Expand All @@ -354,7 +360,7 @@ export function getMainSessionView(opts: {
}
}

if (status === "unknown" && !isStaleActivity && hasErrorTool) {
if (status === "unknown" && !isStaleActivity && lastToolErrored) {
status = "error"
} else if (status === "unknown" && !isStaleActivity && recent?.role === "assistant" && typeof recent?.time?.created === "number" && typeof recent?.time?.completed !== "number") {
status = "thinking"
Expand Down
Loading
Loading