Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/store-artifacts.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { randomUUID } from 'node:crypto';

import type { Message, Part } from '@opencode-ai/sdk';
import { getLogger } from './logging.js';
import { runBinaryPreviewProviders } from './preview-providers.js';
import {
type CompiledPrivacyOptions,
Expand All @@ -23,6 +24,7 @@ import {
inferFileExtension,
inferUrlScheme,
parseJson,
parseJsonSafe,
sanitizeAutomaticRetrievalSourceText,
truncate,
} from './utils.js';
Expand Down Expand Up @@ -649,6 +651,18 @@ export function materializeArtifactRow(
): ArtifactData {
const blob = bindings.readArtifactBlobSync(row.content_hash);
const contentText = blob?.content_text ?? row.content_text;
const metadata =
parseJsonSafe<Record<string, unknown>>(row.metadata_json || '{}', (error, preview) => {
getLogger().warn('Corrupted artifact metadata ignored', {
operation: 'materializeArtifactRow',
sessionID: row.session_id,
messageID: row.message_id,
partID: row.part_id,
artifactID: row.artifact_id,
error: error.message,
preview,
});
}) ?? {};
return {
artifactID: row.artifact_id,
sessionID: row.session_id,
Expand All @@ -661,6 +675,6 @@ export function materializeArtifactRow(
contentHash: row.content_hash ?? hashContent(contentText),
charCount: blob?.char_count ?? row.char_count,
createdAt: row.created_at,
metadata: parseJson<Record<string, unknown>>(row.metadata_json || '{}'),
metadata,
};
}
115 changes: 92 additions & 23 deletions src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import {
hashContent,
isAutomaticRetrievalNoise,
parseJson,
parseJsonSafe,
sanitizeAutomaticRetrievalSourceText,
shortNodeID,
shouldSuppressLowSignalAutomaticRetrievalAnchor,
Expand Down Expand Up @@ -390,6 +391,73 @@ function filterValidConversationMessages(
return valid;
}

function parseStoredPart(
row: Pick<PartRow, 'session_id' | 'message_id' | 'part_id' | 'part_json'>,
operation: string,
): Part | undefined {
return parseJsonSafe<Part>(row.part_json, (error, preview) => {
logMalformedMessage(
'Skipping corrupted stored part',
{ operation, sessionID: row.session_id },
{
messageID: row.message_id,
partID: row.part_id,
error: error.message,
preview,
},
);
});
}

function parseStoredMessageInfo(
row: Pick<MessageRow, 'session_id' | 'message_id' | 'info_json'>,
operation: string,
): Message | undefined {
const info = parseJsonSafe<Message>(row.info_json, (error, preview) => {
logMalformedMessage(
'Skipping corrupted stored message',
{ operation, sessionID: row.session_id },
{
messageID: row.message_id,
error: error.message,
preview,
},
);
});
if (!info) return undefined;
if (!getValidMessageInfo(info)) {
logMalformedMessage(
'Skipping malformed stored message',
{
operation,
sessionID: row.session_id,
},
{ messageID: row.message_id },
);
return undefined;
}
return info;
}

function parseArtifactMetadata(
row: Pick<ArtifactRow, 'artifact_id' | 'session_id' | 'message_id' | 'part_id' | 'metadata_json'>,
operation: string,
): Record<string, unknown> {
return (
parseJsonSafe<Record<string, unknown>>(row.metadata_json || '{}', (error, preview) => {
getLogger().warn('Corrupted artifact metadata ignored', {
operation,
sessionID: row.session_id,
messageID: row.message_id,
partID: row.part_id,
artifactID: row.artifact_id,
error: error.message,
preview,
});
}) ?? {}
);
}

function isValidMessagePartUpdate(event: Event): boolean {
if (event.type !== 'message.part.updated') return false;
const part = asRecord(event.properties.part);
Expand Down Expand Up @@ -4860,7 +4928,7 @@ export class SqliteLcmStore {
contentHash: contentHash ?? hashContent(contentText),
charCount: blob?.char_count ?? row.char_count,
createdAt: row.created_at,
metadata: parseJson<Record<string, unknown>>(row.metadata_json || '{}'),
metadata: parseArtifactMetadata(row, 'readSessionsBatchSync'),
};
const list = artifactsByPart.get(artifact.partID) ?? [];
list.push(artifact);
Expand All @@ -4876,7 +4944,8 @@ export class SqliteLcmStore {
partsByMessage = new Map();
partsBySessionMessage.set(partRow.session_id, partsByMessage);
}
const part = parseJson<Part>(partRow.part_json);
const part = parseStoredPart(partRow, 'readSessionsBatchSync');
if (!part) continue;
const artifacts = artifactsByPart.get(part.id) ?? [];
hydratePartFromArtifacts(part, artifacts);
const parts = partsByMessage.get(partRow.message_id) ?? [];
Expand All @@ -4888,9 +4957,11 @@ export class SqliteLcmStore {
const messagesBySession = new Map<string, Array<{ info: Message; parts: Part[] }>>();
for (const messageRow of messageRows) {
const sessionParts = partsBySessionMessage.get(messageRow.session_id);
const info = parseStoredMessageInfo(messageRow, 'readSessionsBatchSync');
if (!info) continue;
const messages = messagesBySession.get(messageRow.session_id) ?? [];
messages.push({
info: parseJson<Message>(messageRow.info_json),
info,
parts: sessionParts?.get(messageRow.message_id) ?? [],
});
messagesBySession.set(messageRow.session_id, messages);
Expand Down Expand Up @@ -4937,18 +5008,25 @@ export class SqliteLcmStore {
const partsByMessage = new Map<string, Part[]>();
for (const partRow of partRows) {
const parts = partsByMessage.get(partRow.message_id) ?? [];
const part = parseJson<Part>(partRow.part_json);
const part = parseStoredPart(partRow, 'readSessionSync');
if (!part) continue;
const artifacts = artifactsByPart.get(part.id) ?? [];
hydratePartFromArtifacts(part, artifacts);
parts.push(part);
partsByMessage.set(partRow.message_id, parts);
}

const messages = filterValidConversationMessages(
messageRows.map((messageRow) => ({
info: parseJson<Message>(messageRow.info_json),
parts: partsByMessage.get(messageRow.message_id) ?? [],
})),
messageRows
.map((messageRow) => {
const info = parseStoredMessageInfo(messageRow, 'readSessionSync');
if (!info) return undefined;
return {
info,
parts: partsByMessage.get(messageRow.message_id) ?? [],
};
})
.filter((message): message is { info: Message; parts: Part[] } => Boolean(message)),
{ operation: 'readSessionSync', sessionID },
);

Expand Down Expand Up @@ -5053,25 +5131,16 @@ export class SqliteLcmStore {
)
.all(sessionID, messageID) as PartRow[];

const info = parseJson<Message>(row.info_json);
if (!getValidMessageInfo(info)) {
logMalformedMessage(
'Skipping malformed stored message',
{
operation: 'readMessageSync',
sessionID,
},
{ messageID },
);
return undefined;
}
const info = parseStoredMessageInfo(row, 'readMessageSync');
if (!info) return undefined;

return {
info,
parts: parts.map((partRow) => {
const part = parseJson<Part>(partRow.part_json);
parts: parts.flatMap((partRow) => {
const part = parseStoredPart(partRow, 'readMessageSync');
if (!part) return [];
if (hydrateArtifacts) hydratePartFromArtifacts(part, artifactsByPart.get(part.id) ?? []);
return part;
return [part];
}),
};
}
Expand Down
21 changes: 18 additions & 3 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,29 @@ export function shortNodeID(nodeID: string): string {
return nodeID.length <= 32 ? nodeID : `${nodeID.slice(0, 20)}...${nodeID.slice(-8)}`;
}

function jsonInputPreview(value: string): string {
return `${value.slice(0, 120)}${value.length > 120 ? '...' : ''}`;
}

export function parseJson<T>(value: string): T {
try {
return JSON.parse(value) as T;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
throw new Error(
`Failed to parse JSON: ${message}\nInput: ${value.slice(0, 120)}${value.length > 120 ? '...' : ''}`,
);
throw new Error(`Failed to parse JSON: ${message}\nInput: ${jsonInputPreview(value)}`);
}
}

export function parseJsonSafe<T>(
value: string,
onError?: (error: Error, preview: string) => void,
): T | undefined {
try {
return parseJson<T>(value);
} catch (error) {
const normalized = error instanceof Error ? error : new Error(String(error));
onError?.(normalized, jsonInputPreview(value));
return undefined;
}
}

Expand Down
110 changes: 110 additions & 0 deletions tests/store-transform.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -1836,6 +1836,59 @@ test('resume and describe skip stored messages with malformed metadata', async (
}
});

test('resume and describe skip stored messages with corrupted info_json', async () => {
const workspace = makeWorkspace('lcm-resume-corrupted-info-json');
const dbPath = path.join(workspace, '.lcm', 'lcm.db');
let store;
let db;

try {
store = new SqliteLcmStore(
workspace,
makeOptions({ freshTailMessages: 1, minMessagesForTransform: 4 }),
);
await store.init();

await createSession(store, workspace, 's1', 1);
for (const [messageID, created, text] of [
['m1', 2, 'resume archived one'],
['m2', 3, 'resume archived two'],
['m3', 4, 'resume archived three'],
['m4', 5, 'resume fresh tail'],
]) {
await captureMessage(store, {
sessionID: 's1',
messageID,
created,
parts: [textPart('s1', messageID, `${messageID}-p`, text)],
});
}

db = new DatabaseSync(dbPath, {
enableForeignKeyConstraints: true,
timeout: 5000,
});
db.prepare('UPDATE messages SET info_json = ? WHERE session_id = ? AND message_id = ?').run(
'{"id":"m2","sessionID":"s1","role":"user","time":{"created":3}',
's1',
'm2',
);
db.close();
db = undefined;

const resumed = await store.resume('s1');
const described = await store.describe({ sessionID: 's1' });

assert.match(resumed, /resume archived one/);
assert.doesNotMatch(resumed, /resume archived two/);
assert.match(described, /Messages: 3/);
} finally {
db?.close();
store?.close();
await cleanupWorkspace(workspace);
}
});

test('grep scan fallback skips stored messages with malformed metadata', async () => {
const workspace = makeWorkspace('lcm-grep-malformed-info');
const dbPath = path.join(workspace, '.lcm', 'lcm.db');
Expand Down Expand Up @@ -1884,6 +1937,63 @@ test('grep scan fallback skips stored messages with malformed metadata', async (
}
});

test('grep scan survives a single corrupted part_json row', async () => {
const workspace = makeWorkspace('lcm-grep-corrupted-part-json');
const dbPath = path.join(workspace, '.lcm', 'lcm.db');
let store;
let db;

try {
store = new SqliteLcmStore(workspace, makeOptions());
await store.init();

await createSession(store, workspace, 's1', 1);
for (const [messageID, created, text] of [
['m1', 2, 'cosmic-ray keep alpha'],
['m2', 3, 'cosmic-ray corrupt beta'],
['m3', 4, 'cosmic-ray keep gamma'],
]) {
await captureMessage(store, {
sessionID: 's1',
messageID,
created,
parts: [textPart('s1', messageID, `${messageID}-p`, text)],
});
}

db = new DatabaseSync(dbPath, {
enableForeignKeyConstraints: true,
timeout: 5000,
});
db.prepare('UPDATE parts SET part_json = ? WHERE session_id = ? AND message_id = ?').run(
'{"id":"m2-p","type":"text","text":"cosmic-ray corrupt b',
's1',
'm2',
);
db.prepare('DELETE FROM message_fts').run();
db.prepare('DELETE FROM summary_fts').run();
db.prepare('DELETE FROM artifact_fts').run();
db.close();
db = undefined;

const results = await store.grep({ query: 'cosmic-ray', sessionID: 's1', limit: 10 });
const messageResults = results.filter((result) => result.id.startsWith('m'));

assert.deepEqual(
messageResults.map((result) => result.id),
['m1', 'm3'],
);
assert.equal(
results.some((result) => result.id === 'm2'),
false,
);
} finally {
db?.close();
store?.close();
await cleanupWorkspace(workspace);
}
});

test('capture ignores malformed message and part update events', async () => {
const workspace = makeWorkspace('lcm-capture-malformed-events');
let store;
Expand Down
Loading