From 43e73054610bb9e84dcf54c170565a091ef3a2b0 Mon Sep 17 00:00:00 2001 From: Michael_Dreamer <90915827+MichaelC001@users.noreply.github.com> Date: Fri, 27 Feb 2026 07:51:59 +0800 Subject: [PATCH 1/3] chore: add transcript parser tool Python script to parse Claude Code transcript JSONL into CSV or terminal table format for debugging. Co-Authored-By: Claude Opus 4.6 --- parse_transcript.py | 194 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 194 insertions(+) create mode 100644 parse_transcript.py diff --git a/parse_transcript.py b/parse_transcript.py new file mode 100644 index 0000000..40e1d26 --- /dev/null +++ b/parse_transcript.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python3 +"""Parse Claude Code transcript JSONL into CSV. + +Usage: + python3 parse_transcript.py [count] [--csv output.csv] + + count: number of meaningful entries to show (default: all) + --csv: output to CSV file (default: print to terminal as table) +""" +import json, sys, csv, io +from datetime import datetime + +path = sys.argv[1] +count = None +csv_path = None + +i = 2 +while i < len(sys.argv): + if sys.argv[i] == '--csv' and i + 1 < len(sys.argv): + csv_path = sys.argv[i + 1] + i += 2 + else: + try: + count = int(sys.argv[i]) + except: + pass + i += 1 + +# Read entire file (for CSV we want everything, for terminal we limit) +with open(path, 'rb') as f: + if count and not csv_path: + # For terminal, read tail only + f.seek(0, 2) + size = f.tell() + tail = 1_000_000 # 1MB + offset = max(0, size - tail) + f.seek(offset) + data = f.read().decode('utf-8', errors='replace') + lines = data.strip().split('\n') + if offset > 0: + lines = lines[1:] + else: + data = f.read().decode('utf-8', errors='replace') + lines = data.strip().split('\n') + +def fmt_ts(obj): + ts = obj.get('timestamp') + if not ts: + return '' + if isinstance(ts, str): + try: + dt = datetime.fromisoformat(ts.replace('Z', '+00:00')) + # Convert to local time + dt = dt.astimezone() + return dt.strftime('%Y-%m-%d %H:%M:%S') + except: + return ts[:19] + try: + dt = datetime.fromtimestamp(ts / 1000 if ts > 1e12 else ts) + return dt.strftime('%Y-%m-%d %H:%M:%S') + except: + return '' + +def short_ts(obj): + ts = obj.get('timestamp') + if ts: + try: + dt = datetime.fromtimestamp(ts / 1000 if ts > 1e12 else ts) + return dt.strftime('%H:%M:%S') + except: + pass + return '' + +rows = [] +for line in lines: + try: + obj = json.loads(line) + except: + continue + + t = obj.get('type', '') + rid = obj.get('requestId', '') + uuid = obj.get('uuid', '') + parent_uuid = obj.get('parentUuid', '') + is_err = obj.get('isApiErrorMessage', False) + is_sidechain = obj.get('isSidechain', False) + timestamp = fmt_ts(obj) + session_id = obj.get('sessionId', '') + cwd = obj.get('cwd', '') + version = obj.get('version', '') + git_branch = obj.get('gitBranch', '') + slug = obj.get('slug', '') + user_type = obj.get('userType', '') + + base = { + 'time': timestamp, 'type': t, 'uuid': uuid, 'parentUuid': parent_uuid, + 'requestId': rid, 'sessionId': session_id, 'isApiErrorMessage': is_err, + 'isSidechain': is_sidechain, 'userType': user_type, + 'cwd': cwd, 'version': version, 'gitBranch': git_branch, 'slug': slug, + 'message_role': '', 'content_type': '', 'tool_name': '', 'tool_use_id': '', + 'is_error': '', 'content': '' + } + + if t == 'assistant': + msg = obj.get('message', {}) + role = msg.get('role', '') + content = msg.get('content', []) + if role != 'assistant': + row = {**base, 'message_role': role} + rows.append(row) + continue + if not content: + row = {**base, 'message_role': role} + rows.append(row) + continue + for b in content: + bt = b.get('type', '') + row = {**base, 'message_role': role, 'content_type': bt} + if bt == 'text': + row['content'] = b.get('text', '')[:2000] + elif bt == 'tool_use': + row['tool_name'] = b.get('name', '') + row['tool_use_id'] = b.get('id', '') + inp = b.get('input', {}) + row['content'] = json.dumps(inp, ensure_ascii=False)[:2000] + elif bt == 'thinking': + row['content'] = b.get('thinking', '')[:500] + else: + row['content'] = json.dumps(b, ensure_ascii=False)[:500] + rows.append(row) + + elif t == 'human': + msg = obj.get('message', {}) + role = msg.get('role', '') + content = msg.get('content', []) + if not content: + row = {**base, 'message_role': role} + rows.append(row) + continue + for b in content: + if isinstance(b, str): + row = {**base, 'message_role': role, 'content_type': 'text', 'content': b[:2000]} + rows.append(row) + elif isinstance(b, dict): + bt = b.get('type', '') + row = {**base, 'message_role': role, 'content_type': bt} + if bt == 'text': + row['content'] = b.get('text', '')[:2000] + elif bt == 'tool_result': + row['tool_use_id'] = b.get('tool_use_id', '') + row['is_error'] = b.get('is_error', False) + cont = b.get('content', '') + if isinstance(cont, list): + row['content'] = json.dumps(cont, ensure_ascii=False)[:2000] + else: + row['content'] = str(cont)[:2000] + else: + row['content'] = json.dumps(b, ensure_ascii=False)[:500] + rows.append(row) + + else: + # progress, system, user, file-history-snapshot, queue-operation, etc. + data = obj.get('data', '') + tool_use_id = obj.get('toolUseID', '') + row = {**base, 'tool_use_id': tool_use_id} + if data: + row['content'] = json.dumps(data, ensure_ascii=False)[:500] if not isinstance(data, str) else data[:500] + rows.append(row) + +if count: + rows = rows[-count:] + +fields = ['time', 'type', 'uuid', 'parentUuid', 'requestId', 'sessionId', + 'isApiErrorMessage', 'isSidechain', 'userType', 'cwd', 'version', + 'gitBranch', 'slug', 'message_role', 'content_type', 'tool_name', + 'tool_use_id', 'is_error', 'content'] + +if csv_path: + with open(csv_path, 'w', newline='', encoding='utf-8') as f: + w = csv.DictWriter(f, fieldnames=fields) + w.writeheader() + w.writerows(rows) + print(f'Wrote {len(rows)} rows to {csv_path}') +else: + # Terminal: compact table with key columns only + for r in rows: + r['content'] = r['content'].replace('\n', ' | ')[:100] if r.get('content') else '' + r['requestId'] = r['requestId'][:16] + r['time'] = r['time'][11:] if len(r['time']) > 11 else r['time'] + + print(f'{"TIME":<9} {"TYPE":<12} {"ROLE":<10} {"C_TYPE":<12} {"TOOL":<10} {"RID":<18} CONTENT') + print('-' * 130) + for r in rows: + print(f'{r["time"]:<9} {r["type"]:<12} {r["message_role"]:<10} {r["content_type"]:<12} {r["tool_name"]:<10} {r["requestId"]:<18} {r["content"]}') From 2353ca3cfbf750b3c413313a4c309e3c881325fd Mon Sep 17 00:00:00 2001 From: Michael_Dreamer <90915827+MichaelC001@users.noreply.github.com> Date: Fri, 27 Feb 2026 08:10:19 +0800 Subject: [PATCH 2/3] feat: replace JSONL ledger with SQLite database MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New db.go: SQLite-backed message tracking and tool state - INSERT OR IGNORE for atomic dedup (fixes parallel hook race) - Telegram→terminal delivery confirmation with 10s auto-retry - Delete ledger.go (JSONL implementation) - Update tests to use temp SQLite DB Co-Authored-By: Claude Opus 4.6 --- commands.go | 12 ++- db.go | 298 +++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 7 ++ go.sum | 14 +++ hooks.go | 40 +------ ledger.go | 188 -------------------------------- main.go | 1 + main_test.go | 61 +++++------ 8 files changed, 363 insertions(+), 258 deletions(-) create mode 100644 db.go delete mode 100644 ledger.go diff --git a/commands.go b/commands.go index ddbfcb3..e646579 100644 --- a/commands.go +++ b/commands.go @@ -1322,7 +1322,7 @@ func listen() error { target := tmuxTargetByID(windowID, tmuxName) listenLog("sendToTmux: target=%s window=%s", target, tmuxName) - // Record in ledger before sending + // Record in DB before sending ledgerID := fmt.Sprintf("tg:%d", update.UpdateID) appendMessage(&MessageRecord{ ID: ledgerID, @@ -1338,7 +1338,15 @@ func listen() error { listenLog("sendToTmux FAILED: target=%s err=%v", target, err) sendMessage(config, chatID, threadID, fmt.Sprintf("❌ Failed to send: %v", err)) } else { - updateDelivery(sessName, ledgerID, "terminal_delivered", true) + // Delayed confirmation: check after 10s if Claude received it + go func(sid, lid, tgt, wn, txt string, cid, tid int64) { + time.Sleep(10 * time.Second) + if hasUnconfirmedPrompt(sid, lid) { + listenLog("delivery unconfirmed after 10s, resending: session=%s id=%s", sid, lid) + sendToTmuxFromTelegram(tgt, wn, txt) + sendMessage(config, cid, tid, "⚠️ 消息未确认,已重发") + } + }(sessName, ledgerID, target, tmuxName, text, chatID, threadID) } } else { sendMessage(config, chatID, threadID, "⚠️ No session linked to this topic. Use /new to create one.") diff --git a/db.go b/db.go new file mode 100644 index 0000000..290145d --- /dev/null +++ b/db.go @@ -0,0 +1,298 @@ +package main + +import ( + "crypto/sha256" + "database/sql" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + _ "modernc.org/sqlite" +) + +// MessageRecord tracks the delivery state of a single message +type MessageRecord struct { + ID string `json:"id"` + Session string `json:"session"` + Type string `json:"type"` // user_prompt / assistant_text / tool_call / notification + Text string `json:"text"` + Origin string `json:"origin"` // terminal / telegram / claude + TerminalDelivered bool `json:"terminal_delivered"` + TelegramDelivered bool `json:"telegram_delivered"` + TelegramMsgID int64 `json:"telegram_msg_id,omitempty"` + Timestamp int64 `json:"timestamp"` +} + +var ( + dbOnce sync.Once + dbInstance *sql.DB + dbPath = func() string { return filepath.Join(cacheDir(), "ccc.db") } +) + +// openDB opens (or creates) the SQLite database and ensures tables exist. +// Safe to call multiple times — uses sync.Once internally. +func openDB() *sql.DB { + dbOnce.Do(func() { + path := dbPath() + os.MkdirAll(filepath.Dir(path), 0755) + + + db, err := sql.Open("sqlite", path+"?_pragma=journal_mode(wal)&_pragma=busy_timeout(5000)") + if err != nil { + hookLog("db: open failed: %v", err) + return + } + + // Create tables + for _, stmt := range []string{ + `CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + session TEXT NOT NULL, + type TEXT NOT NULL, + text TEXT, + origin TEXT, + terminal_delivered INTEGER DEFAULT 0, + telegram_delivered INTEGER DEFAULT 0, + telegram_msg_id INTEGER DEFAULT 0, + created_at INTEGER NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session)`, + `CREATE INDEX IF NOT EXISTS idx_messages_undelivered_tg ON messages(session, telegram_delivered) WHERE telegram_delivered = 0`, + `CREATE INDEX IF NOT EXISTS idx_messages_undelivered_tm ON messages(session, terminal_delivered) WHERE terminal_delivered = 0`, + `CREATE TABLE IF NOT EXISTS tool_state ( + session TEXT PRIMARY KEY, + telegram_msg_id INTEGER DEFAULT 0, + tools_json TEXT DEFAULT '[]' + )`, + } { + if _, err := db.Exec(stmt); err != nil { + hookLog("db: create table failed: %v", err) + } + } + + dbInstance = db + }) + return dbInstance +} + +// closeDB closes the database connection +func closeDB() { + if dbInstance != nil { + dbInstance.Close() + } +} + +// appendMessage inserts a message record. Duplicate IDs are silently ignored. +func appendMessage(rec *MessageRecord) error { + db := openDB() + if db == nil { + return fmt.Errorf("db not open") + } + if rec.Timestamp == 0 { + rec.Timestamp = time.Now().UnixMilli() + } + _, err := db.Exec( + `INSERT OR IGNORE INTO messages (id, session, type, text, origin, terminal_delivered, telegram_delivered, telegram_msg_id, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + rec.ID, rec.Session, rec.Type, rec.Text, rec.Origin, + boolToInt(rec.TerminalDelivered), boolToInt(rec.TelegramDelivered), + rec.TelegramMsgID, rec.Timestamp, + ) + return err +} + +// updateDelivery updates a specific delivery field for a message +func updateDelivery(session, msgID, field string, value any) error { + db := openDB() + if db == nil { + return fmt.Errorf("db not open") + } + var col string + switch field { + case "terminal_delivered": + col = "terminal_delivered" + case "telegram_delivered": + col = "telegram_delivered" + case "telegram_msg_id": + col = "telegram_msg_id" + default: + return fmt.Errorf("unknown field: %s", field) + } + _, err := db.Exec( + fmt.Sprintf(`UPDATE messages SET %s = ? WHERE id = ?`, col), + value, msgID, + ) + return err +} + +// isDelivered checks if a message has been delivered to the given target +func isDelivered(session, msgID, target string) bool { + db := openDB() + if db == nil { + return false + } + var col string + switch target { + case "telegram": + col = "telegram_delivered" + case "terminal": + col = "terminal_delivered" + default: + return false + } + var delivered int + err := db.QueryRow( + fmt.Sprintf(`SELECT %s FROM messages WHERE id = ?`, col), + msgID, + ).Scan(&delivered) + if err != nil { + return false + } + return delivered != 0 +} + +// findUndelivered returns messages not yet delivered to the given target +func findUndelivered(session, target string) []*MessageRecord { + db := openDB() + if db == nil { + return nil + } + var col string + switch target { + case "telegram": + col = "telegram_delivered" + case "terminal": + col = "terminal_delivered" + default: + return nil + } + rows, err := db.Query( + fmt.Sprintf(`SELECT id, session, type, text, origin, terminal_delivered, telegram_delivered, telegram_msg_id, created_at + FROM messages WHERE session = ? AND %s = 0 ORDER BY created_at`, col), + session, + ) + if err != nil { + return nil + } + defer rows.Close() + + var result []*MessageRecord + for rows.Next() { + var r MessageRecord + var termDel, tgDel int + if err := rows.Scan(&r.ID, &r.Session, &r.Type, &r.Text, &r.Origin, + &termDel, &tgDel, &r.TelegramMsgID, &r.Timestamp); err != nil { + continue + } + r.TerminalDelivered = termDel != 0 + r.TelegramDelivered = tgDel != 0 + result = append(result, &r) + } + return result +} + +// confirmTerminalDelivery marks the most recent unconfirmed Telegram message as delivered to terminal. +// Called from UserPromptSubmit hook when Claude confirms it received a prompt. +func confirmTerminalDelivery(session, promptText string) { + db := openDB() + if db == nil { + return + } + cutoff := time.Now().Add(-30 * time.Second).UnixMilli() + _, err := db.Exec( + `UPDATE messages SET terminal_delivered = 1 + WHERE session = ? AND origin = 'telegram' AND type = 'user_prompt' + AND terminal_delivered = 0 AND created_at > ?`, + session, cutoff, + ) + if err != nil { + hookLog("db: confirmTerminalDelivery failed: %v", err) + } +} + +// hasUnconfirmedPrompt checks if a Telegram prompt is still unconfirmed for terminal delivery +func hasUnconfirmedPrompt(session, msgID string) bool { + db := openDB() + if db == nil { + return false + } + var termDel int + err := db.QueryRow( + `SELECT terminal_delivered FROM messages WHERE id = ?`, msgID, + ).Scan(&termDel) + if err != nil { + return false + } + return termDel == 0 +} + +// --- Tool State --- + +// ToolState tracks tool calls and the Telegram message ID for live updates +type ToolState struct { + MsgID int64 `json:"msg_id"` + Tools []ToolCall `json:"tools"` +} + +type ToolCall struct { + Name string `json:"name"` + Input string `json:"input"` + IsText bool `json:"is_text,omitempty"` + Time int64 `json:"time,omitempty"` +} + +func loadToolState(session string) *ToolState { + db := openDB() + if db == nil { + return &ToolState{} + } + var msgID int64 + var toolsJSON string + err := db.QueryRow( + `SELECT telegram_msg_id, tools_json FROM tool_state WHERE session = ?`, session, + ).Scan(&msgID, &toolsJSON) + if err != nil { + return &ToolState{} + } + var tools []ToolCall + json.Unmarshal([]byte(toolsJSON), &tools) + return &ToolState{MsgID: msgID, Tools: tools} +} + +func saveToolState(session string, state *ToolState) { + db := openDB() + if db == nil { + return + } + toolsJSON, _ := json.Marshal(state.Tools) + db.Exec( + `INSERT OR REPLACE INTO tool_state (session, telegram_msg_id, tools_json) VALUES (?, ?, ?)`, + session, state.MsgID, string(toolsJSON), + ) +} + +func clearToolState(session string) { + db := openDB() + if db == nil { + return + } + db.Exec(`DELETE FROM tool_state WHERE session = ?`, session) +} + +// --- Helpers --- + +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} + +// contentHash returns a short hash of content for dedup IDs +func contentHash(s string) string { + h := sha256.Sum256([]byte(s)) + return fmt.Sprintf("%x", h[:4]) +} diff --git a/go.mod b/go.mod index c244ee1..756ca1f 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,9 @@ require ( github.com/mutablelogic/go-client v1.3.5 // indirect github.com/mutablelogic/go-media v1.8.3 // indirect github.com/mutablelogic/go-server v1.5.18 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect github.com/pquerna/otp v1.5.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect @@ -53,6 +55,7 @@ require ( go.opentelemetry.io/otel/sdk v1.39.0 // indirect go.opentelemetry.io/otel/trace v1.39.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect + golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect @@ -62,5 +65,9 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20260122232226-8e98ce8d340d // indirect google.golang.org/grpc v1.78.0 // indirect google.golang.org/protobuf v1.36.11 // indirect + modernc.org/libc v1.67.6 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect + modernc.org/sqlite v1.46.1 // indirect rsc.io/qr v0.2.0 // indirect ) diff --git a/go.sum b/go.sum index 60aa8c7..70c864c 100644 --- a/go.sum +++ b/go.sum @@ -86,10 +86,14 @@ github.com/mutablelogic/go-server v1.5.18 h1:UKpJQReabHFMz1U/gbOq/+Q0C0ZzklVBalv github.com/mutablelogic/go-server v1.5.18/go.mod h1:swZf3T0eGe9VEE0Ki37WknpN+XxTGj2Xn6EP7BJm9x0= github.com/mutablelogic/go-whisper v0.0.39 h1:l064L4BJuhCc5eJA54sanmZ+5Bm1sfxO6x7EHQMRELw= github.com/mutablelogic/go-whisper v0.0.39/go.mod h1:iOMw8nfd/j1MCYXRGlim5r+RH32aB4Ynx/o9++bmVZU= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pquerna/otp v1.5.0 h1:NMMR+WrmaqXU4EzdGJEE1aUUI0AMRzsp96fFFWNPwxs= github.com/pquerna/otp v1.5.0/go.mod h1:dkJfzwRKNiegxyNb54X/3fLwhCynbMspSyWKnvi1AEg= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -120,6 +124,8 @@ go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjce go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= @@ -144,5 +150,13 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI= +modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/sqlite v1.46.1 h1:eFJ2ShBLIEnUWlLy12raN0Z1plqmFX9Qe3rjQTKt6sU= +modernc.org/sqlite v1.46.1/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA= rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= rsc.io/qr v0.2.0/go.mod h1:IF+uZjkb9fqyeF/4tlBoynqmQxUoPfWEKh921coOuXs= diff --git a/hooks.go b/hooks.go index 48578d8..db4fb36 100644 --- a/hooks.go +++ b/hooks.go @@ -60,44 +60,6 @@ func waitPromptAck(sessionName string, timeout time.Duration) bool { return false } -// toolStatePath returns the path for tool call display state -func toolStatePath(sessionName string) string { - return filepath.Join(cacheDir(), "tools-"+sessionName+".json") -} - -// ToolState tracks tool calls and the Telegram message ID for live updates -type ToolState struct { - MsgID int64 `json:"msg_id"` - Tools []ToolCall `json:"tools"` -} - -type ToolCall struct { - Name string `json:"name"` - Input string `json:"input"` - IsText bool `json:"is_text,omitempty"` // true for assistant text - Time int64 `json:"time,omitempty"` // unix ms for ordering -} - -func loadToolState(sessionName string) *ToolState { - data, err := os.ReadFile(toolStatePath(sessionName)) - if err != nil { - return &ToolState{} - } - var state ToolState - if json.Unmarshal(data, &state) != nil { - return &ToolState{} - } - return &state -} - -func saveToolState(sessionName string, state *ToolState) { - data, _ := json.Marshal(state) - os.WriteFile(toolStatePath(sessionName), data, 0600) -} - -func clearToolState(sessionName string) { - os.Remove(toolStatePath(sessionName)) -} // addTextToToolState adds an assistant text block to the tool state, ordered by timestamp. func addTextToToolState(sessName string, text string, ts int64) { @@ -761,6 +723,8 @@ func handleUserPromptHook() error { os.Remove(telegramActiveFlag(tmuxName)) writePromptAck(sessName) setThinking(sessName) + // Confirm terminal delivery for the Telegram message + confirmTerminalDelivery(sessName, hookData.Prompt) // Record: came from Telegram, both sides have it appendMessage(&MessageRecord{ ID: fmt.Sprintf("prompt:%s:%d", hookData.SessionID, time.Now().UnixNano()), diff --git a/ledger.go b/ledger.go deleted file mode 100644 index 6cd3b16..0000000 --- a/ledger.go +++ /dev/null @@ -1,188 +0,0 @@ -package main - -import ( - "bufio" - "crypto/sha256" - "encoding/json" - "fmt" - "os" - "path/filepath" - "sync" - "time" -) - -// MessageRecord tracks the delivery state of a single message -type MessageRecord struct { - ID string `json:"id"` // unique: "{requestId}:{hash}" or "tg:{update_id}" - Session string `json:"session"` // session name - Type string `json:"type"` // user_prompt / tool_call / assistant_text / notification - Text string `json:"text"` // message content - Origin string `json:"origin"` // terminal / telegram / claude - TerminalDelivered bool `json:"terminal_delivered"` // whether terminal received it - TelegramDelivered bool `json:"telegram_delivered"` // whether Telegram received it - TelegramMsgID int64 `json:"telegram_msg_id,omitempty"` // Telegram message ID (for editing) - Timestamp int64 `json:"timestamp"` // unix timestamp - Update string `json:"update,omitempty"` // if set, this is an update record for the given ID - UpdateField string `json:"update_field,omitempty"` // field name to update - UpdateValue any `json:"update_value,omitempty"` // new value -} - -var ledgerMu sync.Mutex - -func ledgerPath(session string) string { - return filepath.Join(cacheDir(), "ledger-"+session+".jsonl") -} - -// appendMessage writes a new message record to the session's ledger -func appendMessage(rec *MessageRecord) error { - if rec.Timestamp == 0 { - rec.Timestamp = time.Now().Unix() - } - ledgerMu.Lock() - defer ledgerMu.Unlock() - - f, err := os.OpenFile(ledgerPath(rec.Session), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return err - } - defer f.Close() - - data, err := json.Marshal(rec) - if err != nil { - return err - } - _, err = fmt.Fprintf(f, "%s\n", data) - return err -} - -// updateDelivery appends an update record to the ledger -func updateDelivery(session, msgID, field string, value any) error { - rec := &MessageRecord{ - Update: msgID, - Session: session, - UpdateField: field, - UpdateValue: value, - Timestamp: time.Now().Unix(), - } - ledgerMu.Lock() - defer ledgerMu.Unlock() - - f, err := os.OpenFile(ledgerPath(session), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return err - } - defer f.Close() - - data, err := json.Marshal(rec) - if err != nil { - return err - } - _, err = fmt.Fprintf(f, "%s\n", data) - return err -} - -// readLedger reads all records from a session's ledger and merges updates -func readLedger(session string) []*MessageRecord { - ledgerMu.Lock() - defer ledgerMu.Unlock() - - f, err := os.Open(ledgerPath(session)) - if err != nil { - return nil - } - defer f.Close() - - byID := make(map[string]*MessageRecord) - var order []string - - scanner := bufio.NewScanner(f) - scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) - for scanner.Scan() { - var rec MessageRecord - if json.Unmarshal(scanner.Bytes(), &rec) != nil { - continue - } - // Update record: apply to existing - if rec.Update != "" { - if orig, ok := byID[rec.Update]; ok { - applyUpdate(orig, rec.UpdateField, rec.UpdateValue) - } - continue - } - if rec.ID == "" { - continue - } - if _, exists := byID[rec.ID]; !exists { - order = append(order, rec.ID) - } - r := rec // copy - byID[rec.ID] = &r - } - - var result []*MessageRecord - for _, id := range order { - result = append(result, byID[id]) - } - return result -} - -func applyUpdate(rec *MessageRecord, field string, value any) { - switch field { - case "terminal_delivered": - if v, ok := value.(bool); ok { - rec.TerminalDelivered = v - } - case "telegram_delivered": - if v, ok := value.(bool); ok { - rec.TelegramDelivered = v - } - case "telegram_msg_id": - switch v := value.(type) { - case float64: - rec.TelegramMsgID = int64(v) - case int64: - rec.TelegramMsgID = v - } - } -} - -// isDelivered checks if a message ID has already been delivered to the given target -func isDelivered(session, msgID, target string) bool { - records := readLedger(session) - for _, r := range records { - if r.ID == msgID { - switch target { - case "telegram": - return r.TelegramDelivered - case "terminal": - return r.TerminalDelivered - } - } - } - return false -} - -// findUndelivered returns messages not yet delivered to the given target ("telegram" or "terminal") -func findUndelivered(session, target string) []*MessageRecord { - records := readLedger(session) - var result []*MessageRecord - for _, r := range records { - switch target { - case "telegram": - if !r.TelegramDelivered { - result = append(result, r) - } - case "terminal": - if !r.TerminalDelivered { - result = append(result, r) - } - } - } - return result -} - -// contentHash returns a short hash of content for dedup IDs -func contentHash(s string) string { - h := sha256.Sum256([]byte(s)) - return fmt.Sprintf("%x", h[:4]) -} diff --git a/main.go b/main.go index b083cd0..855e8aa 100644 --- a/main.go +++ b/main.go @@ -328,6 +328,7 @@ func main() { } case "listen": + defer closeDB() if err := listen(); err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) diff --git a/main_test.go b/main_test.go index 04b592c..3ff75ab 100644 --- a/main_test.go +++ b/main_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "os" "path/filepath" + "sync" "testing" ) @@ -629,12 +630,15 @@ func TestReplyToMessage(t *testing.T) { } } -// TestLedgerAppendAndRead tests basic ledger operations -func TestLedgerAppendAndRead(t *testing.T) { - // Use a unique session name with temp suffix so the ledger file doesn't collide - session := "test-ledger-" + filepath.Base(t.TempDir()) - // Clean up after test - defer os.Remove(ledgerPath(session)) +// TestDBAppendAndQuery tests basic SQLite DB operations +func TestDBAppendAndQuery(t *testing.T) { + // Use temp DB for testing + tmpDir := t.TempDir() + origPath := dbPath + dbPath = func() string { return filepath.Join(tmpDir, "test.db") } + defer func() { dbPath = origPath; closeDB(); dbOnce = sync.Once{} }() + + session := "test-session" // Append a message rec := &MessageRecord{ @@ -650,41 +654,38 @@ func TestLedgerAppendAndRead(t *testing.T) { t.Fatalf("appendMessage failed: %v", err) } - // Read back - records := readLedger(session) - if len(records) != 1 { - t.Fatalf("readLedger returned %d records, want 1", len(records)) - } - if records[0].ID != "test:1" { - t.Errorf("ID = %q, want test:1", records[0].ID) + // isDelivered checks + if isDelivered(session, "test:1", "terminal") { + t.Error("terminal should not be delivered yet") } - if records[0].TerminalDelivered { - t.Error("TerminalDelivered should be false") + if !isDelivered(session, "test:1", "telegram") { + t.Error("telegram should be delivered") } // Update delivery - if err := updateDelivery(session, "test:1", "terminal_delivered", true); err != nil { + if err := updateDelivery(session, "test:1", "terminal_delivered", 1); err != nil { t.Fatalf("updateDelivery failed: %v", err) } - - // Read again — should be merged - records = readLedger(session) - if len(records) != 1 { - t.Fatalf("readLedger returned %d records after update, want 1", len(records)) - } - if !records[0].TerminalDelivered { - t.Error("TerminalDelivered should be true after update") + if !isDelivered(session, "test:1", "terminal") { + t.Error("terminal should be delivered after update") } - // Test isDelivered + // Dedup: INSERT OR IGNORE should not overwrite + appendMessage(&MessageRecord{ + ID: "test:1", + Session: session, + Type: "user_prompt", + Text: "different text", + Origin: "telegram", + TerminalDelivered: false, + TelegramDelivered: false, + }) + // Should still show original delivery status if !isDelivered(session, "test:1", "terminal") { - t.Error("isDelivered(terminal) should be true") - } - if !isDelivered(session, "test:1", "telegram") { - t.Error("isDelivered(telegram) should be true") + t.Error("dedup failed: terminal delivery was overwritten") } - // Test findUndelivered + // findUndelivered appendMessage(&MessageRecord{ ID: "test:2", Session: session, From 8e02eec950d29c7438c731c2d35acebdd665f6f7 Mon Sep 17 00:00:00 2001 From: Michael_Dreamer <90915827+MichaelC001@users.noreply.github.com> Date: Fri, 27 Feb 2026 09:10:00 +0800 Subject: [PATCH 3/3] fix: use ON CONFLICT UPDATE for message delivery flags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit INSERT OR IGNORE caused delivery flags to stay at 0 when the first insert had telegram_delivered=false. Subsequent inserts were ignored, so the message was never marked as delivered and got resent repeatedly. Use ON CONFLICT DO UPDATE SET with MAX() to only upgrade flags (false→true), never downgrade. Co-Authored-By: Claude Opus 4.6 --- db.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/db.go b/db.go index 290145d..a66d95c 100644 --- a/db.go +++ b/db.go @@ -85,7 +85,8 @@ func closeDB() { } } -// appendMessage inserts a message record. Duplicate IDs are silently ignored. +// appendMessage inserts a message record. If the ID already exists, +// delivery flags are upgraded (false→true) but never downgraded (true→false). func appendMessage(rec *MessageRecord) error { db := openDB() if db == nil { @@ -95,8 +96,12 @@ func appendMessage(rec *MessageRecord) error { rec.Timestamp = time.Now().UnixMilli() } _, err := db.Exec( - `INSERT OR IGNORE INTO messages (id, session, type, text, origin, terminal_delivered, telegram_delivered, telegram_msg_id, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + `INSERT INTO messages (id, session, type, text, origin, terminal_delivered, telegram_delivered, telegram_msg_id, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + terminal_delivered = MAX(terminal_delivered, excluded.terminal_delivered), + telegram_delivered = MAX(telegram_delivered, excluded.telegram_delivered), + telegram_msg_id = CASE WHEN excluded.telegram_msg_id > 0 THEN excluded.telegram_msg_id ELSE telegram_msg_id END`, rec.ID, rec.Session, rec.Type, rec.Text, rec.Origin, boolToInt(rec.TerminalDelivered), boolToInt(rec.TelegramDelivered), rec.TelegramMsgID, rec.Timestamp,