From cd4bc80dc5fabc1d083446b7f7739314ef8b7741 Mon Sep 17 00:00:00 2001 From: Mario Witte Date: Wed, 8 Apr 2026 20:03:53 -0500 Subject: [PATCH] feat: add Warp agent support --- internal/parser/taxonomy.go | 18 ++ internal/parser/types.go | 10 + internal/parser/warp.go | 571 ++++++++++++++++++++++++++++++++++ internal/parser/warp_paths.go | 15 + internal/parser/warp_test.go | 426 +++++++++++++++++++++++++ internal/sync/engine.go | 171 +++++++++- 6 files changed, 1210 insertions(+), 1 deletion(-) create mode 100644 internal/parser/warp.go create mode 100644 internal/parser/warp_paths.go create mode 100644 internal/parser/warp_test.go diff --git a/internal/parser/taxonomy.go b/internal/parser/taxonomy.go index b9aa8246..4af4bb19 100644 --- a/internal/parser/taxonomy.go +++ b/internal/parser/taxonomy.go @@ -153,6 +153,24 @@ func NormalizeToolCategory(rawName string) string { case "code_interpreter": return "Bash" + // Warp tools + case "read_files": + return "Read" + case "apply_file_diff": + return "Edit" + case "search_codebase": + return "Grep" + case "call_mcp_tool", "read_mcp_resource": + return "Tool" + case "suggest_plan", "suggest_create_plan": + return "Tool" + case "write_to_long_running_shell_command": + return "Bash" + case "read_shell_command_output": + return "Read" + case "use_computer": + return "Tool" + default: // MCP tools may carry a server prefix (e.g. // "Zencoder_subagent__ZencoderSubagent") or use diff --git a/internal/parser/types.go b/internal/parser/types.go index 19c9a505..270ab9e5 100644 --- a/internal/parser/types.go +++ b/internal/parser/types.go @@ -29,6 +29,7 @@ const ( AgentKiroIDE AgentType = "kiro-ide" AgentCortex AgentType = "cortex" AgentHermes AgentType = "hermes" + AgentWarp AgentType = "warp" ) // AgentDef describes a supported coding agent's filesystem @@ -271,6 +272,15 @@ var Registry = []AgentDef{ DiscoverFunc: DiscoverHermesSessions, FindSourceFunc: FindHermesSourceFile, }, + { + Type: AgentWarp, + DisplayName: "Warp", + EnvVar: "WARP_DIR", + ConfigKey: "warp_dirs", + DefaultDirs: warpDefaultDirs(), + IDPrefix: "warp:", + FileBased: false, + }, } // NonFileBackedAgents returns agent types where FileBased is false. diff --git a/internal/parser/warp.go b/internal/parser/warp.go new file mode 100644 index 00000000..2766945e --- /dev/null +++ b/internal/parser/warp.go @@ -0,0 +1,571 @@ +package parser + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "time" +) + +// WarpSession bundles a parsed session with its messages. +type WarpSession struct { + Session ParsedSession + Messages []ParsedMessage +} + +// WarpSessionMeta is lightweight metadata for a session, +// used to detect changes without parsing messages. +type WarpSessionMeta struct { + SessionID string + VirtualPath string + FileMtime int64 // last_modified_at as UnixNano +} + +// ListWarpSessionMeta returns lightweight metadata for all +// conversations without parsing exchanges. Used by the sync +// engine to detect which sessions have changed. +func ListWarpSessionMeta( + dbPath string, +) ([]WarpSessionMeta, error) { + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + return nil, nil + } + + db, err := openWarpDB(dbPath) + if err != nil { + return nil, err + } + defer db.Close() + + rows, err := db.Query( + `SELECT conversation_id, last_modified_at + FROM agent_conversations`, + ) + if err != nil { + return nil, fmt.Errorf( + "listing warp conversations: %w", err, + ) + } + defer rows.Close() + + var metas []WarpSessionMeta + for rows.Next() { + var id string + var lastModified string + if err := rows.Scan( + &id, &lastModified, + ); err != nil { + return nil, fmt.Errorf( + "scanning warp session meta: %w", err, + ) + } + mtime := parseWarpTimestamp(lastModified).UnixNano() + metas = append(metas, WarpSessionMeta{ + SessionID: id, + VirtualPath: dbPath + "#" + id, + FileMtime: mtime, + }) + } + return metas, rows.Err() +} + +// ParseWarpDB opens the Warp SQLite database read-only and +// returns all conversations with messages. +func ParseWarpDB( + dbPath, machine string, +) ([]WarpSession, error) { + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + return nil, nil + } + + db, err := openWarpDB(dbPath) + if err != nil { + return nil, err + } + defer db.Close() + + convos, err := loadWarpConversations(db) + if err != nil { + return nil, fmt.Errorf( + "loading warp conversations: %w", err, + ) + } + + var results []WarpSession + for _, c := range convos { + parsed, msgs, err := buildWarpSession( + db, c, dbPath, machine, + ) + if err != nil { + log.Printf( + "warp conversation %s: %v", c.id, err, + ) + continue + } + if parsed == nil { + continue + } + results = append(results, WarpSession{ + Session: *parsed, + Messages: msgs, + }) + } + return results, nil +} + +// ParseWarpSession parses a single conversation by ID from +// the Warp database. +func ParseWarpSession( + dbPath, conversationID, machine string, +) (*ParsedSession, []ParsedMessage, error) { + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + return nil, nil, fmt.Errorf( + "warp db not found: %s", dbPath, + ) + } + + db, err := openWarpDB(dbPath) + if err != nil { + return nil, nil, err + } + defer db.Close() + + c, err := loadOneWarpConversation(db, conversationID) + if err != nil { + return nil, nil, fmt.Errorf( + "loading warp conversation %s: %w", + conversationID, err, + ) + } + + return buildWarpSession(db, c, dbPath, machine) +} + +func openWarpDB(dbPath string) (*sql.DB, error) { + dsn := dbPath + + "?mode=ro&_journal_mode=WAL&_busy_timeout=3000" + db, err := sql.Open("sqlite3", dsn) + if err != nil { + return nil, fmt.Errorf( + "opening warp db %s: %w", dbPath, err, + ) + } + return db, nil +} + +// warpConversationRow is a row from agent_conversations. +type warpConversationRow struct { + id string + conversationData string + lastModifiedAt string +} + +func loadWarpConversations( + db *sql.DB, +) ([]warpConversationRow, error) { + rows, err := db.Query(` + SELECT conversation_id, + COALESCE(conversation_data, '{}'), + last_modified_at + FROM agent_conversations + ORDER BY last_modified_at + `) + if err != nil { + return nil, err + } + defer rows.Close() + + var convos []warpConversationRow + for rows.Next() { + var c warpConversationRow + if err := rows.Scan( + &c.id, &c.conversationData, &c.lastModifiedAt, + ); err != nil { + return nil, err + } + convos = append(convos, c) + } + return convos, rows.Err() +} + +func loadOneWarpConversation( + db *sql.DB, conversationID string, +) (warpConversationRow, error) { + row := db.QueryRow(` + SELECT conversation_id, + COALESCE(conversation_data, '{}'), + last_modified_at + FROM agent_conversations + WHERE conversation_id = ? + `, conversationID) + + var c warpConversationRow + err := row.Scan( + &c.id, &c.conversationData, &c.lastModifiedAt, + ) + return c, err +} + +// warpExchangeRow is a row from ai_queries. +type warpExchangeRow struct { + exchangeID string + startTS string + input string + modelID string + workingDir string + outputStatus string +} + +func loadWarpExchanges( + db *sql.DB, conversationID string, +) ([]warpExchangeRow, error) { + rows, err := db.Query(` + SELECT exchange_id, start_ts, + COALESCE(input, '[]'), + COALESCE(model_id, ''), + COALESCE(working_directory, ''), + COALESCE(output_status, '') + FROM ai_queries + WHERE conversation_id = ? + ORDER BY start_ts + `, conversationID) + if err != nil { + return nil, err + } + defer rows.Close() + + var exchanges []warpExchangeRow + for rows.Next() { + var e warpExchangeRow + if err := rows.Scan( + &e.exchangeID, &e.startTS, &e.input, + &e.modelID, &e.workingDir, &e.outputStatus, + ); err != nil { + return nil, err + } + exchanges = append(exchanges, e) + } + return exchanges, rows.Err() +} + +func buildWarpSession( + db *sql.DB, + c warpConversationRow, + dbPath, machine string, +) (*ParsedSession, []ParsedMessage, error) { + exchanges, err := loadWarpExchanges(db, c.id) + if err != nil { + return nil, nil, fmt.Errorf( + "loading exchanges for %s: %w", c.id, err, + ) + } + + if len(exchanges) == 0 { + return nil, nil, nil + } + + var ( + parsed []ParsedMessage + firstMsg string + startedAt time.Time + endedAt time.Time + project string + cwd string + ordinal int + userCount int + lastModel string + ) + + for _, e := range exchanges { + ts := parseWarpTimestamp(e.startTS) + if startedAt.IsZero() { + startedAt = ts + } + endedAt = ts + if e.modelID != "" { + lastModel = normalizeWarpModel(e.modelID) + } + if e.workingDir != "" && cwd == "" { + cwd = e.workingDir + } + + queryText := extractWarpQueryText(e.input) + if queryText == "" { + // Exchange without user input (assistant + // tool call / intermediate step). Count it + // for timing but don't create a message. + continue + } + + // User message. + parsed = append(parsed, ParsedMessage{ + Ordinal: ordinal, + Role: RoleUser, + Content: queryText, + Timestamp: ts, + ContentLength: len(queryText), + Model: lastModel, + }) + ordinal++ + userCount++ + + if firstMsg == "" { + firstMsg = truncate( + strings.ReplaceAll(queryText, "\n", " "), + 300, + ) + } + } + + if len(parsed) == 0 { + return nil, nil, nil + } + + // Extract project from working directory. + if cwd != "" { + project = ExtractProjectFromCwd(cwd) + } + if project == "" { + project = "unknown" + } + + // Parse conversation metadata for token usage. + meta := parseWarpConversationMeta(c.conversationData) + + // Synthesize tool call messages from aggregate stats. + toolMsgs := synthesizeWarpToolMessages( + meta, endedAt, lastModel, &ordinal, + ) + parsed = append(parsed, toolMsgs...) + + sess := &ParsedSession{ + ID: "warp:" + c.id, + Project: project, + Machine: machine, + Agent: AgentWarp, + Cwd: cwd, + FirstMessage: firstMsg, + StartedAt: startedAt, + EndedAt: endedAt, + MessageCount: len(parsed), + UserMessageCount: userCount, + File: FileInfo{ + Path: dbPath + "#" + c.id, + Mtime: parseWarpTimestamp(c.lastModifiedAt).UnixNano(), + }, + } + + // Token usage from conversation metadata. + if meta.totalTokens > 0 { + sess.HasTotalOutputTokens = true + sess.TotalOutputTokens = meta.totalTokens + } + + return sess, parsed, nil +} + +// warpConversationMeta holds parsed metadata from +// conversation_data JSON. +type warpConversationMeta struct { + totalTokens int + toolStats warpToolStats +} + +type warpToolStats struct { + RunCommand int + ReadFiles int + SearchCodebase int + Grep int + FileGlob int + ApplyFileDiff int + WriteLongRunning int + ReadMCPResource int + CallMCPTool int + SuggestPlan int + SuggestCreatePlan int + ReadShellCmdOutput int + UseComputer int +} + +func parseWarpConversationMeta(data string) warpConversationMeta { + var meta warpConversationMeta + if data == "" || data == "{}" { + return meta + } + + var raw struct { + Usage struct { + TokenUsage []struct { + WarpTokens int `json:"warp_tokens"` + BYOKTokens int `json:"byok_tokens"` + } `json:"token_usage"` + ToolUsage struct { + RunCommand struct{ Count int } `json:"run_command_stats"` + ReadFiles struct{ Count int } `json:"read_files_stats"` + SearchCodebase struct{ Count int } `json:"search_codebase_stats"` + Grep struct{ Count int } `json:"grep_stats"` + FileGlob struct{ Count int } `json:"file_glob_stats"` + ApplyFileDiff struct { + Count int `json:"count"` + } `json:"apply_file_diff_stats"` + WriteLongRunning struct{ Count int } `json:"write_to_long_running_shell_command_stats"` + ReadMCPResource struct{ Count int } `json:"read_mcp_resource_stats"` + CallMCPTool struct{ Count int } `json:"call_mcp_tool_stats"` + SuggestPlan struct{ Count int } `json:"suggest_plan_stats"` + SuggestCreatePlan struct{ Count int } `json:"suggest_create_plan_stats"` + ReadShellOutput struct{ Count int } `json:"read_shell_command_output_stats"` + UseComputer struct{ Count int } `json:"use_computer_stats"` + } `json:"tool_usage_metadata"` + } `json:"conversation_usage_metadata"` + } + + if err := json.Unmarshal([]byte(data), &raw); err != nil { + return meta + } + + for _, tu := range raw.Usage.TokenUsage { + meta.totalTokens += tu.WarpTokens + tu.BYOKTokens + } + + ts := raw.Usage.ToolUsage + meta.toolStats = warpToolStats{ + RunCommand: ts.RunCommand.Count, + ReadFiles: ts.ReadFiles.Count, + SearchCodebase: ts.SearchCodebase.Count, + Grep: ts.Grep.Count, + FileGlob: ts.FileGlob.Count, + ApplyFileDiff: ts.ApplyFileDiff.Count, + WriteLongRunning: ts.WriteLongRunning.Count, + ReadMCPResource: ts.ReadMCPResource.Count, + CallMCPTool: ts.CallMCPTool.Count, + SuggestPlan: ts.SuggestPlan.Count, + SuggestCreatePlan: ts.SuggestCreatePlan.Count, + ReadShellCmdOutput: ts.ReadShellOutput.Count, + UseComputer: ts.UseComputer.Count, + } + + return meta +} + +// synthesizeWarpToolMessages creates assistant messages with +// tool call metadata from aggregate tool usage stats. This +// gives agentsview accurate tool category breakdowns even +// though Warp doesn't persist individual tool call records. +func synthesizeWarpToolMessages( + meta warpConversationMeta, + ts time.Time, + model string, + ordinal *int, +) []ParsedMessage { + type toolEntry struct { + name string + count int + } + entries := []toolEntry{ + {"run_command", meta.toolStats.RunCommand}, + {"read_files", meta.toolStats.ReadFiles}, + {"search_codebase", meta.toolStats.SearchCodebase}, + {"grep", meta.toolStats.Grep}, + {"file_glob", meta.toolStats.FileGlob}, + {"apply_file_diff", meta.toolStats.ApplyFileDiff}, + {"write_to_long_running_shell_command", meta.toolStats.WriteLongRunning}, + {"read_mcp_resource", meta.toolStats.ReadMCPResource}, + {"call_mcp_tool", meta.toolStats.CallMCPTool}, + {"suggest_plan", meta.toolStats.SuggestPlan}, + {"suggest_create_plan", meta.toolStats.SuggestCreatePlan}, + {"read_shell_command_output", meta.toolStats.ReadShellCmdOutput}, + {"use_computer", meta.toolStats.UseComputer}, + } + + var msgs []ParsedMessage + for _, e := range entries { + for range e.count { + category := NormalizeToolCategory(e.name) + content := fmt.Sprintf("[%s]", category) + msgs = append(msgs, ParsedMessage{ + Ordinal: *ordinal, + Role: RoleAssistant, + Content: content, + Timestamp: ts, + HasToolUse: true, + ContentLength: len(content), + Model: model, + ToolCalls: []ParsedToolCall{{ + ToolName: e.name, + Category: category, + }}, + }) + *ordinal++ + } + } + return msgs +} + +// extractWarpQueryText extracts the user's query text from the +// ai_queries input JSON. The format is: +// [{"Query":{"text":"...","context":[...]}}] +func extractWarpQueryText(input string) string { + input = strings.TrimSpace(input) + if input == "" || input == "[]" { + return "" + } + + var items []json.RawMessage + if err := json.Unmarshal([]byte(input), &items); err != nil { + return "" + } + if len(items) == 0 { + return "" + } + + var wrapper struct { + Query struct { + Text string `json:"text"` + } `json:"Query"` + } + if err := json.Unmarshal(items[0], &wrapper); err != nil { + return "" + } + return strings.TrimSpace(wrapper.Query.Text) +} + +// normalizeWarpModel cleans up Warp model IDs for display. +func normalizeWarpModel(modelID string) string { + // Warp uses IDs like "auto-genius", "auto" — keep as-is. + return modelID +} + +// parseWarpTimestamp parses timestamps from Warp's SQLite DB. +// Format: "2026-04-07 08:55:40" or with fractional seconds. +func parseWarpTimestamp(s string) time.Time { + s = strings.TrimSpace(s) + if s == "" { + return time.Time{} + } + + // Try with fractional seconds first. + for _, layout := range []string{ + "2006-01-02 15:04:05.000000", + "2006-01-02 15:04:05", + time.RFC3339, + } { + if t, err := time.Parse(layout, s); err == nil { + return t + } + } + return time.Time{} +} + +// FindWarpDBPath returns the path to warp.sqlite inside the +// given directory, or "" if it doesn't exist. +func FindWarpDBPath(dir string) string { + candidate := filepath.Join(dir, "warp.sqlite") + if _, err := os.Stat(candidate); err == nil { + return candidate + } + return "" +} diff --git a/internal/parser/warp_paths.go b/internal/parser/warp_paths.go new file mode 100644 index 00000000..0b444f8e --- /dev/null +++ b/internal/parser/warp_paths.go @@ -0,0 +1,15 @@ +package parser + +// warpDefaultDirs returns the platform-specific default +// directories for the Warp SQLite database. Each path is +// relative to $HOME and should contain warp.sqlite. +func warpDefaultDirs() []string { + return []string{ + // macOS + "Library/Group Containers/2BBY89MBSN.dev.warp/Library/Application Support/dev.warp.Warp-Stable", + // Linux + ".local/state/warp-terminal", + // Windows + "AppData/Local/warp/Warp/data", + } +} diff --git a/internal/parser/warp_test.go b/internal/parser/warp_test.go new file mode 100644 index 00000000..f41f0e66 --- /dev/null +++ b/internal/parser/warp_test.go @@ -0,0 +1,426 @@ +package parser + +import ( + "database/sql" + "path/filepath" + "testing" + + _ "github.com/mattn/go-sqlite3" +) + +// warpSchema matches the relevant tables from Warp's SQLite database. +const warpSchema = ` +CREATE TABLE agent_conversations ( + id INTEGER PRIMARY KEY NOT NULL, + conversation_id TEXT NOT NULL, + conversation_data TEXT NOT NULL, + last_modified_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +CREATE UNIQUE INDEX ux_agent_conversations_conversation_id + ON agent_conversations (conversation_id); + +CREATE TABLE ai_queries ( + id INTEGER PRIMARY KEY NOT NULL, + exchange_id TEXT NOT NULL, + conversation_id TEXT NOT NULL, + start_ts DATETIME NOT NULL, + input TEXT NOT NULL, + working_directory TEXT, + output_status TEXT NOT NULL, + model_id TEXT NOT NULL DEFAULT '', + planning_model_id TEXT NOT NULL DEFAULT '', + coding_model_id TEXT NOT NULL DEFAULT '' +); +CREATE UNIQUE INDEX ux_ai_queries_exchange_id + ON ai_queries(exchange_id); +` + +type WarpSeeder struct { + db *sql.DB + t *testing.T +} + +func (s *WarpSeeder) AddConversation( + conversationID, conversationData, lastModified string, +) { + s.t.Helper() + _, err := s.db.Exec( + `INSERT INTO agent_conversations + (conversation_id, conversation_data, last_modified_at) + VALUES (?, ?, ?)`, + conversationID, conversationData, lastModified, + ) + if err != nil { + s.t.Fatalf("add conversation: %v", err) + } +} + +func (s *WarpSeeder) AddExchange( + exchangeID, conversationID, startTS, input, + workingDir, outputStatus, modelID string, +) { + s.t.Helper() + _, err := s.db.Exec( + `INSERT INTO ai_queries + (exchange_id, conversation_id, start_ts, input, + working_directory, output_status, model_id) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + exchangeID, conversationID, startTS, input, + workingDir, outputStatus, modelID, + ) + if err != nil { + s.t.Fatalf("add exchange: %v", err) + } +} + +func newWarpTestDB(t *testing.T) (string, *WarpSeeder, *sql.DB) { + t.Helper() + dbPath := filepath.Join(t.TempDir(), "warp.sqlite") + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + t.Fatalf("open test db: %v", err) + } + if _, err := db.Exec(warpSchema); err != nil { + t.Fatalf("create schema: %v", err) + } + seeder := &WarpSeeder{db: db, t: t} + return dbPath, seeder, db +} + +func seedWarpConversation(t *testing.T, seeder *WarpSeeder) { + t.Helper() + + convData := `{ + "conversation_usage_metadata":{ + "token_usage":[ + {"model_id":"Claude Opus 4","warp_tokens":100000,"byok_tokens":0} + ], + "tool_usage_metadata":{ + "run_command_stats":{"count":3,"commands_executed":3}, + "read_files_stats":{"count":2}, + "search_codebase_stats":{"count":0}, + "grep_stats":{"count":1}, + "file_glob_stats":{"count":0}, + "apply_file_diff_stats":{"count":1,"lines_added":5,"lines_removed":2,"files_changed":1}, + "write_to_long_running_shell_command_stats":{"count":0}, + "read_mcp_resource_stats":{"count":0}, + "call_mcp_tool_stats":{"count":0}, + "suggest_plan_stats":{"count":0}, + "suggest_create_plan_stats":{"count":0}, + "read_shell_command_output_stats":{"count":0}, + "use_computer_stats":{"count":0} + } + } + }` + + seeder.AddConversation( + "conv-001", convData, "2026-04-07 10:00:00", + ) + + // User message with query text + seeder.AddExchange( + "ex-001", "conv-001", + "2026-04-07 09:50:00.000000", + `[{"Query":{"text":"Fix the JSON parsing bug in parser.go","context":[]}}]`, + "/Users/alice/code/myproject", + `"Completed"`, "auto-genius", + ) + // Intermediate exchange (tool call, no user input) + seeder.AddExchange( + "ex-002", "conv-001", + "2026-04-07 09:50:05.000000", + `[]`, + "/Users/alice/code/myproject", + `"Completed"`, "auto-genius", + ) + // Follow-up user message + seeder.AddExchange( + "ex-003", "conv-001", + "2026-04-07 09:51:00.000000", + `[{"Query":{"text":"Now add a test for that fix","context":[]}}]`, + "/Users/alice/code/myproject", + `"Completed"`, "auto-genius", + ) +} + +func TestParseWarpDB_StandardConversation(t *testing.T) { + dbPath, seeder, db := newWarpTestDB(t) + defer db.Close() + seedWarpConversation(t, seeder) + + sessions, err := ParseWarpDB(dbPath, "testmachine") + if err != nil { + t.Fatalf("ParseWarpDB: %v", err) + } + + assertEq(t, "sessions len", len(sessions), 1) + + s := sessions[0] + assertEq(t, "ID", s.Session.ID, "warp:conv-001") + assertEq(t, "Agent", s.Session.Agent, AgentWarp) + assertEq(t, "Machine", s.Session.Machine, "testmachine") + assertEq(t, "Project", s.Session.Project, "myproject") + assertEq(t, "UserMessageCount", s.Session.UserMessageCount, 2) + assertEq(t, "FirstMessage", + s.Session.FirstMessage, + "Fix the JSON parsing bug in parser.go", + ) + + wantPath := dbPath + "#conv-001" + assertEq(t, "File.Path", s.Session.File.Path, wantPath) + + // Token usage from conversation_data + assertEq(t, "HasTotalOutputTokens", + s.Session.HasTotalOutputTokens, true) + assertEq(t, "TotalOutputTokens", + s.Session.TotalOutputTokens, 100000) + + // Check user messages + var userMsgs, toolMsgs int + for _, m := range s.Messages { + if m.Role == RoleUser { + userMsgs++ + } + if m.HasToolUse { + toolMsgs++ + } + } + assertEq(t, "userMsgs", userMsgs, 2) + // 3 run_command + 2 read_files + 1 grep + 1 apply_file_diff = 7 + assertEq(t, "toolMsgs", toolMsgs, 7) +} + +func TestParseWarpSession_SingleConversation(t *testing.T) { + dbPath, seeder, db := newWarpTestDB(t) + defer db.Close() + seedWarpConversation(t, seeder) + + sess, msgs, err := ParseWarpSession( + dbPath, "conv-001", "testmachine", + ) + if err != nil { + t.Fatalf("ParseWarpSession: %v", err) + } + if sess == nil { + t.Fatal("expected non-nil session") + } + + assertEq(t, "ID", sess.ID, "warp:conv-001") + assertEq(t, "Agent", sess.Agent, AgentWarp) + + // First user message + assertEq(t, "msgs[0].Role", msgs[0].Role, RoleUser) + assertEq(t, "msgs[0].Content", msgs[0].Content, + "Fix the JSON parsing bug in parser.go") + // Second user message + assertEq(t, "msgs[1].Role", msgs[1].Role, RoleUser) + assertEq(t, "msgs[1].Content", msgs[1].Content, + "Now add a test for that fix") +} + +func TestListWarpSessionMeta(t *testing.T) { + dbPath, seeder, db := newWarpTestDB(t) + defer db.Close() + seedWarpConversation(t, seeder) + + metas, err := ListWarpSessionMeta(dbPath) + if err != nil { + t.Fatalf("ListWarpSessionMeta: %v", err) + } + + assertEq(t, "metas len", len(metas), 1) + assertEq(t, "SessionID", metas[0].SessionID, "conv-001") + assertEq(t, "VirtualPath", + metas[0].VirtualPath, dbPath+"#conv-001") + if metas[0].FileMtime == 0 { + t.Error("expected non-zero FileMtime") + } +} + +func TestParseWarpDB_EmptyConversation(t *testing.T) { + dbPath, seeder, db := newWarpTestDB(t) + defer db.Close() + + seeder.AddConversation( + "conv-empty", "{}", "2026-04-07 10:00:00", + ) + + sessions, err := ParseWarpDB(dbPath, "m") + if err != nil { + t.Fatalf("ParseWarpDB: %v", err) + } + assertEq(t, "sessions len", len(sessions), 0) +} + +func TestParseWarpDB_NoQueryText(t *testing.T) { + dbPath, seeder, db := newWarpTestDB(t) + defer db.Close() + + seeder.AddConversation( + "conv-notext", "{}", "2026-04-07 10:00:00", + ) + // Only empty exchanges + seeder.AddExchange( + "ex-x1", "conv-notext", + "2026-04-07 09:50:00", + `[]`, "/tmp", `"Completed"`, "auto", + ) + + sessions, err := ParseWarpDB(dbPath, "m") + if err != nil { + t.Fatalf("ParseWarpDB: %v", err) + } + assertEq(t, "sessions len", len(sessions), 0) +} + +func TestParseWarpDB_NonExistent(t *testing.T) { + sessions, err := ParseWarpDB( + "/nonexistent/warp.sqlite", "m", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if sessions != nil { + t.Error("expected nil sessions for non-existent db") + } +} + +func TestExtractWarpQueryText(t *testing.T) { + tests := []struct { + name string + input string + want string + }{ + {"empty", "", ""}, + {"empty array", "[]", ""}, + {"with text", `[{"Query":{"text":"hello world","context":[]}}]`, "hello world"}, + {"no query key", `[{"Other":{}}]`, ""}, + {"invalid json", `not json`, ""}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := extractWarpQueryText(tc.input) + assertEq(t, "text", got, tc.want) + }) + } +} + +func TestParseWarpTimestamp(t *testing.T) { + tests := []struct { + input string + year int + }{ + {"2026-04-07 08:55:40", 2026}, + {"2026-04-07 08:55:40.412505", 2026}, + {"", 0}, + } + + for _, tc := range tests { + ts := parseWarpTimestamp(tc.input) + if tc.year == 0 { + if !ts.IsZero() { + t.Errorf("expected zero time for %q", tc.input) + } + } else if ts.Year() != tc.year { + t.Errorf("year = %d, want %d for %q", + ts.Year(), tc.year, tc.input) + } + } +} + +func TestFindWarpDBPath(t *testing.T) { + // Create a temp dir with warp.sqlite + dir := t.TempDir() + dbPath := filepath.Join(dir, "warp.sqlite") + + // Before creating the file + assertEq(t, "not found", FindWarpDBPath(dir), "") + + // Create the file (sql.Open is lazy; Ping forces creation) + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + t.Fatal(err) + } + if err := db.Ping(); err != nil { + t.Fatal(err) + } + db.Close() + + assertEq(t, "found", FindWarpDBPath(dir), dbPath) +} + +func TestParseWarpConversationMeta(t *testing.T) { + data := `{ + "conversation_usage_metadata":{ + "token_usage":[ + {"warp_tokens":1000,"byok_tokens":200}, + {"warp_tokens":500,"byok_tokens":0} + ], + "tool_usage_metadata":{ + "run_command_stats":{"count":5}, + "read_files_stats":{"count":3}, + "grep_stats":{"count":2}, + "apply_file_diff_stats":{"count":1}, + "search_codebase_stats":{"count":0}, + "file_glob_stats":{"count":0}, + "write_to_long_running_shell_command_stats":{"count":0}, + "read_mcp_resource_stats":{"count":0}, + "call_mcp_tool_stats":{"count":0}, + "suggest_plan_stats":{"count":0}, + "suggest_create_plan_stats":{"count":0}, + "read_shell_command_output_stats":{"count":0}, + "use_computer_stats":{"count":0} + } + } + }` + + meta := parseWarpConversationMeta(data) + assertEq(t, "totalTokens", meta.totalTokens, 1700) + assertEq(t, "RunCommand", meta.toolStats.RunCommand, 5) + assertEq(t, "ReadFiles", meta.toolStats.ReadFiles, 3) + assertEq(t, "Grep", meta.toolStats.Grep, 2) + assertEq(t, "ApplyFileDiff", meta.toolStats.ApplyFileDiff, 1) +} + +func TestParseWarpConversationMeta_Empty(t *testing.T) { + meta := parseWarpConversationMeta("{}") + assertEq(t, "totalTokens", meta.totalTokens, 0) + assertEq(t, "RunCommand", meta.toolStats.RunCommand, 0) +} + +func TestSynthesizeWarpToolMessages(t *testing.T) { + meta := warpConversationMeta{ + toolStats: warpToolStats{ + RunCommand: 2, + ReadFiles: 1, + }, + } + + ordinal := 0 + msgs := synthesizeWarpToolMessages( + meta, parseWarpTimestamp("2026-04-07 10:00:00"), + "auto", &ordinal, + ) + + assertEq(t, "msgs len", len(msgs), 3) // 2 + 1 + assertEq(t, "ordinal after", ordinal, 3) + + // All should be assistant messages with tool use + for _, m := range msgs { + assertEq(t, "Role", m.Role, RoleAssistant) + if !m.HasToolUse { + t.Error("expected HasToolUse=true") + } + if len(m.ToolCalls) != 1 { + t.Errorf("expected 1 tool call, got %d", + len(m.ToolCalls)) + } + } + + // Check categories + assertEq(t, "tc[0].Category", + msgs[0].ToolCalls[0].Category, "Bash") + assertEq(t, "tc[2].Category", + msgs[2].ToolCalls[0].Category, "Read") +} diff --git a/internal/sync/engine.go b/internal/sync/engine.go index 86c89880..5014aae8 100644 --- a/internal/sync/engine.go +++ b/internal/sync/engine.go @@ -1003,6 +1003,47 @@ func (e *Engine) syncAllLocked( return stats } + // Sync Warp sessions (DB-backed, not file-based). + tWarp := time.Now() + warpPending := e.syncWarp(ctx) + if len(warpPending) > 0 { + stats.TotalSessions += len(warpPending) + tWrite := time.Now() + var warpWritten int + for _, pw := range warpPending { + if ctx.Err() != nil { + break + } + switch err := e.writeSessionFull(pw); { + case err == nil: + warpWritten++ + case errors.Is(err, db.ErrSessionExcluded): + // Intentional skip, not a failure. + default: + stats.RecordFailed() + } + } + stats.RecordSynced(warpWritten) + if verbose { + log.Printf( + "warp write: %d sessions in %s", + len(warpPending), + time.Since(tWrite).Round(time.Millisecond), + ) + } + } + if verbose { + log.Printf( + "warp sync: %s", + time.Since(tWarp).Round(time.Millisecond), + ) + } + + if ctx.Err() != nil { + stats.Aborted = true + return stats + } + tPersist := time.Now() skipCount := e.persistSkipCache() if verbose { @@ -2444,7 +2485,12 @@ func (e *Engine) SyncSingleSession(sessionID string) error { return fmt.Errorf("unknown agent for session %s", sessionID) } if !def.FileBased { - return e.syncSingleOpenCode(sessionID) + switch def.Type { + case parser.AgentWarp: + return e.syncSingleWarp(sessionID) + default: + return e.syncSingleOpenCode(sessionID) + } } path := e.FindSourceFile(sessionID) @@ -2634,6 +2680,129 @@ func (e *Engine) syncSingleOpenCode( return fmt.Errorf("opencode session %s not found", sessionID) } +// syncWarp syncs sessions from Warp SQLite databases. +// Uses per-conversation last_modified_at to detect changes, +// so only modified conversations are fully parsed. +func (e *Engine) syncWarp( + ctx context.Context, +) []pendingWrite { + var allPending []pendingWrite + for _, dir := range e.agentDirs[parser.AgentWarp] { + if ctx.Err() != nil { + break + } + if dir == "" { + continue + } + allPending = append( + allPending, e.syncOneWarp(ctx, dir)..., + ) + } + return allPending +} + +// syncOneWarp handles a single Warp directory. +func (e *Engine) syncOneWarp( + ctx context.Context, dir string, +) []pendingWrite { + dbPath := parser.FindWarpDBPath(dir) + if dbPath == "" { + return nil + } + + metas, err := parser.ListWarpSessionMeta(dbPath) + if err != nil { + log.Printf("sync warp: %v", err) + return nil + } + if len(metas) == 0 { + return nil + } + + var changed []string + for _, m := range metas { + _, storedMtime, ok := + e.db.GetFileInfoByPath(m.VirtualPath) + if ok && storedMtime == m.FileMtime { + continue + } + changed = append(changed, m.SessionID) + } + if len(changed) == 0 { + return nil + } + + var pending []pendingWrite + for _, cid := range changed { + if ctx.Err() != nil { + break + } + sess, msgs, err := parser.ParseWarpSession( + dbPath, cid, e.machine, + ) + if err != nil { + log.Printf( + "warp conversation %s: %v", cid, err, + ) + continue + } + if sess == nil { + continue + } + pending = append(pending, pendingWrite{ + sess: *sess, + msgs: msgs, + }) + } + + return pending +} + +// syncSingleWarp re-syncs a single Warp conversation. +func (e *Engine) syncSingleWarp( + sessionID string, +) error { + rawID := strings.TrimPrefix(sessionID, "warp:") + + var lastErr error + for _, dir := range e.agentDirs[parser.AgentWarp] { + if dir == "" { + continue + } + dbPath := parser.FindWarpDBPath(dir) + if dbPath == "" { + continue + } + sess, msgs, err := parser.ParseWarpSession( + dbPath, rawID, e.machine, + ) + if err != nil { + lastErr = err + continue + } + if sess == nil { + continue + } + if err := e.writeSessionFull( + pendingWrite{sess: *sess, msgs: msgs}, + ); err != nil && !errors.Is(err, db.ErrSessionExcluded) { + return fmt.Errorf("write session %s: %w", + sess.ID, err) + } + return nil + } + + if len(e.agentDirs[parser.AgentWarp]) == 0 { + return fmt.Errorf("warp dir not configured") + } + if lastErr != nil { + return fmt.Errorf( + "warp session %s: %w", sessionID, lastErr, + ) + } + return fmt.Errorf("warp session %s not found", sessionID) +} + func strPtr(s string) *string { if s == "" { return nil