Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,5 @@ export {
export type { CclPublishedEvaluationRecord, CclPublishedResultEntry } from './dkg-agent.js';
export { monotonicTransition, versionedWrite, type MonotonicStages } from './workspace-consistency.js';
export { StaleWriteError, type CASCondition } from '@origintrail-official/dkg-publisher';
export * from './source-worker.js';
export * from './source-registry.js';
33 changes: 33 additions & 0 deletions packages/agent/src/source-registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import type { SourceKindHandler } from './source-worker.js';

export interface SourceRegistry<TSource extends { kind: string }, TAsset = unknown> {
register(kind: string, handler: SourceKindHandler<TSource, TAsset>): void;
resolve(source: TSource): SourceKindHandler<TSource, TAsset>;
has(kind: string): boolean;
listKinds(): string[];
}

export function createSourceRegistry<TSource extends { kind: string }, TAsset = unknown>(
seed: Record<string, SourceKindHandler<TSource, TAsset>> = {},
): SourceRegistry<TSource, TAsset> {
const handlers = new Map<string, SourceKindHandler<TSource, TAsset>>(Object.entries(seed));

return {
register(kind: string, handler: SourceKindHandler<TSource, TAsset>) {
handlers.set(kind, handler);
},
resolve(source: TSource): SourceKindHandler<TSource, TAsset> {
const handler = handlers.get(source.kind);
if (!handler) {
throw new Error(`Unsupported source kind: ${source.kind}`);
}
return handler;
},
has(kind: string): boolean {
return handlers.has(kind);
},
listKinds(): string[] {
return [...handlers.keys()].sort();
},
};
}
165 changes: 165 additions & 0 deletions packages/agent/src/source-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { writeFile, readFile, mkdir } from 'node:fs/promises';
import { dirname } from 'node:path';

export interface SourceWorkerJobState {
fingerprint?: string;
lastRunAt?: string;
lastJobIds?: string[];
lastJobStatuses?: Record<string, string>;
lastStatus?: string;
lastError?: string;
attemptCount?: number;
manualReviewRequired?: boolean;
manualReviewReason?: string;
}

export interface SourceWorkerState {
sources: Record<string, SourceWorkerJobState>;
}

export interface SourceWorkerSource {
id: string;
maxRetries?: number;
}

export interface SourcePreparationResult<TAsset = unknown> {
fingerprint: string;
assets: TAsset[];
warnings?: string[];
}

export interface SourceKindHandler<TSource = SourceWorkerSource, TAsset = unknown> {
computeFingerprint(source: TSource): Promise<string>;
prepare(source: TSource): Promise<SourcePreparationResult<TAsset>>;
}

export interface SourceWorkerResult {
sourceId: string;
skipped: boolean;
reason?: string;
jobIds?: string[];
jobStatuses?: Record<string, string>;
status?: string;
nextState: SourceWorkerJobState;
}

export interface SourceWorkerDeps<TSource extends SourceWorkerSource> {
now(): string;
getFingerprint(source: TSource): Promise<string>;
processSource(source: TSource, fingerprint: string, state: SourceWorkerJobState | undefined): Promise<SourceWorkerResult>;
getJobStatus(jobId: string): Promise<string>;
}

export async function loadSourceWorkerState(path: string): Promise<SourceWorkerState> {
try {
const raw = await readFile(path, 'utf8');
const parsed = JSON.parse(raw) as SourceWorkerState;
return { sources: parsed.sources ?? {} };
} catch (error: any) {
if (error?.code === 'ENOENT') return { sources: {} };
throw error;
}
}

export async function saveSourceWorkerState(path: string, state: SourceWorkerState): Promise<void> {
await mkdir(dirname(path), { recursive: true });
await writeFile(path, JSON.stringify(state, null, 2) + '\n', 'utf8');
}

export async function runSourceWorkerOnce<TSource extends SourceWorkerSource>(
sources: readonly TSource[],
statePath: string,
deps: SourceWorkerDeps<TSource>,
): Promise<SourceWorkerState> {
const state = await loadSourceWorkerState(statePath);
const nextState: SourceWorkerState = { sources: { ...state.sources } };

for (const source of sources) {
const current = state.sources[source.id];
const fingerprint = await deps.getFingerprint(source);
const statuses = current?.lastJobIds?.length
? Object.fromEntries(await Promise.all(current.lastJobIds.map(async (jobId) => [jobId, await deps.getJobStatus(jobId)] as const)))
: {};
const aggregate = aggregateStatuses(statuses);

if (current?.fingerprint === fingerprint && current.manualReviewRequired) {
nextState.sources[source.id] = {
...current,
lastRunAt: deps.now(),
lastJobStatuses: statuses,
lastStatus: 'manual-review-required',
};
continue;
}

if (current?.fingerprint === fingerprint && isActiveStatus(aggregate)) {
nextState.sources[source.id] = {
...current,
lastRunAt: deps.now(),
lastJobStatuses: statuses,
lastStatus: aggregate,
};
continue;
}

if (current?.fingerprint === fingerprint && isSuccessStatus(aggregate)) {
nextState.sources[source.id] = {
...current,
lastRunAt: deps.now(),
lastJobStatuses: statuses,
lastStatus: aggregate,
};
continue;
}

const nextAttemptCount = current?.fingerprint === fingerprint ? (current.attemptCount ?? 0) + 1 : 1;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: no-matching-rows is treated as a success state, but this loop only derives success from lastJobIds. If a handler returns { lastStatus: 'no-matching-rows' } with no jobs, the next poll sees aggregate === '', falls through here, and keeps retrying until the source is incorrectly forced into manual review. Short-circuit same-fingerprint no-matching-rows before incrementing retries (or persist a synthetic terminal status into the aggregate calculation).

const maxRetries = source.maxRetries ?? 3;
if (current?.fingerprint === fingerprint && nextAttemptCount > maxRetries) {
nextState.sources[source.id] = {
...current,
lastRunAt: deps.now(),
lastJobStatuses: statuses,
lastStatus: 'manual-review-required',
lastError: `max retries exceeded (${maxRetries})`,
attemptCount: nextAttemptCount,
manualReviewRequired: true,
manualReviewReason: `max retries exceeded (${maxRetries})`,
};
continue;
}

try {
const result = await deps.processSource(source, fingerprint, current);
nextState.sources[source.id] = result.nextState;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: processSource() gets the computed fingerprint, but the worker persists result.nextState verbatim. If a handler omits that field from nextState (which is easy with the current API), the next poll sees current.fingerprint as missing and reprocesses the same source again, potentially creating duplicate share/publish jobs forever. Merge the framework-owned fields here (fingerprint, lastRunAt, and any retry/manual-review resets) instead of requiring every handler to remember them.

} catch (error: any) {
nextState.sources[source.id] = {
...current,
fingerprint,
lastRunAt: deps.now(),
lastStatus: 'failed',
lastError: error?.message ?? String(error),
attemptCount: nextAttemptCount,
};
}
}

await saveSourceWorkerState(statePath, nextState);
return nextState;
}

function aggregateStatuses(statuses: Record<string, string>): string {
const values = Object.values(statuses);
if (values.length === 0) return '';
if (values.every((status) => status === 'finalized' || status === 'completed')) return 'finalized';
if (values.some((status) => status === 'failed' || status === 'error')) return 'failed';
if (values.some((status) => isActiveStatus(status))) return 'in-flight';
return values[0] ?? '';
}

function isSuccessStatus(status: string | undefined): boolean {
return status === 'completed' || status === 'finalized' || status === 'no-matching-rows';
}

function isActiveStatus(status: string | undefined): boolean {
return status === 'accepted' || status === 'claimed' || status === 'validated' || status === 'broadcast' || status === 'included' || status === 'queued' || status === 'in-flight';
}
22 changes: 22 additions & 0 deletions packages/agent/test/source-registry.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { describe, expect, it } from 'vitest';
import { createSourceRegistry } from '../src/source-registry.js';

describe('source registry', () => {
it('registers and resolves handlers by source kind', async () => {
const registry = createSourceRegistry<{ kind: string }, string>();
registry.register('demo', {
async computeFingerprint() { return 'fp'; },
async prepare() { return { fingerprint: 'fp', assets: ['a'] }; },
});

expect(registry.has('demo')).toBe(true);
expect(registry.listKinds()).toEqual(['demo']);
const handler = registry.resolve({ kind: 'demo' });
await expect(handler.computeFingerprint({ kind: 'demo' })).resolves.toBe('fp');
});

it('throws for unsupported source kinds', () => {
const registry = createSourceRegistry<{ kind: string }, string>();
expect(() => registry.resolve({ kind: 'missing' })).toThrow(/Unsupported source kind/);
});
});
37 changes: 37 additions & 0 deletions packages/agent/test/source-worker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { mkdtemp, rm } from 'node:fs/promises';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { afterEach, describe, expect, it, vi } from 'vitest';
import { runSourceWorkerOnce } from '../src/source-worker.js';

const cleanup: string[] = [];
afterEach(async () => {
await Promise.all(cleanup.splice(0).map((path) => rm(path, { recursive: true, force: true })));
});

describe('source worker runtime', () => {
it('skips unchanged finalized jobs and persists reconciled state', async () => {
const dir = await mkdtemp(join(tmpdir(), 'source-worker-'));
cleanup.push(dir);
const statePath = join(dir, 'state.json');

const deps = {
now: () => '2026-04-28T00:00:00.000Z',
getFingerprint: vi.fn(async () => 'fp-1'),
getJobStatus: vi.fn(async () => 'finalized'),
processSource: vi.fn(async () => ({
sourceId: 'src-1',
skipped: false,
fingerprint: 'fp-1',
status: 'queued',
nextState: { fingerprint: 'fp-1', lastStatus: 'queued', lastJobIds: ['job-1'] },
})),
};

await runSourceWorkerOnce([{ id: 'src-1', maxRetries: 3 }], statePath, deps);
const second = await runSourceWorkerOnce([{ id: 'src-1', maxRetries: 3 }], statePath, deps);

expect(deps.processSource).toHaveBeenCalledTimes(1);
expect(second.sources['src-1']?.lastStatus).toBe('finalized');
});
});
27 changes: 26 additions & 1 deletion packages/cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
} from './config.js';
import { ApiClient } from './api-client.js';
import { parsePositiveMsOption } from './publisher-runner.js';
import { runConfiguredSourceWorker } from './source-worker-runner.js';

