From e1d916a3340a679053778190832f38f1f745e3e9 Mon Sep 17 00:00:00 2001 From: Plutarch01 <51755350+Plutarch01@users.noreply.github.com> Date: Sun, 12 Apr 2026 13:59:00 +0200 Subject: [PATCH] fix: skip corrupted stored JSON rows during reads Prevent a single malformed info_json, part_json, or metadata_json row from taking down grep, resume, and session reads. --- src/store-artifacts.ts | 16 ++++- src/store.ts | 115 ++++++++++++++++++++++++++------- src/utils.ts | 21 +++++- tests/store-transform.test.mjs | 110 +++++++++++++++++++++++++++++++ 4 files changed, 235 insertions(+), 27 deletions(-) diff --git a/src/store-artifacts.ts b/src/store-artifacts.ts index b57abbb..10c0e1f 100644 --- a/src/store-artifacts.ts +++ b/src/store-artifacts.ts @@ -1,6 +1,7 @@ import { randomUUID } from 'node:crypto'; import type { Message, Part } from '@opencode-ai/sdk'; +import { getLogger } from './logging.js'; import { runBinaryPreviewProviders } from './preview-providers.js'; import { type CompiledPrivacyOptions, @@ -23,6 +24,7 @@ import { inferFileExtension, inferUrlScheme, parseJson, + parseJsonSafe, sanitizeAutomaticRetrievalSourceText, truncate, } from './utils.js'; @@ -649,6 +651,18 @@ export function materializeArtifactRow( ): ArtifactData { const blob = bindings.readArtifactBlobSync(row.content_hash); const contentText = blob?.content_text ?? row.content_text; + const metadata = + parseJsonSafe>(row.metadata_json || '{}', (error, preview) => { + getLogger().warn('Corrupted artifact metadata ignored', { + operation: 'materializeArtifactRow', + sessionID: row.session_id, + messageID: row.message_id, + partID: row.part_id, + artifactID: row.artifact_id, + error: error.message, + preview, + }); + }) ?? {}; return { artifactID: row.artifact_id, sessionID: row.session_id, @@ -661,6 +675,6 @@ export function materializeArtifactRow( contentHash: row.content_hash ?? hashContent(contentText), charCount: blob?.char_count ?? row.char_count, createdAt: row.created_at, - metadata: parseJson>(row.metadata_json || '{}'), + metadata, }; } diff --git a/src/store.ts b/src/store.ts index 8a9a859..be1448b 100644 --- a/src/store.ts +++ b/src/store.ts @@ -83,6 +83,7 @@ import { hashContent, isAutomaticRetrievalNoise, parseJson, + parseJsonSafe, sanitizeAutomaticRetrievalSourceText, shortNodeID, shouldSuppressLowSignalAutomaticRetrievalAnchor, @@ -390,6 +391,73 @@ function filterValidConversationMessages( return valid; } +function parseStoredPart( + row: Pick, + operation: string, +): Part | undefined { + return parseJsonSafe(row.part_json, (error, preview) => { + logMalformedMessage( + 'Skipping corrupted stored part', + { operation, sessionID: row.session_id }, + { + messageID: row.message_id, + partID: row.part_id, + error: error.message, + preview, + }, + ); + }); +} + +function parseStoredMessageInfo( + row: Pick, + operation: string, +): Message | undefined { + const info = parseJsonSafe(row.info_json, (error, preview) => { + logMalformedMessage( + 'Skipping corrupted stored message', + { operation, sessionID: row.session_id }, + { + messageID: row.message_id, + error: error.message, + preview, + }, + ); + }); + if (!info) return undefined; + if (!getValidMessageInfo(info)) { + logMalformedMessage( + 'Skipping malformed stored message', + { + operation, + sessionID: row.session_id, + }, + { messageID: row.message_id }, + ); + return undefined; + } + return info; +} + +function parseArtifactMetadata( + row: Pick, + operation: string, +): Record { + return ( + parseJsonSafe>(row.metadata_json || '{}', (error, preview) => { + getLogger().warn('Corrupted artifact metadata ignored', { + operation, + sessionID: row.session_id, + messageID: row.message_id, + partID: row.part_id, + artifactID: row.artifact_id, + error: error.message, + preview, + }); + }) ?? {} + ); +} + function isValidMessagePartUpdate(event: Event): boolean { if (event.type !== 'message.part.updated') return false; const part = asRecord(event.properties.part); @@ -4860,7 +4928,7 @@ export class SqliteLcmStore { contentHash: contentHash ?? hashContent(contentText), charCount: blob?.char_count ?? row.char_count, createdAt: row.created_at, - metadata: parseJson>(row.metadata_json || '{}'), + metadata: parseArtifactMetadata(row, 'readSessionsBatchSync'), }; const list = artifactsByPart.get(artifact.partID) ?? []; list.push(artifact); @@ -4876,7 +4944,8 @@ export class SqliteLcmStore { partsByMessage = new Map(); partsBySessionMessage.set(partRow.session_id, partsByMessage); } - const part = parseJson(partRow.part_json); + const part = parseStoredPart(partRow, 'readSessionsBatchSync'); + if (!part) continue; const artifacts = artifactsByPart.get(part.id) ?? []; hydratePartFromArtifacts(part, artifacts); const parts = partsByMessage.get(partRow.message_id) ?? []; @@ -4888,9 +4957,11 @@ export class SqliteLcmStore { const messagesBySession = new Map>(); for (const messageRow of messageRows) { const sessionParts = partsBySessionMessage.get(messageRow.session_id); + const info = parseStoredMessageInfo(messageRow, 'readSessionsBatchSync'); + if (!info) continue; const messages = messagesBySession.get(messageRow.session_id) ?? []; messages.push({ - info: parseJson(messageRow.info_json), + info, parts: sessionParts?.get(messageRow.message_id) ?? [], }); messagesBySession.set(messageRow.session_id, messages); @@ -4937,7 +5008,8 @@ export class SqliteLcmStore { const partsByMessage = new Map(); for (const partRow of partRows) { const parts = partsByMessage.get(partRow.message_id) ?? []; - const part = parseJson(partRow.part_json); + const part = parseStoredPart(partRow, 'readSessionSync'); + if (!part) continue; const artifacts = artifactsByPart.get(part.id) ?? []; hydratePartFromArtifacts(part, artifacts); parts.push(part); @@ -4945,10 +5017,16 @@ export class SqliteLcmStore { } const messages = filterValidConversationMessages( - messageRows.map((messageRow) => ({ - info: parseJson(messageRow.info_json), - parts: partsByMessage.get(messageRow.message_id) ?? [], - })), + messageRows + .map((messageRow) => { + const info = parseStoredMessageInfo(messageRow, 'readSessionSync'); + if (!info) return undefined; + return { + info, + parts: partsByMessage.get(messageRow.message_id) ?? [], + }; + }) + .filter((message): message is { info: Message; parts: Part[] } => Boolean(message)), { operation: 'readSessionSync', sessionID }, ); @@ -5053,25 +5131,16 @@ export class SqliteLcmStore { ) .all(sessionID, messageID) as PartRow[]; - const info = parseJson(row.info_json); - if (!getValidMessageInfo(info)) { - logMalformedMessage( - 'Skipping malformed stored message', - { - operation: 'readMessageSync', - sessionID, - }, - { messageID }, - ); - return undefined; - } + const info = parseStoredMessageInfo(row, 'readMessageSync'); + if (!info) return undefined; return { info, - parts: parts.map((partRow) => { - const part = parseJson(partRow.part_json); + parts: parts.flatMap((partRow) => { + const part = parseStoredPart(partRow, 'readMessageSync'); + if (!part) return []; if (hydrateArtifacts) hydratePartFromArtifacts(part, artifactsByPart.get(part.id) ?? []); - return part; + return [part]; }), }; } diff --git a/src/utils.ts b/src/utils.ts index 5da1b12..8050f2f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -33,14 +33,29 @@ export function shortNodeID(nodeID: string): string { return nodeID.length <= 32 ? nodeID : `${nodeID.slice(0, 20)}...${nodeID.slice(-8)}`; } +function jsonInputPreview(value: string): string { + return `${value.slice(0, 120)}${value.length > 120 ? '...' : ''}`; +} + export function parseJson(value: string): T { try { return JSON.parse(value) as T; } catch (error) { const message = error instanceof Error ? error.message : String(error); - throw new Error( - `Failed to parse JSON: ${message}\nInput: ${value.slice(0, 120)}${value.length > 120 ? '...' : ''}`, - ); + throw new Error(`Failed to parse JSON: ${message}\nInput: ${jsonInputPreview(value)}`); + } +} + +export function parseJsonSafe( + value: string, + onError?: (error: Error, preview: string) => void, +): T | undefined { + try { + return parseJson(value); + } catch (error) { + const normalized = error instanceof Error ? error : new Error(String(error)); + onError?.(normalized, jsonInputPreview(value)); + return undefined; } } diff --git a/tests/store-transform.test.mjs b/tests/store-transform.test.mjs index 055d6da..d197662 100644 --- a/tests/store-transform.test.mjs +++ b/tests/store-transform.test.mjs @@ -1836,6 +1836,59 @@ test('resume and describe skip stored messages with malformed metadata', async ( } }); +test('resume and describe skip stored messages with corrupted info_json', async () => { + const workspace = makeWorkspace('lcm-resume-corrupted-info-json'); + const dbPath = path.join(workspace, '.lcm', 'lcm.db'); + let store; + let db; + + try { + store = new SqliteLcmStore( + workspace, + makeOptions({ freshTailMessages: 1, minMessagesForTransform: 4 }), + ); + await store.init(); + + await createSession(store, workspace, 's1', 1); + for (const [messageID, created, text] of [ + ['m1', 2, 'resume archived one'], + ['m2', 3, 'resume archived two'], + ['m3', 4, 'resume archived three'], + ['m4', 5, 'resume fresh tail'], + ]) { + await captureMessage(store, { + sessionID: 's1', + messageID, + created, + parts: [textPart('s1', messageID, `${messageID}-p`, text)], + }); + } + + db = new DatabaseSync(dbPath, { + enableForeignKeyConstraints: true, + timeout: 5000, + }); + db.prepare('UPDATE messages SET info_json = ? WHERE session_id = ? AND message_id = ?').run( + '{"id":"m2","sessionID":"s1","role":"user","time":{"created":3}', + 's1', + 'm2', + ); + db.close(); + db = undefined; + + const resumed = await store.resume('s1'); + const described = await store.describe({ sessionID: 's1' }); + + assert.match(resumed, /resume archived one/); + assert.doesNotMatch(resumed, /resume archived two/); + assert.match(described, /Messages: 3/); + } finally { + db?.close(); + store?.close(); + await cleanupWorkspace(workspace); + } +}); + test('grep scan fallback skips stored messages with malformed metadata', async () => { const workspace = makeWorkspace('lcm-grep-malformed-info'); const dbPath = path.join(workspace, '.lcm', 'lcm.db'); @@ -1884,6 +1937,63 @@ test('grep scan fallback skips stored messages with malformed metadata', async ( } }); +test('grep scan survives a single corrupted part_json row', async () => { + const workspace = makeWorkspace('lcm-grep-corrupted-part-json'); + const dbPath = path.join(workspace, '.lcm', 'lcm.db'); + let store; + let db; + + try { + store = new SqliteLcmStore(workspace, makeOptions()); + await store.init(); + + await createSession(store, workspace, 's1', 1); + for (const [messageID, created, text] of [ + ['m1', 2, 'cosmic-ray keep alpha'], + ['m2', 3, 'cosmic-ray corrupt beta'], + ['m3', 4, 'cosmic-ray keep gamma'], + ]) { + await captureMessage(store, { + sessionID: 's1', + messageID, + created, + parts: [textPart('s1', messageID, `${messageID}-p`, text)], + }); + } + + db = new DatabaseSync(dbPath, { + enableForeignKeyConstraints: true, + timeout: 5000, + }); + db.prepare('UPDATE parts SET part_json = ? WHERE session_id = ? AND message_id = ?').run( + '{"id":"m2-p","type":"text","text":"cosmic-ray corrupt b', + 's1', + 'm2', + ); + db.prepare('DELETE FROM message_fts').run(); + db.prepare('DELETE FROM summary_fts').run(); + db.prepare('DELETE FROM artifact_fts').run(); + db.close(); + db = undefined; + + const results = await store.grep({ query: 'cosmic-ray', sessionID: 's1', limit: 10 }); + const messageResults = results.filter((result) => result.id.startsWith('m')); + + assert.deepEqual( + messageResults.map((result) => result.id), + ['m1', 'm3'], + ); + assert.equal( + results.some((result) => result.id === 'm2'), + false, + ); + } finally { + db?.close(); + store?.close(); + await cleanupWorkspace(workspace); + } +}); + test('capture ignores malformed message and part update events', async () => { const workspace = makeWorkspace('lcm-capture-malformed-events'); let store;