Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,7 @@ func listen() error {
target := tmuxTargetByID(windowID, tmuxName)
listenLog("sendToTmux: target=%s window=%s", target, tmuxName)

// Record in ledger before sending
// Record in DB before sending
ledgerID := fmt.Sprintf("tg:%d", update.UpdateID)
appendMessage(&MessageRecord{
ID: ledgerID,
Expand All @@ -1338,7 +1338,15 @@ func listen() error {
listenLog("sendToTmux FAILED: target=%s err=%v", target, err)
sendMessage(config, chatID, threadID, fmt.Sprintf("❌ Failed to send: %v", err))
} else {
updateDelivery(sessName, ledgerID, "terminal_delivered", true)
// Delayed confirmation: check after 10s if Claude received it
go func(sid, lid, tgt, wn, txt string, cid, tid int64) {
time.Sleep(10 * time.Second)
if hasUnconfirmedPrompt(sid, lid) {
listenLog("delivery unconfirmed after 10s, resending: session=%s id=%s", sid, lid)
sendToTmuxFromTelegram(tgt, wn, txt)
sendMessage(config, cid, tid, "⚠️ 消息未确认,已重发")
}
}(sessName, ledgerID, target, tmuxName, text, chatID, threadID)
}
} else {
sendMessage(config, chatID, threadID, "⚠️ No session linked to this topic. Use /new <name> to create one.")
Expand Down
303 changes: 303 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
package main

import (
"crypto/sha256"
"database/sql"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"

_ "modernc.org/sqlite"
)

// MessageRecord tracks the delivery state of a single message
type MessageRecord struct {
ID string `json:"id"`
Session string `json:"session"`
Type string `json:"type"` // user_prompt / assistant_text / tool_call / notification
Text string `json:"text"`
Origin string `json:"origin"` // terminal / telegram / claude
TerminalDelivered bool `json:"terminal_delivered"`
TelegramDelivered bool `json:"telegram_delivered"`
TelegramMsgID int64 `json:"telegram_msg_id,omitempty"`
Timestamp int64 `json:"timestamp"`
}

var (
dbOnce sync.Once
dbInstance *sql.DB
dbPath = func() string { return filepath.Join(cacheDir(), "ccc.db") }
)

// openDB opens (or creates) the SQLite database and ensures tables exist.
// Safe to call multiple times — uses sync.Once internally.
func openDB() *sql.DB {
dbOnce.Do(func() {
path := dbPath()
os.MkdirAll(filepath.Dir(path), 0755)


db, err := sql.Open("sqlite", path+"?_pragma=journal_mode(wal)&_pragma=busy_timeout(5000)")
if err != nil {
hookLog("db: open failed: %v", err)
return
}

// Create tables
for _, stmt := range []string{
`CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
session TEXT NOT NULL,
type TEXT NOT NULL,
text TEXT,
origin TEXT,
terminal_delivered INTEGER DEFAULT 0,
telegram_delivered INTEGER DEFAULT 0,
telegram_msg_id INTEGER DEFAULT 0,
created_at INTEGER NOT NULL
)`,
`CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session)`,
`CREATE INDEX IF NOT EXISTS idx_messages_undelivered_tg ON messages(session, telegram_delivered) WHERE telegram_delivered = 0`,
`CREATE INDEX IF NOT EXISTS idx_messages_undelivered_tm ON messages(session, terminal_delivered) WHERE terminal_delivered = 0`,
`CREATE TABLE IF NOT EXISTS tool_state (
session TEXT PRIMARY KEY,
telegram_msg_id INTEGER DEFAULT 0,
tools_json TEXT DEFAULT '[]'
)`,
} {
if _, err := db.Exec(stmt); err != nil {
hookLog("db: create table failed: %v", err)
}
}

dbInstance = db
})
return dbInstance
}

// closeDB closes the database connection
func closeDB() {
if dbInstance != nil {
dbInstance.Close()
}
}

// appendMessage inserts a message record. If the ID already exists,
// delivery flags are upgraded (false→true) but never downgraded (true→false).
func appendMessage(rec *MessageRecord) error {
db := openDB()
if db == nil {
return fmt.Errorf("db not open")
}
if rec.Timestamp == 0 {
rec.Timestamp = time.Now().UnixMilli()
}
_, err := db.Exec(
`INSERT INTO messages (id, session, type, text, origin, terminal_delivered, telegram_delivered, telegram_msg_id, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
terminal_delivered = MAX(terminal_delivered, excluded.terminal_delivered),
telegram_delivered = MAX(telegram_delivered, excluded.telegram_delivered),
telegram_msg_id = CASE WHEN excluded.telegram_msg_id > 0 THEN excluded.telegram_msg_id ELSE telegram_msg_id END`,
rec.ID, rec.Session, rec.Type, rec.Text, rec.Origin,
boolToInt(rec.TerminalDelivered), boolToInt(rec.TelegramDelivered),
rec.TelegramMsgID, rec.Timestamp,
)
return err
}

// updateDelivery updates a specific delivery field for a message
func updateDelivery(session, msgID, field string, value any) error {
db := openDB()
if db == nil {
return fmt.Errorf("db not open")
}
var col string
switch field {
case "terminal_delivered":
col = "terminal_delivered"
case "telegram_delivered":
col = "telegram_delivered"
case "telegram_msg_id":
col = "telegram_msg_id"
default:
return fmt.Errorf("unknown field: %s", field)
}
_, err := db.Exec(
fmt.Sprintf(`UPDATE messages SET %s = ? WHERE id = ?`, col),
value, msgID,
)
return err
}

// isDelivered checks if a message has been delivered to the given target
func isDelivered(session, msgID, target string) bool {
db := openDB()
if db == nil {
return false
}
var col string
switch target {
case "telegram":
col = "telegram_delivered"
case "terminal":
col = "terminal_delivered"
default:
return false
}
var delivered int
err := db.QueryRow(
fmt.Sprintf(`SELECT %s FROM messages WHERE id = ?`, col),
msgID,
).Scan(&delivered)
if err != nil {
return false
}
return delivered != 0
}

// findUndelivered returns messages not yet delivered to the given target
func findUndelivered(session, target string) []*MessageRecord {
db := openDB()
if db == nil {
return nil
}
var col string
switch target {
case "telegram":
col = "telegram_delivered"
case "terminal":
col = "terminal_delivered"
default:
return nil
}
rows, err := db.Query(
fmt.Sprintf(`SELECT id, session, type, text, origin, terminal_delivered, telegram_delivered, telegram_msg_id, created_at
FROM messages WHERE session = ? AND %s = 0 ORDER BY created_at`, col),
session,
)
if err != nil {
return nil
}
defer rows.Close()

var result []*MessageRecord
for rows.Next() {
var r MessageRecord
var termDel, tgDel int
if err := rows.Scan(&r.ID, &r.Session, &r.Type, &r.Text, &r.Origin,
&termDel, &tgDel, &r.TelegramMsgID, &r.Timestamp); err != nil {
continue
}
r.TerminalDelivered = termDel != 0
r.TelegramDelivered = tgDel != 0
result = append(result, &r)
}
return result
}

// confirmTerminalDelivery marks the most recent unconfirmed Telegram message as delivered to terminal.
// Called from UserPromptSubmit hook when Claude confirms it received a prompt.
func confirmTerminalDelivery(session, promptText string) {
db := openDB()
if db == nil {
return
}
cutoff := time.Now().Add(-30 * time.Second).UnixMilli()
_, err := db.Exec(
`UPDATE messages SET terminal_delivered = 1
WHERE session = ? AND origin = 'telegram' AND type = 'user_prompt'
AND terminal_delivered = 0 AND created_at > ?`,
session, cutoff,
)
if err != nil {
hookLog("db: confirmTerminalDelivery failed: %v", err)
}
}

// hasUnconfirmedPrompt checks if a Telegram prompt is still unconfirmed for terminal delivery
func hasUnconfirmedPrompt(session, msgID string) bool {
db := openDB()
if db == nil {
return false
}
var termDel int
err := db.QueryRow(
`SELECT terminal_delivered FROM messages WHERE id = ?`, msgID,
).Scan(&termDel)
if err != nil {
return false
}
return termDel == 0
}

// --- Tool State ---

// ToolState tracks tool calls and the Telegram message ID for live updates
type ToolState struct {
MsgID int64 `json:"msg_id"`
Tools []ToolCall `json:"tools"`
}

type ToolCall struct {
Name string `json:"name"`
Input string `json:"input"`
IsText bool `json:"is_text,omitempty"`
Time int64 `json:"time,omitempty"`
}

func loadToolState(session string) *ToolState {
db := openDB()
if db == nil {
return &ToolState{}
}
var msgID int64
var toolsJSON string
err := db.QueryRow(
`SELECT telegram_msg_id, tools_json FROM tool_state WHERE session = ?`, session,
).Scan(&msgID, &toolsJSON)
if err != nil {
return &ToolState{}
}
var tools []ToolCall
json.Unmarshal([]byte(toolsJSON), &tools)
return &ToolState{MsgID: msgID, Tools: tools}
}

func saveToolState(session string, state *ToolState) {
db := openDB()
if db == nil {
return
}
toolsJSON, _ := json.Marshal(state.Tools)
db.Exec(
`INSERT OR REPLACE INTO tool_state (session, telegram_msg_id, tools_json) VALUES (?, ?, ?)`,
session, state.MsgID, string(toolsJSON),
)
}

func clearToolState(session string) {
db := openDB()
if db == nil {
return
}
db.Exec(`DELETE FROM tool_state WHERE session = ?`, session)
}

// --- Helpers ---

func boolToInt(b bool) int {
if b {
return 1
}
return 0
}

// contentHash returns a short hash of content for dedup IDs
func contentHash(s string) string {
h := sha256.Sum256([]byte(s))
return fmt.Sprintf("%x", h[:4])
}
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ require (
github.com/mutablelogic/go-client v1.3.5 // indirect
github.com/mutablelogic/go-media v1.8.3 // indirect
github.com/mutablelogic/go-server v1.5.18 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/pquerna/otp v1.5.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
Expand All @@ -53,6 +55,7 @@ require (
go.opentelemetry.io/otel/sdk v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
Expand All @@ -62,5 +65,9 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20260122232226-8e98ce8d340d // indirect
google.golang.org/grpc v1.78.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
modernc.org/libc v1.67.6 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
modernc.org/sqlite v1.46.1 // indirect
rsc.io/qr v0.2.0 // indirect
)
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,14 @@ github.com/mutablelogic/go-server v1.5.18 h1:UKpJQReabHFMz1U/gbOq/+Q0C0ZzklVBalv
github.com/mutablelogic/go-server v1.5.18/go.mod h1:swZf3T0eGe9VEE0Ki37WknpN+XxTGj2Xn6EP7BJm9x0=
github.com/mutablelogic/go-whisper v0.0.39 h1:l064L4BJuhCc5eJA54sanmZ+5Bm1sfxO6x7EHQMRELw=
github.com/mutablelogic/go-whisper v0.0.39/go.mod h1:iOMw8nfd/j1MCYXRGlim5r+RH32aB4Ynx/o9++bmVZU=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pquerna/otp v1.5.0 h1:NMMR+WrmaqXU4EzdGJEE1aUUI0AMRzsp96fFFWNPwxs=
github.com/pquerna/otp v1.5.0/go.mod h1:dkJfzwRKNiegxyNb54X/3fLwhCynbMspSyWKnvi1AEg=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down Expand Up @@ -120,6 +124,8 @@ go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjce
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY=
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
Expand All @@ -144,5 +150,13 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI=
modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/sqlite v1.46.1 h1:eFJ2ShBLIEnUWlLy12raN0Z1plqmFX9Qe3rjQTKt6sU=
modernc.org/sqlite v1.46.1/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA=
rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY=
rsc.io/qr v0.2.0/go.mod h1:IF+uZjkb9fqyeF/4tlBoynqmQxUoPfWEKh921coOuXs=
Loading