function isDaemonUnreachable(err: unknown): boolean {
const msg = err instanceof Error ? err.message : String(err);
Expand Down Expand Up @@ -2153,6 +2154,26 @@ sharedMemoryCmd
}
});

// ─── dkg source-worker ────────────────────────────────────────────────

const sourceWorkerCmd = program
.command('source-worker')
.description('Run generic source workers against the DKG daemon');

sourceWorkerCmd
.command('run')
.description('Run a source worker from a JSON config file')
.requiredOption('--config <path>', 'Worker config JSON file')
.option('--once', 'Run a single iteration and exit')
.action(async (opts: ActionOpts) => {
try {
await runConfiguredSourceWorker(String(opts.config), { once: opts.once === true });
} catch (err) {
console.error(toErrorMessage(err));
process.exit(1);
}
});

// ─── dkg publisher ────────────────────────────────────────────────────

const publisherCmd = program
Expand Down Expand Up @@ -2576,6 +2597,7 @@ program
const chainResolved = resolveChainConfig(config, network);
const rpcUrl = chainResolved?.rpcUrl;
const hubAddress = chainResolved?.hubAddress;
const tokenAddress = config.chain?.tokenAddress ?? network?.chain?.tokenAddress;
const chainId = chainResolved?.chainId ?? '(unknown)';

