-
Notifications
You must be signed in to change notification settings - Fork 0
Add tamper-proof hash chain to audit trail #6
Changes from all commits
d57506e
5436451
dfd3bd6
0b122a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| module github.com/ottojongerius/beacon | ||
|
|
||
| go 1.22 | ||
| go 1.26 | ||
|
|
||
| require ( | ||
| github.com/google/uuid v1.6.0 | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,15 @@ | ||
| package audit | ||
|
|
||
| import ( | ||
| "crypto/sha256" | ||
| "database/sql" | ||
| "encoding/binary" | ||
| "encoding/hex" | ||
| "encoding/json" | ||
| "fmt" | ||
| "os" | ||
| "path/filepath" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
|
|
@@ -29,10 +33,14 @@ CREATE TABLE IF NOT EXISTS messages ( | |
| timestamp TIMESTAMP NOT NULL, | ||
| jsonrpc_id TEXT, | ||
| method TEXT, | ||
| raw TEXT NOT NULL | ||
| raw TEXT NOT NULL, | ||
| sequence INTEGER NOT NULL, | ||
| prev_hash TEXT NOT NULL DEFAULT '', | ||
| hash TEXT NOT NULL DEFAULT '' | ||
| ); | ||
|
|
||
| CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp); | ||
| CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_sequence ON messages(sequence); | ||
|
|
||
| CREATE TABLE IF NOT EXISTS tool_calls ( | ||
| id TEXT PRIMARY KEY, | ||
|
|
@@ -76,10 +84,54 @@ CREATE TABLE IF NOT EXISTS intent_tool_calls ( | |
| CREATE INDEX IF NOT EXISTS idx_intent_tool_calls_intent ON intent_tool_calls(intent_id); | ||
| ` | ||
|
|
||
// migrateHashChain adds the sequence/prev_hash/hash columns to the messages
// table when they are missing, upgrading databases created before the hash
// chain feature existed.
//
// Each column is checked individually, so a migration that was interrupted
// after adding only some of the columns is completed on the next call.
// Rows that predate the migration receive the column defaults
// (sequence 0, empty prev_hash, empty hash).
//
// It returns any error raised while reading the table metadata or while
// executing an ALTER TABLE statement.
//
// NOTE(review): the schema in this file also creates a UNIQUE index on
// messages(sequence); several legacy rows all defaulting to sequence 0 look
// incompatible with that index — confirm how upgraded databases are meant
// to be handled.
func migrateHashChain(db *sql.DB) error {
	existing, err := messageColumns(db)
	if err != nil {
		return err
	}

	migrations := []struct {
		column string
		stmt   string
	}{
		{"sequence", "ALTER TABLE messages ADD COLUMN sequence INTEGER NOT NULL DEFAULT 0"},
		{"prev_hash", "ALTER TABLE messages ADD COLUMN prev_hash TEXT NOT NULL DEFAULT ''"},
		{"hash", "ALTER TABLE messages ADD COLUMN hash TEXT NOT NULL DEFAULT ''"},
	}
	for _, m := range migrations {
		if existing[m.column] {
			continue // already present, nothing to add
		}
		if _, err := db.Exec(m.stmt); err != nil {
			// The column may have been added between the metadata read and
			// this statement; that outcome is what we wanted anyway.
			if !strings.Contains(err.Error(), "duplicate column") {
				return err
			}
		}
	}
	return nil
}

// messageColumns returns the set of column names reported by
// PRAGMA table_info(messages).
func messageColumns(db *sql.DB) (map[string]bool, error) {
	rows, err := db.Query("PRAGMA table_info(messages)")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	cols := make(map[string]bool)
	for rows.Next() {
		// table_info yields: cid, name, type, notnull, dflt_value, pk.
		var (
			cid, notnull, pk int
			name, typ        string
			dflt             sql.NullString
		)
		if err := rows.Scan(&cid, &name, &typ, &notnull, &dflt, &pk); err != nil {
			return nil, err
		}
		cols[name] = true
	}
	// A failed iteration must not be mistaken for "column missing".
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return cols, nil
}
|
|
||
| type Store struct { | ||
| db *sql.DB | ||
| mu sync.Mutex | ||
| enc *Encryptor | ||
| db *sql.DB | ||
| mu sync.Mutex | ||
| enc *Encryptor | ||
| lastHash string // hash chain: hash of the most recent message | ||
| sequence int64 // monotonic sequence number for messages | ||
| } | ||
|
|
||
| // Open creates or opens the SQLite database at the given path. | ||
|
|
@@ -106,6 +158,12 @@ func Open(dbPath string, encryptionKey ...string) (*Store, error) { | |
| return nil, fmt.Errorf("run schema migration: %w", err) | ||
| } | ||
|
|
||
| // Migrate existing DBs: add hash chain columns if missing | ||
| if err := migrateHashChain(db); err != nil { | ||
| db.Close() | ||
| return nil, fmt.Errorf("migrate hash chain columns: %w", err) | ||
| } | ||
|
|
||
| // Restrict DB file permissions — audit data contains raw MCP payloads. | ||
| // Done after schema migration so the file is guaranteed to exist. | ||
| if err := os.Chmod(dbPath, 0600); err != nil { | ||
|
|
@@ -123,7 +181,19 @@ func Open(dbPath string, encryptionKey ...string) (*Store, error) { | |
| } | ||
| } | ||
|
|
||
| return &Store{db: db, enc: enc}, nil | ||
| store := &Store{db: db, enc: enc} | ||
|
|
||
| // Resume hash chain from the most recent message (supports restarts) | ||
| var lastHash sql.NullString | ||
| var seq sql.NullInt64 | ||
| err = db.QueryRow("SELECT hash, sequence FROM messages ORDER BY sequence DESC LIMIT 1").Scan(&lastHash, &seq) | ||
| if err == nil { | ||
| store.lastHash = lastHash.String | ||
| store.sequence = seq.Int64 | ||
| } | ||
| // err == sql.ErrNoRows is fine — empty DB, chain starts fresh | ||
|
|
||
| return store, nil | ||
| } | ||
|
|
||
| // Close closes the database connection. | ||
|
|
@@ -163,6 +233,7 @@ const maxStoredMessageSize = 512 * 1024 // 512KB — truncate raw payloads beyon | |
|
|
||
| // LogMessage records a single JSON-RPC message and returns its ID. | ||
| // Raw payloads exceeding 512KB are truncated to limit DB growth from large responses. | ||
| // Each message is linked to the previous via a SHA-256 hash chain for tamper detection. | ||
| func (s *Store) LogMessage(sessionID, direction, jsonrpcID, method, raw string) (string, error) { | ||
| id := uuid.New().String() | ||
| stored := Redact(raw) | ||
|
|
@@ -173,16 +244,41 @@ func (s *Store) LogMessage(sessionID, direction, jsonrpcID, method, raw string) | |
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| s.sequence++ | ||
| prevHash := s.lastHash | ||
| ts := time.Now().UTC() | ||
| hash := computeHash(id, sessionID, direction, jsonrpcID, method, stored, s.sequence, prevHash) | ||
| s.lastHash = hash | ||
|
|
||
| _, err := s.db.Exec( | ||
| "INSERT INTO messages (id, session_id, direction, timestamp, jsonrpc_id, method, raw) VALUES (?, ?, ?, ?, ?, ?, ?)", | ||
| id, sessionID, direction, time.Now().UTC(), nullIfEmpty(jsonrpcID), nullIfEmpty(method), s.encrypt(stored), | ||
| "INSERT INTO messages (id, session_id, direction, timestamp, jsonrpc_id, method, raw, sequence, prev_hash, hash) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | ||
| id, sessionID, direction, ts, nullIfEmpty(jsonrpcID), nullIfEmpty(method), s.encrypt(stored), | ||
| s.sequence, prevHash, hash, | ||
| ) | ||
|
Comment on lines
244
to
257
|
||
| if err != nil { | ||
| return "", err | ||
| } | ||
| return id, nil | ||
| } | ||
|
|
||
// computeHash returns the hex-encoded SHA-256 digest that links a message to
// its predecessor in the audit chain.
//
// The digest covers, in order: prevHash, id, sessionID, direction, jsonrpcID,
// method and raw — each preceded by its byte length as a big-endian uint64 so
// that moving bytes from one field to a neighbour changes the input — followed
// by sequence as a big-endian uint64.
func computeHash(id, sessionID, direction, jsonrpcID, method, raw string, sequence int64, prevHash string) string {
	digest := sha256.New()

	// One scratch word is reused for every 64-bit value fed to the hasher.
	var word [8]byte
	writeUint64 := func(v uint64) {
		binary.BigEndian.PutUint64(word[:], v)
		digest.Write(word[:])
	}

	parts := [...]string{prevHash, id, sessionID, direction, jsonrpcID, method, raw}
	for i := range parts {
		writeUint64(uint64(len(parts[i])))
		digest.Write([]byte(parts[i]))
	}

	// The sequence number is fixed-width, so it needs no length prefix.
	writeUint64(uint64(sequence))

	return hex.EncodeToString(digest.Sum(nil))
}
|
Comment on lines
+264
to
+280
|
||
|
|
||
| // ToolCallRecord holds the data for creating a tool call entry. | ||
| type ToolCallRecord struct { | ||
| ID string | ||
|
|
@@ -272,6 +368,80 @@ func (s *Store) AddToolCallToIntent(intentID, toolCallID string, sequenceOrder i | |
| return err | ||
| } | ||
|
|
||
// ChainStatus reports the outcome of a hash chain verification.
type ChainStatus struct {
	// Total is the number of messages checked.
	Total int
	// Valid is true if the entire chain is intact.
	Valid bool
	// BrokenAt is the sequence number where the chain broke (0 if valid).
	BrokenAt int
	// Error is a description of the break.
	Error string
}
|
|
||
| // VerifyChain walks the message hash chain and checks for tampering. | ||
| // It recomputes each hash from stored content and verifies linkage. | ||
| // If encryption is enabled, raw content is stored encrypted — the hash | ||
| // was computed on pre-encryption content, so this method decrypts before verifying. | ||
| func (s *Store) VerifyChain() (*ChainStatus, error) { | ||
| rows, err := s.db.Query( | ||
| "SELECT id, session_id, direction, jsonrpc_id, method, raw, sequence, prev_hash, hash FROM messages ORDER BY sequence ASC", | ||
| ) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("query messages: %w", err) | ||
| } | ||
| defer rows.Close() | ||
|
|
||
| status := &ChainStatus{Valid: true} | ||
| expectedPrevHash := "" | ||
|
|
||
| for rows.Next() { | ||
| var id, sessionID, direction, raw, prevHash, hash string | ||
| var jsonrpcID, method sql.NullString | ||
| var seq int64 | ||
| if err := rows.Scan(&id, &sessionID, &direction, &jsonrpcID, &method, &raw, &seq, &prevHash, &hash); err != nil { | ||
| return nil, fmt.Errorf("scan row: %w", err) | ||
| } | ||
| status.Total++ | ||
|
|
||
| // Decrypt raw if encrypted | ||
| decrypted, err := s.decryptVerify(raw) | ||
| if err != nil { | ||
| status.Valid = false | ||
| status.BrokenAt = int(seq) | ||
| status.Error = fmt.Sprintf("sequence %d: decryption failed (wrong key or corrupted data)", seq) | ||
| return status, nil | ||
| } | ||
|
|
||
| // Check prev_hash linkage | ||
| if prevHash != expectedPrevHash { | ||
| status.Valid = false | ||
| status.BrokenAt = int(seq) | ||
| status.Error = fmt.Sprintf("sequence %d: prev_hash mismatch (expected %s, got %s)", seq, expectedPrevHash, prevHash) | ||
| return status, nil | ||
| } | ||
|
|
||
| // Recompute and verify hash | ||
| computed := computeHash(id, sessionID, direction, jsonrpcID.String, method.String, decrypted, seq, prevHash) | ||
| if hash != computed { | ||
| status.Valid = false | ||
| status.BrokenAt = int(seq) | ||
| status.Error = fmt.Sprintf("sequence %d: hash mismatch (record may have been modified)", seq) | ||
| return status, nil | ||
| } | ||
|
|
||
| expectedPrevHash = hash | ||
| } | ||
|
|
||
| return status, rows.Err() | ||
| } | ||
|
|
||
| // decryptVerify decrypts a string if encryption is enabled, returning an error on failure | ||
| // (unlike decrypt which silently falls back to ciphertext). | ||
| func (s *Store) decryptVerify(ciphertext string) (string, error) { | ||
| if s.enc == nil { | ||
| return ciphertext, nil | ||
| } | ||
| return s.enc.Decrypt(ciphertext) | ||
| } | ||
|
|
||
| // encrypt encrypts a string if encryption is enabled. Returns plaintext otherwise. | ||
| func (s *Store) encrypt(plaintext string) string { | ||
| if s.enc == nil { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new resume query assumes the `messages` table already has `sequence` and `hash` columns, but the schema is applied via `CREATE TABLE IF NOT EXISTS`, which does not add columns on upgrade. On an existing DB, `SELECT hash, sequence ...` will fail (e.g. "no such column"), and `Open()` currently ignores that error — later `LogMessage()` will fail on insert. Add an explicit migration step (e.g., `PRAGMA table_info(messages)` + `ALTER TABLE ... ADD COLUMN`), and decide how to backfill/handle pre-chain rows so upgraded databases remain usable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed — added migrateHashChain() that checks PRAGMA table_info(messages) and runs ALTER TABLE ADD COLUMN for missing columns. Pre-chain rows will have empty hashes (sequence=0, hash='') which is acceptable — the chain starts from the first post-migration message.