let provider: ethers.JsonRpcProvider | null = null;
Expand All @@ -2585,7 +2607,10 @@ program
if (rpcUrl) {
try {
provider = new ethers.JsonRpcProvider(rpcUrl);
if (hubAddress) {
if (tokenAddress && tokenAddress !== ethers.ZeroAddress) {
token = new ethers.Contract(tokenAddress, ['function balanceOf(address) view returns (uint256)', 'function symbol() view returns (string)'], provider);
tokenSymbol = await token.symbol().catch(() => 'TRAC');
} else if (hubAddress) {
const hub = new ethers.Contract(hubAddress, ['function getContractAddress(string) view returns (address)'], provider);
const tokenAddr = await hub.getContractAddress('Token');
if (tokenAddr !== ethers.ZeroAddress) {
Expand Down
3 changes: 3 additions & 0 deletions packages/cli/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export interface NetworkConfig {
type: 'evm';
rpcUrl: string;
hubAddress: string;
tokenAddress?: string;
chainId: string;
};
faucet?: {
Expand All @@ -100,6 +101,8 @@ export interface ChainConfig {
rpcUrl: string;
/** Hub contract address */
hubAddress: string;
/** Optional token contract address override. When omitted, resolve from Hub.Token. */
tokenAddress?: string;
/** Chain identifier (e.g., 'base:84532') */
chainId?: string;
/**
Expand Down
Loading
Loading