From e0642641f7f05583de19474b5053f1705617b4df Mon Sep 17 00:00:00 2001 From: Viktor Pelle Date: Wed, 29 Apr 2026 16:05:45 +0200 Subject: [PATCH 1/2] Add generic query-catalog and source-worker publishing flow --- packages/agent/src/index.ts | 2 + packages/agent/src/source-registry.ts | 33 +++ packages/agent/src/source-worker.ts | 165 +++++++++++++++ packages/agent/test/source-registry.test.ts | 22 ++ packages/agent/test/source-worker.test.ts | 37 ++++ packages/cli/src/cli.ts | 27 ++- packages/cli/src/config.ts | 3 + packages/cli/src/daemon/routes/status.ts | 12 +- packages/cli/src/source-worker-config.ts | 31 +++ .../cli/src/source-worker-daemon-client.ts | 73 +++++++ packages/cli/src/source-worker-runner.ts | 137 +++++++++++++ .../test/source-worker-daemon-client.test.ts | 43 ++++ packages/core/src/index.ts | 1 + packages/core/src/transducers.ts | 192 ++++++++++++++++++ packages/core/test/transducers.test.ts | 54 +++++ .../node-ui/src/ui/hooks/useProjectProfile.ts | 183 ++++++++++++++--- .../src/ui/views/project/components.tsx | 52 +++-- .../node-ui/src/ui/views/project/helpers.ts | 3 +- .../node-ui/test/use-project-profile.test.ts | 91 +++++++++ .../src/async-lift-publisher-impl.ts | 22 +- packages/publisher/src/index.ts | 1 + packages/publisher/src/share-batching.ts | 85 ++++++++ .../test/fencing-and-kc-anchor-extra.test.ts | 26 +-- .../publisher/test/share-batching.test.ts | 39 ++++ scripts/devnet.sh | 7 +- scripts/import-profile.mjs | 58 +++++- scripts/lib/ontology.mjs | 4 + 27 files changed, 1326 insertions(+), 77 deletions(-) create mode 100644 packages/agent/src/source-registry.ts create mode 100644 packages/agent/src/source-worker.ts create mode 100644 packages/agent/test/source-registry.test.ts create mode 100644 packages/agent/test/source-worker.test.ts create mode 100644 packages/cli/src/source-worker-config.ts create mode 100644 packages/cli/src/source-worker-daemon-client.ts create mode 100644 packages/cli/src/source-worker-runner.ts create mode 100644 packages/cli/test/source-worker-daemon-client.test.ts create mode 100644 packages/core/src/transducers.ts create mode 100644 packages/core/test/transducers.test.ts create mode 100644 packages/node-ui/test/use-project-profile.test.ts create mode 100644 packages/publisher/src/share-batching.ts create mode 100644 packages/publisher/test/share-batching.test.ts diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 91155c5f1..9f9c6d2ca 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -67,3 +67,5 @@ export { export type { CclPublishedEvaluationRecord, CclPublishedResultEntry } from './dkg-agent.js'; export { monotonicTransition, versionedWrite, type MonotonicStages } from './workspace-consistency.js'; export { StaleWriteError, type CASCondition } from '@origintrail-official/dkg-publisher'; +export * from './source-worker.js'; +export * from './source-registry.js'; diff --git a/packages/agent/src/source-registry.ts b/packages/agent/src/source-registry.ts new file mode 100644 index 000000000..81203a041 --- /dev/null +++ b/packages/agent/src/source-registry.ts @@ -0,0 +1,33 @@ +import type { SourceKindHandler } from './source-worker.js'; + +export interface SourceRegistry { + register(kind: string, handler: SourceKindHandler): void; + resolve(source: TSource): SourceKindHandler; + has(kind: string): boolean; + listKinds(): string[]; +} + +export function createSourceRegistry( + seed: Record> = {}, +): SourceRegistry { + const handlers = new Map>(Object.entries(seed)); + + return { + register(kind: string, handler: SourceKindHandler) { + handlers.set(kind, handler); + }, + resolve(source: TSource): SourceKindHandler { + const handler = handlers.get(source.kind); + if (!handler) { + throw new Error(`Unsupported source kind: ${source.kind}`); + } + return handler; + }, + has(kind: string): boolean { + return handlers.has(kind); + }, + listKinds(): string[] { + return [...handlers.keys()].sort(); + }, + }; +} diff --git a/packages/agent/src/source-worker.ts b/packages/agent/src/source-worker.ts new file mode 100644 index 000000000..2311786d3 --- /dev/null +++ b/packages/agent/src/source-worker.ts @@ -0,0 +1,165 @@ +import { writeFile, readFile, mkdir } from 'node:fs/promises'; +import { dirname } from 'node:path'; + +export interface SourceWorkerJobState { + fingerprint?: string; + lastRunAt?: string; + lastJobIds?: string[]; + lastJobStatuses?: Record; + lastStatus?: string; + lastError?: string; + attemptCount?: number; + manualReviewRequired?: boolean; + manualReviewReason?: string; +} + +export interface SourceWorkerState { + sources: Record; +} + +export interface SourceWorkerSource { + id: string; + maxRetries?: number; +} + +export interface SourcePreparationResult { + fingerprint: string; + assets: TAsset[]; + warnings?: string[]; +} + +export interface SourceKindHandler { + computeFingerprint(source: TSource): Promise; + prepare(source: TSource): Promise>; +} + +export interface SourceWorkerResult { + sourceId: string; + skipped: boolean; + reason?: string; + jobIds?: string[]; + jobStatuses?: Record; + status?: string; + nextState: SourceWorkerJobState; +} + +export interface SourceWorkerDeps { + now(): string; + getFingerprint(source: TSource): Promise; + processSource(source: TSource, fingerprint: string, state: SourceWorkerJobState | undefined): Promise; + getJobStatus(jobId: string): Promise; +} + +export async function loadSourceWorkerState(path: string): Promise { + try { + const raw = await readFile(path, 'utf8'); + const parsed = JSON.parse(raw) as SourceWorkerState; + return { sources: parsed.sources ?? {} }; + } catch (error: any) { + if (error?.code === 'ENOENT') return { sources: {} }; + throw error; + } +} + +export async function saveSourceWorkerState(path: string, state: SourceWorkerState): Promise { + await mkdir(dirname(path), { recursive: true }); + await writeFile(path, JSON.stringify(state, null, 2) + '\n', 'utf8'); +} + +export async function runSourceWorkerOnce( + sources: readonly TSource[], + statePath: string, + deps: SourceWorkerDeps, +): Promise { + const state = await loadSourceWorkerState(statePath); + const nextState: SourceWorkerState = { sources: { ...state.sources } }; + + for (const source of sources) { + const current = state.sources[source.id]; + const fingerprint = await deps.getFingerprint(source); + const statuses = current?.lastJobIds?.length + ? Object.fromEntries(await Promise.all(current.lastJobIds.map(async (jobId) => [jobId, await deps.getJobStatus(jobId)] as const))) + : {}; + const aggregate = aggregateStatuses(statuses); + + if (current?.fingerprint === fingerprint && current.manualReviewRequired) { + nextState.sources[source.id] = { + ...current, + lastRunAt: deps.now(), + lastJobStatuses: statuses, + lastStatus: 'manual-review-required', + }; + continue; + } + + if (current?.fingerprint === fingerprint && isActiveStatus(aggregate)) { + nextState.sources[source.id] = { + ...current, + lastRunAt: deps.now(), + lastJobStatuses: statuses, + lastStatus: aggregate, + }; + continue; + } + + if (current?.fingerprint === fingerprint && isSuccessStatus(aggregate)) { + nextState.sources[source.id] = { + ...current, + lastRunAt: deps.now(), + lastJobStatuses: statuses, + lastStatus: aggregate, + }; + continue; + } + + const nextAttemptCount = current?.fingerprint === fingerprint ? (current.attemptCount ?? 0) + 1 : 1; + const maxRetries = source.maxRetries ?? 3; + if (current?.fingerprint === fingerprint && nextAttemptCount > maxRetries) { + nextState.sources[source.id] = { + ...current, + lastRunAt: deps.now(), + lastJobStatuses: statuses, + lastStatus: 'manual-review-required', + lastError: `max retries exceeded (${maxRetries})`, + attemptCount: nextAttemptCount, + manualReviewRequired: true, + manualReviewReason: `max retries exceeded (${maxRetries})`, + }; + continue; + } + + try { + const result = await deps.processSource(source, fingerprint, current); + nextState.sources[source.id] = result.nextState; + } catch (error: any) { + nextState.sources[source.id] = { + ...current, + fingerprint, + lastRunAt: deps.now(), + lastStatus: 'failed', + lastError: error?.message ?? String(error), + attemptCount: nextAttemptCount, + }; + } + } + + await saveSourceWorkerState(statePath, nextState); + return nextState; +} + +function aggregateStatuses(statuses: Record): string { + const values = Object.values(statuses); + if (values.length === 0) return ''; + if (values.every((status) => status === 'finalized' || status === 'completed')) return 'finalized'; + if (values.some((status) => status === 'failed' || status === 'error')) return 'failed'; + if (values.some((status) => isActiveStatus(status))) return 'in-flight'; + return values[0] ?? ''; +} + +function isSuccessStatus(status: string | undefined): boolean { + return status === 'completed' || status === 'finalized' || status === 'no-matching-rows'; +} + +function isActiveStatus(status: string | undefined): boolean { + return status === 'accepted' || status === 'claimed' || status === 'validated' || status === 'broadcast' || status === 'included' || status === 'queued' || status === 'in-flight'; +} diff --git a/packages/agent/test/source-registry.test.ts b/packages/agent/test/source-registry.test.ts new file mode 100644 index 000000000..97800e295 --- /dev/null +++ b/packages/agent/test/source-registry.test.ts @@ -0,0 +1,22 @@ +import { describe, expect, it } from 'vitest'; +import { createSourceRegistry } from '../src/source-registry.js'; + +describe('source registry', () => { + it('registers and resolves handlers by source kind', async () => { + const registry = createSourceRegistry<{ kind: string }, string>(); + registry.register('demo', { + async computeFingerprint() { return 'fp'; }, + async prepare() { return { fingerprint: 'fp', assets: ['a'] }; }, + }); + + expect(registry.has('demo')).toBe(true); + expect(registry.listKinds()).toEqual(['demo']); + const handler = registry.resolve({ kind: 'demo' }); + await expect(handler.computeFingerprint({ kind: 'demo' })).resolves.toBe('fp'); + }); + + it('throws for unsupported source kinds', () => { + const registry = createSourceRegistry<{ kind: string }, string>(); + expect(() => registry.resolve({ kind: 'missing' })).toThrow(/Unsupported source kind/); + }); +}); diff --git a/packages/agent/test/source-worker.test.ts b/packages/agent/test/source-worker.test.ts new file mode 100644 index 000000000..d4e7611a1 --- /dev/null +++ b/packages/agent/test/source-worker.test.ts @@ -0,0 +1,37 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { runSourceWorkerOnce } from '../src/source-worker.js'; + +const cleanup: string[] = []; +afterEach(async () => { + await Promise.all(cleanup.splice(0).map((path) => rm(path, { recursive: true, force: true }))); +}); + +describe('source worker runtime', () => { + it('skips unchanged finalized jobs and persists reconciled state', async () => { + const dir = await mkdtemp(join(tmpdir(), 'source-worker-')); + cleanup.push(dir); + const statePath = join(dir, 'state.json'); + + const deps = { + now: () => '2026-04-28T00:00:00.000Z', + getFingerprint: vi.fn(async () => 'fp-1'), + getJobStatus: vi.fn(async () => 'finalized'), + processSource: vi.fn(async () => ({ + sourceId: 'src-1', + skipped: false, + fingerprint: 'fp-1', + status: 'queued', + nextState: { fingerprint: 'fp-1', lastStatus: 'queued', lastJobIds: ['job-1'] }, + })), + }; + + await runSourceWorkerOnce([{ id: 'src-1', maxRetries: 3 }], statePath, deps); + const second = await runSourceWorkerOnce([{ id: 'src-1', maxRetries: 3 }], statePath, deps); + + expect(deps.processSource).toHaveBeenCalledTimes(1); + expect(second.sources['src-1']?.lastStatus).toBe('finalized'); + }); +}); diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 3d96016e3..85e97cb02 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -22,6 +22,7 @@ import { } from './config.js'; import { ApiClient } from './api-client.js'; import { parsePositiveMsOption } from './publisher-runner.js'; +import { runConfiguredSourceWorker } from './source-worker-runner.js'; function isDaemonUnreachable(err: unknown): boolean { const msg = err instanceof Error ? err.message : String(err); @@ -2153,6 +2154,26 @@ sharedMemoryCmd } }); +// ─── dkg source-worker ──────────────────────────────────────────────── + +const sourceWorkerCmd = program + .command('source-worker') + .description('Run generic source workers against the DKG daemon'); + +sourceWorkerCmd + .command('run') + .description('Run a source worker from a JSON config file') + .requiredOption('--config ', 'Worker config JSON file') + .option('--once', 'Run a single iteration and exit') + .action(async (opts: ActionOpts) => { + try { + await runConfiguredSourceWorker(String(opts.config), { once: opts.once === true }); + } catch (err) { + console.error(toErrorMessage(err)); + process.exit(1); + } + }); + // ─── dkg publisher ──────────────────────────────────────────────────── const publisherCmd = program @@ -2576,6 +2597,7 @@ program const chainResolved = resolveChainConfig(config, network); const rpcUrl = chainResolved?.rpcUrl; const hubAddress = chainResolved?.hubAddress; + const tokenAddress = config.chain?.tokenAddress ?? network?.chain?.tokenAddress; const chainId = chainResolved?.chainId ?? '(unknown)'; let provider: ethers.JsonRpcProvider | null = null; @@ -2585,7 +2607,10 @@ program if (rpcUrl) { try { provider = new ethers.JsonRpcProvider(rpcUrl); - if (hubAddress) { + if (tokenAddress && tokenAddress !== ethers.ZeroAddress) { + token = new ethers.Contract(tokenAddress, ['function balanceOf(address) view returns (uint256)', 'function symbol() view returns (string)'], provider); + tokenSymbol = await token.symbol().catch(() => 'TRAC'); + } else if (hubAddress) { const hub = new ethers.Contract(hubAddress, ['function getContractAddress(string) view returns (address)'], provider); const tokenAddr = await hub.getContractAddress('Token'); if (tokenAddr !== ethers.ZeroAddress) { diff --git a/packages/cli/src/config.ts b/packages/cli/src/config.ts index 467371666..29f3af7be 100644 --- a/packages/cli/src/config.ts +++ b/packages/cli/src/config.ts @@ -85,6 +85,7 @@ export interface NetworkConfig { type: 'evm'; rpcUrl: string; hubAddress: string; + tokenAddress?: string; chainId: string; }; faucet?: { @@ -100,6 +101,8 @@ export interface ChainConfig { rpcUrl: string; /** Hub contract address */ hubAddress: string; + /** Optional token contract address override. When omitted, resolve from Hub.Token. */ + tokenAddress?: string; /** Chain identifier (e.g., 'base:84532') */ chainId?: string; /** diff --git a/packages/cli/src/daemon/routes/status.ts b/packages/cli/src/daemon/routes/status.ts index d1f9d8939..cd38307a9 100644 --- a/packages/cli/src/daemon/routes/status.ts +++ b/packages/cli/src/daemon/routes/status.ts @@ -620,12 +620,12 @@ export async function handleStatusRoutes(ctx: RequestContext): Promise { } try { const provider = new ethers.JsonRpcProvider(rpcUrl); - const hub = new ethers.Contract( - hubAddress, - ["function getContractAddress(string) view returns (address)"], - provider, - ); - const tokenAddr = await hub.getContractAddress("Token").catch(() => null); + const tokenAddr = config.chain?.tokenAddress + ?? (await new ethers.Contract( + hubAddress, + ["function getContractAddress(string) view returns (address)"], + provider, + ).getContractAddress("Token").catch(() => null)); let token: ethers.Contract | null = null; let tokenSymbol = "TRAC"; if (tokenAddr && tokenAddr !== ethers.ZeroAddress) { diff --git a/packages/cli/src/source-worker-config.ts b/packages/cli/src/source-worker-config.ts new file mode 100644 index 000000000..1a9c8b663 --- /dev/null +++ b/packages/cli/src/source-worker-config.ts @@ -0,0 +1,31 @@ +import { readFile } from 'node:fs/promises'; +import { dirname, resolve } from 'node:path'; + +export interface SourceWorkerConfig { + pollIntervalMs: number; + stateFile: string; + daemonUrl: string; + daemonToken: string; + handlerModule: string; + handlerExport?: string; + sources: TSource[]; +} + +export async function loadSourceWorkerConfig(configPath: string): Promise> { + const resolvedConfigPath = resolve(configPath); + const raw = await readFile(resolvedConfigPath, 'utf8'); + const parsed = JSON.parse(raw) as SourceWorkerConfig; + if (!Array.isArray(parsed.sources) || parsed.sources.length === 0) { + throw new Error('Source worker config must define at least one source'); + } + if (!parsed.handlerModule) { + throw new Error('Source worker config must define handlerModule'); + } + return { + ...parsed, + pollIntervalMs: parsed.pollIntervalMs > 0 ? parsed.pollIntervalMs : 60000, + stateFile: resolve(dirname(resolvedConfigPath), parsed.stateFile), + daemonUrl: parsed.daemonUrl.replace(/\/$/, ''), + handlerModule: resolve(dirname(resolvedConfigPath), parsed.handlerModule), + }; +} diff --git a/packages/cli/src/source-worker-daemon-client.ts b/packages/cli/src/source-worker-daemon-client.ts new file mode 100644 index 000000000..004ef0a74 --- /dev/null +++ b/packages/cli/src/source-worker-daemon-client.ts @@ -0,0 +1,73 @@ +import type { LiftRequest } from '@origintrail-official/dkg-publisher'; +import type { AssetPartitionQuad } from '@origintrail-official/dkg-core'; + +export interface SharedMemoryWriteResult { + shareOperationId: string; +} + +export interface SharedMemoryWriteClient { + share(contextGraphId: string, quads: AssetPartitionQuad[], options?: { subGraphName?: string }): Promise; +} + +export interface AsyncLiftJobClient { + lift(request: LiftRequest): Promise; + getJobStatus(jobId: string): Promise; +} + +export function createDaemonSharedMemoryWriteClient(daemonUrl: string, token: string): SharedMemoryWriteClient { + return { + async share(contextGraphId: string, quads: AssetPartitionQuad[], options: { subGraphName?: string } = {}): Promise { + const response = await fetch(`${daemonUrl}/api/shared-memory/write`, { + method: 'POST', + headers: { + Authorization: `Bearer ${token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + contextGraphId, + quads, + subGraphName: options.subGraphName, + }), + }); + const payload = await response.json().catch(() => ({})); + if (!response.ok) { + throw new Error((payload as { error?: string }).error ?? `HTTP ${response.status}`); + } + return { shareOperationId: (payload as { shareOperationId?: string }).shareOperationId ?? '' }; + }, + }; +} + +export function createDaemonAsyncLiftJobClient(daemonUrl: string, token: string): AsyncLiftJobClient { + return { + async lift(request: LiftRequest): Promise { + const response = await fetch(`${daemonUrl}/api/publisher/enqueue`, { + method: 'POST', + headers: { + Authorization: `Bearer ${token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(request), + }); + const payload = await response.json().catch(() => ({})); + if (!response.ok) { + throw new Error((payload as { error?: string }).error ?? `HTTP ${response.status}`); + } + const jobId = (payload as { jobId?: string }).jobId; + if (!jobId) throw new Error('Async publisher enqueue did not return a job id'); + return jobId; + }, + async getJobStatus(jobId: string): Promise { + const response = await fetch(`${daemonUrl}/api/publisher/job?id=${encodeURIComponent(jobId)}`, { + headers: { + Authorization: `Bearer ${token}`, + }, + }); + const payload = await response.json().catch(() => ({})); + if (!response.ok) { + throw new Error((payload as { error?: string }).error ?? `HTTP ${response.status}`); + } + return (payload as { job?: { status?: string } }).job?.status ?? 'unknown'; + }, + }; +} diff --git a/packages/cli/src/source-worker-runner.ts b/packages/cli/src/source-worker-runner.ts new file mode 100644 index 000000000..73988fefd --- /dev/null +++ b/packages/cli/src/source-worker-runner.ts @@ -0,0 +1,137 @@ +import { pathToFileURL } from 'node:url'; +import { + loadSourceWorkerState, + runSourceWorkerOnce, + type SourceWorkerDeps, + type SourceWorkerJobState, + type SourceWorkerSource, +} from '@origintrail-official/dkg-agent'; +import { + createDaemonAsyncLiftJobClient, + createDaemonSharedMemoryWriteClient, + type AsyncLiftJobClient, + type SharedMemoryWriteClient, +} from './source-worker-daemon-client.js'; +import { loadSourceWorkerConfig, type SourceWorkerConfig } from './source-worker-config.js'; + +export interface SourceWorkerHandlerContext { + config: SourceWorkerConfig; + sharedMemory: SharedMemoryWriteClient; + asyncLift: AsyncLiftJobClient; +} + +export interface SourceWorkerHandlerModule { + createSourceWorkerDeps(context: SourceWorkerHandlerContext): Promise< + Pick, 'getFingerprint' | 'processSource'> + > | Pick, 'getFingerprint' | 'processSource'>; +} + +export async function runConfiguredSourceWorker(configPath: string, options: { once?: boolean } = {}): Promise { + const config = await loadSourceWorkerConfig(configPath); + const sharedMemory = createDaemonSharedMemoryWriteClient(config.daemonUrl, config.daemonToken); + const asyncLift = createDaemonAsyncLiftJobClient(config.daemonUrl, config.daemonToken); + const handlerModule = await loadHandlerModule(config); + const workerDeps = await handlerModule.createSourceWorkerDeps({ + config, + sharedMemory, + asyncLift, + }); + + const deps: SourceWorkerDeps = { + now() { + return new Date().toISOString(); + }, + getFingerprint: workerDeps.getFingerprint, + processSource: workerDeps.processSource, + getJobStatus(jobId: string) { + return asyncLift.getJobStatus(jobId); + }, + }; + + const runOnce = async () => { + const previousState = await loadSourceWorkerState(config.stateFile); + const state = await runSourceWorkerOnce(config.sources as SourceWorkerSource[], config.stateFile, deps); + for (const source of config.sources as SourceWorkerSource[]) { + const priorState = previousState.sources[source.id]; + const nextState = state.sources[source.id]; + console.log(formatSourceWorkerMessage(source.id, priorState, nextState)); + } + }; + + await runOnce(); + if (options.once) return; + + let running = false; + setInterval(() => { + void (async () => { + if (running) { + console.warn('[source-worker] skipping interval tick because previous run is still in progress'); + return; + } + running = true; + try { + await runOnce(); + } catch (error) { + console.error('[source-worker] loop failed:', error instanceof Error ? error.message : String(error)); + } finally { + running = false; + } + })(); + }, config.pollIntervalMs); +} + +async function loadHandlerModule( + config: SourceWorkerConfig, +): Promise> { + const namespace = await import(pathToFileURL(config.handlerModule).href); + const candidate = config.handlerExport + ? namespace[config.handlerExport] + : (namespace.default ?? namespace.sourceWorker ?? namespace); + if (!candidate || typeof candidate.createSourceWorkerDeps !== 'function') { + throw new Error( + `Source worker handler module must export createSourceWorkerDeps()${config.handlerExport ? ` via ${config.handlerExport}` : ''}`, + ); + } + return candidate as SourceWorkerHandlerModule; +} + +function formatSourceWorkerMessage( + sourceId: string, + priorState: SourceWorkerJobState | undefined, + state: SourceWorkerJobState | undefined, +): string { + if (!state) return `[source-worker] ${sourceId}: no state`; + if (state.manualReviewRequired) { + return `[source-worker] ${sourceId}: manual review required (${state.manualReviewReason ?? state.lastError ?? 'unknown reason'})`; + } + const sameFingerprint = Boolean(priorState?.fingerprint && priorState.fingerprint === state.fingerprint); + const sameJobs = sameStringArray(priorState?.lastJobIds, state.lastJobIds); + if (state.lastStatus === 'finalized' || state.lastStatus === 'completed') { + const jobs = state.lastJobIds?.length ? ` jobs=${state.lastJobIds.join(',')}` : ''; + if (sameFingerprint && sameJobs) { + return `[source-worker] ${sourceId}: skipped (${state.lastStatus})${jobs}`; + } + return `[source-worker] ${sourceId}: published${jobs}`; + } + if (state.lastStatus === 'failed') { + return `[source-worker] ${sourceId}: failed (${state.lastError ?? 'unknown error'})`; + } + if (state.lastStatus === 'in-flight') { + const jobs = state.lastJobIds?.length ? ` jobs=${state.lastJobIds.join(',')}` : ''; + return `[source-worker] ${sourceId}: in-flight${jobs}`; + } + if (state.lastStatus === 'no-matching-rows') { + return `[source-worker] ${sourceId}: skipped (no-matching-rows)`; + } + if (state.lastStatus && state.fingerprint) { + return `[source-worker] ${sourceId}: skipped (${state.lastStatus})`; + } + return `[source-worker] ${sourceId}: state updated`; +} + +function sameStringArray(left: readonly string[] | undefined, right: readonly string[] | undefined): boolean { + const a = left ?? []; + const b = right ?? []; + if (a.length !== b.length) return false; + return a.every((value, index) => value === b[index]); +} diff --git a/packages/cli/test/source-worker-daemon-client.test.ts b/packages/cli/test/source-worker-daemon-client.test.ts new file mode 100644 index 000000000..6947825ac --- /dev/null +++ b/packages/cli/test/source-worker-daemon-client.test.ts @@ -0,0 +1,43 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { createDaemonAsyncLiftJobClient, createDaemonSharedMemoryWriteClient } from '../src/source-worker-daemon-client.js'; + +const originalFetch = globalThis.fetch; + +describe('source worker daemon client', () => { + beforeEach(() => { + globalThis.fetch = vi.fn(async (input: any) => { + const url = String(input); + if (url.includes('/api/shared-memory/write')) { + return new Response(JSON.stringify({ shareOperationId: 'swm-1' }), { status: 200, headers: { 'Content-Type': 'application/json' } }); + } + if (url.includes('/api/publisher/enqueue')) { + return new Response(JSON.stringify({ jobId: 'job-1' }), { status: 200, headers: { 'Content-Type': 'application/json' } }); + } + if (url.includes('/api/publisher/job')) { + return new Response(JSON.stringify({ job: { status: 'finalized' } }), { status: 200, headers: { 'Content-Type': 'application/json' } }); + } + return new Response('{}', { status: 404, headers: { 'Content-Type': 'application/json' } }); + }) as any; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + it('writes shared memory and enqueues/polls async jobs', async () => { + const share = createDaemonSharedMemoryWriteClient('http://127.0.0.1:9200', 'token'); + const jobs = createDaemonAsyncLiftJobClient('http://127.0.0.1:9200', 'token'); + await expect(share.share('cg', [{ subject: 'urn:s', predicate: 'urn:p', object: '"o"' }])).resolves.toEqual({ shareOperationId: 'swm-1' }); + await expect(jobs.lift({ + swmId: 'swm', + shareOperationId: 'swm-1', + roots: ['urn:s'], + contextGraphId: 'cg', + namespace: 'ns', + scope: 'scope', + transitionType: 'CREATE', + authority: { type: 'owner', proofRef: 'proof' }, + })).resolves.toBe('job-1'); + await expect(jobs.getJobStatus('job-1')).resolves.toBe('finalized'); + }); +}); diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index ce6267f41..1408e92de 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -68,3 +68,4 @@ export { type ExtractionPipeline, ExtractionPipelineRegistry, } from './extraction-pipeline.js'; +export * from './transducers.js'; diff --git a/packages/core/src/transducers.ts b/packages/core/src/transducers.ts new file mode 100644 index 000000000..4598a5145 --- /dev/null +++ b/packages/core/src/transducers.ts @@ -0,0 +1,192 @@ +export interface SourceAdapterContext { + dataset: string; + sourceType: string; + sourceRef?: string; +} + +export interface SourceRecord { + dataset: string; + sourceTable: string; + values: Record; + rowNumber?: number; + sourceRef?: string; +} + +export interface SourceAdapterResult { + rows: TRow[]; + errors?: string[]; + warnings?: string[]; +} + +export interface SourceAdapter { + read( + input: TInput, + context: SourceAdapterContext, + ): Promise> | SourceAdapterResult; +} + +export interface NormalizedRecord extends SourceRecord { + keys: Record; +} + +export interface NormalizerContext { + dataset: string; +} + +export interface NormalizeResult { + records: TRecord[]; + errors?: string[]; + warnings?: string[]; +} + +export interface Normalizer { + normalize( + rows: TRow[], + context: NormalizerContext, + ): Promise> | NormalizeResult; +} + +export interface TransformContext { + dataset: string; + record: TRecord; +} + +export type Transform = ( + value: unknown, + context: TransformContext, +) => unknown; + +export type TransformRegistry = Record>; + +export interface FieldMapping { + sourceField: string; + targetPredicate: string; + required?: boolean; + transform?: string; +} + +export interface IdentityContext { + dataset: string; + classIri: string; + record: TRecord; +} + +export interface IdentityStrategy { + keyFields: string[]; + buildId(record: TRecord, context: IdentityContext): string; +} + +export interface RelationRule { + predicate: string; + sourceDataset: string; + targetDataset: string; + sourceField: string; + targetField: string; + many?: boolean; +} + +export interface DatasetMappingSpec { + dataset: string; + classIri: string; + description?: string; + identity: IdentityStrategy; + fieldMappings: FieldMapping[]; + relationRules?: RelationRule[]; +} + +export interface AssetPartitionQuad { + subject: string; + predicate: string; + object: string; + graph?: string; +} + +export interface AssetPartitionAsset { + rootEntity: string; + quads: AssetPartitionQuad[]; +} + +export interface AssetPartitionContext> { + dataset: string; + contextGraphId: string; + nodes: readonly TNode[]; + quads: readonly AssetPartitionQuad[]; +} + +export interface AssetPartitionStrategy> { + partition( + context: AssetPartitionContext, + ): Promise | AssetPartitionAsset[]; +} + +export interface DatasetTransducerContext { + dataset: string; + mappingSpec: DatasetMappingSpec; +} + +export interface TransduceResult> { + records: TRecord[]; + nodes: TNode[]; + quads: AssetPartitionQuad[]; + assets: AssetPartitionAsset[]; + errors?: string[]; + warnings?: string[]; +} + +export interface DatasetTransducer> { + transduce( + input: TInput, + context: DatasetTransducerContext, + ): Promise> | TransduceResult; +} + +export function defineSourceAdapter(adapter: SourceAdapter): SourceAdapter { + return adapter; +} + +export function readWithSourceAdapter( + adapter: SourceAdapter, + input: TInput, + context: SourceAdapterContext, +): Promise> | SourceAdapterResult { + return adapter.read(input, context); +} + +export function defineNormalizer(normalizer: Normalizer): Normalizer { + return normalizer; +} + +export function normalizeWith( + normalizer: Normalizer, + rows: TRow[], + context: NormalizerContext, +): Promise> | NormalizeResult { + return normalizer.normalize(rows, context); +} + +export function defineAssetPartitionStrategy( + strategy: AssetPartitionStrategy, +): AssetPartitionStrategy { + return strategy; +} + +export function partitionAssetsWith( + strategy: AssetPartitionStrategy, + context: AssetPartitionContext, +) { + return strategy.partition(context); +} + +export function defineDatasetTransducer( + transducer: DatasetTransducer, +): DatasetTransducer { + return transducer; +} + +export function transduceWith( + transducer: DatasetTransducer, + input: TInput, + context: DatasetTransducerContext, +): Promise> | TransduceResult { + return transducer.transduce(input, context); +} diff --git a/packages/core/test/transducers.test.ts b/packages/core/test/transducers.test.ts new file mode 100644 index 000000000..6d3fb31dc --- /dev/null +++ b/packages/core/test/transducers.test.ts @@ -0,0 +1,54 @@ +import { describe, expect, it } from 'vitest'; +import { + defineSourceAdapter, + readWithSourceAdapter, + defineNormalizer, + normalizeWith, + defineDatasetTransducer, + transduceWith, +} from '../src/transducers.js'; + +describe('core transducer helpers', () => { + it('adapts source, normalize, and transduce helpers generically', async () => { + const source = defineSourceAdapter }>({ + read(input, context) { + return { rows: [{ dataset: context.dataset, sourceTable: context.sourceType, values: { raw: input } }] }; + }, + }); + const sourceRows = await readWithSourceAdapter(source, 'a', { dataset: 'demo', sourceType: 'csv' }); + + const normalizer = defineNormalizer<{ dataset: string; sourceTable: string; values: Record }, { dataset: string; sourceTable: string; values: Record; keys: Record }>({ + normalize(rows) { + return { records: rows.map((row) => ({ ...row, keys: { raw: String(row.values.raw ?? '') } })) }; + }, + }); + const normalized = await normalizeWith(normalizer, sourceRows.rows, { dataset: 'demo' }); + + const transducer = defineDatasetTransducer({ + transduce(input) { + return { + records: input, + nodes: [{ '@id': 'urn:test:1' }], + quads: [{ subject: 'urn:test:1', predicate: 'urn:p', object: '"a"' }], + assets: [{ rootEntity: 'urn:test:1', quads: [{ subject: 'urn:test:1', predicate: 'urn:p', object: '"a"' }] }], + }; + }, + }); + const result = transduceWith(transducer, normalized.records, { + dataset: 'demo', + mappingSpec: { + dataset: 'demo', + classIri: 'urn:test:Class', + identity: { keyFields: ['raw'], buildId: () => 'urn:test:1' }, + fieldMappings: [], + }, + }); + + expect(result).toEqual({ + records: normalized.records, + nodes: [{ '@id': 'urn:test:1' }], + quads: [{ subject: 'urn:test:1', predicate: 'urn:p', object: '"a"' }], + assets: [{ rootEntity: 'urn:test:1', quads: [{ subject: 'urn:test:1', predicate: 'urn:p', object: '"a"' }] }], + }); + }); +}); diff --git a/packages/node-ui/src/ui/hooks/useProjectProfile.ts b/packages/node-ui/src/ui/hooks/useProjectProfile.ts index ecae57c9e..2974e8beb 100644 --- a/packages/node-ui/src/ui/hooks/useProjectProfile.ts +++ b/packages/node-ui/src/ui/hooks/useProjectProfile.ts @@ -93,10 +93,24 @@ export interface FilterChip { export interface SavedQuery { slug: string; subGraph: string; + catalogSlug: string; + catalogName: string; + catalogDescription?: string; + catalogRank: number; name: string; description?: string; sparql: string; resultColumn: string; + rank: number; +} + +export interface QueryCatalog { + slug: string; + subGraph: string; + name: string; + description?: string; + rank: number; + queries: SavedQuery[]; } export interface ProjectProfile { @@ -109,6 +123,7 @@ export interface ProjectProfile { typeBindings: EntityTypeBinding[]; views: ViewConfig[]; filterChips: FilterChip[]; + queryCatalogs: QueryCatalog[]; savedQueries: SavedQuery[]; loading: boolean; error?: string; @@ -116,6 +131,7 @@ export interface ProjectProfile { forType: (typeIri: string) => EntityTypeBinding | undefined; view: (slug: string) => ViewConfig | undefined; chipsFor: (subGraphSlug: string) => FilterChip[]; + savedQueryCatalogsFor: (subGraphSlug: string) => QueryCatalog[]; savedQueriesFor: (subGraphSlug: string) => SavedQuery[]; } @@ -235,23 +251,149 @@ WHERE { GROUP BY ?chip ?subGraph ?type ?predicate ?label`; } +function buildQueryCatalogsQuery(contextGraphId: string): string { + return `PREFIX prof: <${PROFILE_NS}> +PREFIX schema: +SELECT ?catalog ?subGraph ?name ?description ?rank +WHERE { + GRAPH ?g { + ?catalog a prof:QueryCatalog ; + prof:forSubGraph ?subGraph . + OPTIONAL { ?catalog prof:displayName ?name } + OPTIONAL { ?catalog schema:description ?description } + OPTIONAL { ?catalog prof:rank ?rank } + } + ${metaGraphFilter(contextGraphId)} +}`; +} + function buildSavedQueriesQuery(contextGraphId: string): string { return `PREFIX prof: <${PROFILE_NS}> PREFIX schema: -SELECT ?q ?subGraph ?name ?description ?sparql ?column +SELECT ?q ?subGraph ?catalog ?name ?description ?sparql ?column ?rank WHERE { GRAPH ?g { ?q a prof:SavedQuery ; prof:forSubGraph ?subGraph ; prof:sparqlQuery ?sparql . + OPTIONAL { ?q prof:inCatalog ?catalog } OPTIONAL { ?q prof:displayName ?name } OPTIONAL { ?q schema:description ?description } OPTIONAL { ?q prof:resultColumn ?column } + OPTIONAL { ?q prof:rank ?rank } } ${metaGraphFilter(contextGraphId)} }`; } +interface QueryCatalogRowShape extends Record {} +interface SavedQueryRowShape extends Record {} + +export function buildQueryCatalogState( + catalogRows: readonly QueryCatalogRowShape[], + queryRows: readonly SavedQueryRowShape[], +): { + queryCatalogs: QueryCatalog[]; + savedQueries: SavedQuery[]; + catalogsBySubGraph: Map; + queriesBySubGraph: Map; +} { + const catalogsByUri = new Map(); + + for (const row of catalogRows) { + const catalogIri = stripIri(row.catalog); + if (!catalogIri) continue; + const slug = catalogIri.split(':catalog:').pop() ?? catalogIri; + catalogsByUri.set(catalogIri, { + slug, + subGraph: stripLiteral(row.subGraph), + name: stripLiteral(row.name) || slug, + description: stripLiteral(row.description) || undefined, + rank: parseInt10(row.rank) || 99, + queries: [], + }); + } + + const queries: SavedQuery[] = queryRows + .map(row => { + const qIri = stripIri(row.q); + const slug = qIri.split(':query:').pop() ?? qIri; + const subGraph = stripLiteral(row.subGraph); + const catalogIri = stripIri(row.catalog); + const catalog = catalogIri ? catalogsByUri.get(catalogIri) : undefined; + const implicitCatalogSlug = `default:${subGraph}`; + return { + slug, + subGraph, + catalogSlug: catalog?.slug ?? implicitCatalogSlug, + catalogName: catalog?.name ?? 'Queries', + catalogDescription: catalog?.description, + catalogRank: catalog?.rank ?? 999, + name: stripLiteral(row.name) || slug, + description: stripLiteral(row.description) || undefined, + sparql: stripLiteral(row.sparql), + resultColumn: stripLiteral(row.column) || '', + rank: parseInt10(row.rank) || 99, + }; + }) + .filter(q => q.subGraph && q.sparql) + .sort((a, b) => + a.subGraph.localeCompare(b.subGraph) + || a.catalogRank - b.catalogRank + || a.catalogName.localeCompare(b.catalogName) + || a.rank - b.rank + || a.name.localeCompare(b.name), + ); + + const catalogsByComposite = new Map(); + for (const catalog of catalogsByUri.values()) { + catalogsByComposite.set(`${catalog.subGraph}|${catalog.slug}`, { ...catalog, queries: [] }); + } + + for (const query of queries) { + const key = `${query.subGraph}|${query.catalogSlug}`; + const existing = catalogsByComposite.get(key); + if (existing) { + existing.queries.push(query); + continue; + } + catalogsByComposite.set(key, { + slug: query.catalogSlug, + subGraph: query.subGraph, + name: query.catalogName, + description: query.catalogDescription, + rank: query.catalogRank, + queries: [query], + }); + } + + const queryCatalogs = Array.from(catalogsByComposite.values()) + .filter(catalog => catalog.queries.length > 0) + .map(catalog => ({ + ...catalog, + queries: [...catalog.queries].sort((a, b) => a.rank - b.rank || a.name.localeCompare(b.name)), + })) + .sort((a, b) => + a.subGraph.localeCompare(b.subGraph) + || a.rank - b.rank + || a.name.localeCompare(b.name), + ); + + const catalogsBySubGraph = new Map(); + const queriesBySubGraph = new Map(); + for (const catalog of queryCatalogs) { + const nextCatalogs = catalogsBySubGraph.get(catalog.subGraph) ?? []; + nextCatalogs.push(catalog); + catalogsBySubGraph.set(catalog.subGraph, nextCatalogs); + + const nextQueries = queriesBySubGraph.get(catalog.subGraph) ?? []; + nextQueries.push(...catalog.queries); + queriesBySubGraph.set(catalog.subGraph, nextQueries); + } + + return { queryCatalogs, savedQueries: queries, catalogsBySubGraph, queriesBySubGraph }; +} + function buildTypeBindingsQuery(contextGraphId: string): string { return `PREFIX prof: <${PROFILE_NS}> SELECT ?type ?label ?icon ?color ?detailHint @@ -312,11 +454,13 @@ export function useProjectProfile(contextGraphId: string | undefined): ProjectPr const [typeBindings, setTypeBindings] = useState([]); const [views, setViews] = useState([]); const [filterChips, setFilterChips] = useState([]); + const [queryCatalogs, setQueryCatalogs] = useState([]); const [savedQueries, setSavedQueries] = useState([]); const typeIndexRef = useRef>(new Map()); const subIndexRef = useRef>(new Map()); const viewIndexRef = useRef>(new Map()); const chipsBySgRef = useRef>(new Map()); + const queryCatalogsBySgRef = useRef>(new Map()); const queriesBySgRef = useRef>(new Map()); useEffect(() => { @@ -330,12 +474,13 @@ export function useProjectProfile(contextGraphId: string | undefined): ProjectPr setLoading(true); setError(undefined); try { - const [rootRows, sgRows, typeRows, viewRows, chipRows, queryRows] = await Promise.all([ + const [rootRows, sgRows, typeRows, viewRows, chipRows, catalogRows, queryRows] = await Promise.all([ runProjectQuery(buildProfileRootQuery(contextGraphId), contextGraphId).catch(() => []), runProjectQuery(buildSubGraphBindingsQuery(contextGraphId), contextGraphId).catch(() => []), runProjectQuery(buildTypeBindingsQuery(contextGraphId), contextGraphId).catch(() => []), runProjectQuery(buildViewConfigsQuery(contextGraphId), contextGraphId).catch(() => []), runProjectQuery(buildFilterChipsQuery(contextGraphId), contextGraphId).catch(() => []), + runProjectQuery(buildQueryCatalogsQuery(contextGraphId), contextGraphId).catch(() => []), runProjectQuery(buildSavedQueriesQuery(contextGraphId), contextGraphId).catch(() => []), ]); if (cancelled) return; @@ -432,28 +577,11 @@ export function useProjectProfile(contextGraphId: string | undefined): ProjectPr } chipsBySgRef.current = chipsBySg; - const queries: SavedQuery[] = queryRows - .map(row => { - const qIri = stripIri(row.q); - const slug = qIri.split(':query:').pop() ?? qIri; - return { - slug, - subGraph: stripLiteral(row.subGraph), - name: stripLiteral(row.name) || slug, - description: stripLiteral(row.description) || undefined, - sparql: stripLiteral(row.sparql), - resultColumn: stripLiteral(row.column) || '', - }; - }) - .filter(q => q.subGraph && q.sparql); - setSavedQueries(queries); - const queriesBySg = new Map(); - for (const q of queries) { - const list = queriesBySg.get(q.subGraph) ?? []; - list.push(q); - queriesBySg.set(q.subGraph, list); - } - queriesBySgRef.current = queriesBySg; + const queryCatalogState = buildQueryCatalogState(catalogRows, queryRows); + setQueryCatalogs(queryCatalogState.queryCatalogs); + setSavedQueries(queryCatalogState.savedQueries); + queryCatalogsBySgRef.current = queryCatalogState.catalogsBySubGraph; + queriesBySgRef.current = queryCatalogState.queriesBySubGraph; } catch (err: any) { if (!cancelled) setError(err?.message ?? String(err)); } finally { @@ -480,6 +608,10 @@ export function useProjectProfile(contextGraphId: string | undefined): ProjectPr (slug: string) => chipsBySgRef.current.get(slug) ?? [], [], ); + const savedQueryCatalogsFor = useCallback( + (slug: string) => queryCatalogsBySgRef.current.get(slug) ?? [], + [], + ); const savedQueriesFor = useCallback( (slug: string) => queriesBySgRef.current.get(slug) ?? [], [], @@ -495,6 +627,7 @@ export function useProjectProfile(contextGraphId: string | undefined): ProjectPr typeBindings, views, filterChips, + queryCatalogs, savedQueries, loading, error, @@ -502,6 +635,7 @@ export function useProjectProfile(contextGraphId: string | undefined): ProjectPr forType, view, chipsFor, + savedQueryCatalogsFor, savedQueriesFor, }; } @@ -512,4 +646,3 @@ export const ProjectProfileContext = React.createContext( export function useProjectProfileContext(): ProjectProfile | null { return useContext(ProjectProfileContext); } - diff --git a/packages/node-ui/src/ui/views/project/components.tsx b/packages/node-ui/src/ui/views/project/components.tsx index 8ae214c62..c81ff109c 100644 --- a/packages/node-ui/src/ui/views/project/components.tsx +++ b/packages/node-ui/src/ui/views/project/components.tsx @@ -2356,7 +2356,7 @@ export function SubGraphDetailView({ const profile = useProjectProfileContext(); const binding = profile?.forSubGraph(slug); const chips = profile?.chipsFor(slug) ?? []; - const savedQueries = profile?.savedQueriesFor(slug) ?? []; + const queryCatalogs = profile?.savedQueryCatalogsFor(slug) ?? []; const timelinePredicate = binding?.timelinePredicate; const [activeTab, setActiveTab] = useState('items'); @@ -2601,27 +2601,38 @@ export function SubGraphDetailView({ /> - {savedQueries.length > 0 && ( + {queryCatalogs.length > 0 && (
- Saved queries - {savedQueries.map(q => { - const isActive = activeQuerySlug === q.slug; - return ( - - ); - })} + {catalog.name} + + {catalog.queries.map(q => { + const isActive = activeQuerySlug === q.slug; + return ( + + ); + })} + + ))} {queryError && ( ✕ query failed )} @@ -2763,4 +2774,3 @@ export function SubGraphDetailView({
); } - diff --git a/packages/node-ui/src/ui/views/project/helpers.ts b/packages/node-ui/src/ui/views/project/helpers.ts index 41746f016..dc6fa18e3 100644 --- a/packages/node-ui/src/ui/views/project/helpers.ts +++ b/packages/node-ui/src/ui/views/project/helpers.ts @@ -369,7 +369,8 @@ export function formatTimelineBucket(ym: string): string { // Graph / (optional) Timeline / Documents tabs as the layer views. The // layer axis becomes a secondary filter in the header via the mini // pyramid chips; `profile:FilterChip` rows filter by predicate value; -// `profile:SavedQuery` pills run SPARQL and narrow the entity list. +// `profile:QueryCatalog` + `profile:SavedQuery` render grouped SPARQL +// pills that narrow the entity list. export type SubGraphTab = 'items' | 'graph' | 'timeline' | 'docs'; export type SubGraphEntitySort = 'created-desc' | 'created-asc' | 'triples' | 'label'; diff --git a/packages/node-ui/test/use-project-profile.test.ts b/packages/node-ui/test/use-project-profile.test.ts new file mode 100644 index 000000000..6e3bb08a7 --- /dev/null +++ b/packages/node-ui/test/use-project-profile.test.ts @@ -0,0 +1,91 @@ +import { describe, expect, it } from 'vitest'; + +import { buildQueryCatalogState } from '../src/ui/hooks/useProjectProfile.js'; + +describe('buildQueryCatalogState', () => { + it('groups saved queries into explicit catalogs and sorts them by rank', () => { + const state = buildQueryCatalogState( + [ + { + catalog: '', + subGraph: 'tasks', + name: 'Task triage', + description: 'Important task filters', + rank: '2', + }, + { + catalog: { value: 'urn:dkg:profile:demo:catalog:ops' }, + subGraph: { value: 'tasks' }, + name: { value: 'Operations' }, + rank: { value: '1' }, + }, + ], + [ + { + q: '', + subGraph: 'tasks', + catalog: '', + name: 'Blocked tasks', + sparql: 'SELECT ?task WHERE { ?task ?p ?o }', + column: 'task', + rank: '2', + }, + { + q: '', + subGraph: 'tasks', + catalog: '', + name: 'High priority tasks', + sparql: 'SELECT ?task WHERE { ?task ?p ?o }', + column: 'task', + rank: '1', + }, + { + q: { value: 'urn:dkg:profile:demo:query:handoffs' }, + subGraph: { value: 'tasks' }, + catalog: { value: 'urn:dkg:profile:demo:catalog:ops' }, + name: { value: 'Handoffs' }, + sparql: { value: 'SELECT ?task WHERE { ?task ?p ?o }' }, + column: { value: 'task' }, + rank: { value: '1' }, + }, + ], + ); + + expect(state.queryCatalogs).toHaveLength(2); + expect(state.queryCatalogs[0].slug).toBe('ops'); + expect(state.queryCatalogs[1].slug).toBe('triage'); + expect(state.queryCatalogs[1].queries.map(query => query.slug)).toEqual([ + 'high-priority', + 'blocked', + ]); + expect(state.queriesBySubGraph.get('tasks')?.map(query => query.slug)).toEqual([ + 'handoffs', + 'high-priority', + 'blocked', + ]); + }); + + it('creates an implicit default catalog for legacy saved queries without catalog links', () => { + const state = buildQueryCatalogState([], [ + { + q: '', + subGraph: 'github', + name: 'Legacy query', + sparql: 'SELECT ?pr WHERE { ?pr ?p ?o }', + column: 'pr', + }, + ]); + + expect(state.queryCatalogs).toHaveLength(1); + expect(state.queryCatalogs[0]).toMatchObject({ + slug: 'default:github', + subGraph: 'github', + name: 'Queries', + }); + expect(state.queryCatalogs[0].queries[0]).toMatchObject({ + slug: 'legacy', + catalogSlug: 'default:github', + catalogName: 'Queries', + }); + }); +}); diff --git a/packages/publisher/src/async-lift-publisher-impl.ts b/packages/publisher/src/async-lift-publisher-impl.ts index acb3f1b46..8f985143e 100644 --- a/packages/publisher/src/async-lift-publisher-impl.ts +++ b/packages/publisher/src/async-lift-publisher-impl.ts @@ -142,7 +142,9 @@ export class TripleStoreAsyncLiftPublisher implements AsyncLiftPublisher { async update(jobId: string, status: LiftJobState, data: Partial = {}): Promise { await this.ensureGraph(); - const next = this.refreshActiveLease(this.mergeJob(await this.getRequiredJob(jobId), status, data)); + const current = await this.getRequiredJob(jobId); + await this.assertActiveClaimFence(current); + const next = this.refreshActiveLease(this.mergeJob(current, status, data)); this.assertJobMatchesStatus(next); await this.writeJob(next); await this.syncWalletLockForJob(next); @@ -735,6 +737,24 @@ export class TripleStoreAsyncLiftPublisher implements AsyncLiftPublisher { return true; } + private async assertActiveClaimFence(job: LiftJob): Promise { + const walletId = job.claim?.walletId; + if (!walletId) return; + const currentLock = await this.readWalletLock(walletId); + if (!currentLock) { + throw new Error(`stale_claim: wallet lock missing for ${walletId}`); + } + if (!this.lockMatchesJob(currentLock, job)) { + throw new Error(`fence_token_mismatch: wallet lock for ${walletId} no longer matches job ${job.jobId}`); + } + if (currentLock.status !== 'active') { + throw new Error(`stale_claim: wallet lock for ${walletId} is not active`); + } + if (typeof currentLock.expiresAt === 'number' && currentLock.expiresAt <= this.now()) { + throw new Error(`stale_claim: wallet lock for ${walletId} expired`); + } + } + private async withClaimLock(fn: () => Promise): Promise { const previous = TripleStoreAsyncLiftPublisher.claimQueues.get(this.graphUri) ?? Promise.resolve(); let release!: () => void; diff --git a/packages/publisher/src/index.ts b/packages/publisher/src/index.ts index 8e4e54dbd..850bfc813 100644 --- a/packages/publisher/src/index.ts +++ b/packages/publisher/src/index.ts @@ -153,3 +153,4 @@ export { UpdateHandler } from './update-handler.js'; export { ChainEventPoller, type ChainEventPollerConfig, type OnContextGraphCreated } from './chain-event-poller.js'; export { AccessHandler, type AccessPolicy } from './access-handler.js'; export { AccessClient, type AccessResult } from './access-client.js'; +export * from './share-batching.js'; diff --git a/packages/publisher/src/share-batching.ts b/packages/publisher/src/share-batching.ts new file mode 100644 index 000000000..d99e84aa2 --- /dev/null +++ b/packages/publisher/src/share-batching.ts @@ -0,0 +1,85 @@ +export interface ShareBatchQuad { + subject: string; + predicate: string; + object: string; + graph?: string; +} + +export interface ShareBatchAsset { + rootEntity: string; + quads: ShareBatchQuad[]; +} + +export interface ShareBatch { + assets: ShareBatchAsset[]; + roots: string[]; + quads: ShareBatchQuad[]; + estimatedBytes: number; +} + +export const DEFAULT_MAX_SHARE_BATCH_BYTES = 450 * 1024; + +export function groupAssetsByRootEntity(assets: readonly ShareBatchAsset[]): ShareBatchAsset[] { + const grouped = new Map(); + for (const asset of assets) { + const current = grouped.get(asset.rootEntity) ?? []; + grouped.set(asset.rootEntity, [...current, ...asset.quads.map((quad) => ({ ...quad, graph: quad.graph ?? '' }))]); + } + return Array.from(grouped.entries()).map(([rootEntity, quads]) => ({ rootEntity, quads })); +} + +export function batchAssetsByEstimatedBytes( + assets: readonly ShareBatchAsset[], + maxBatchBytes = DEFAULT_MAX_SHARE_BATCH_BYTES, +): ShareBatch[] { + const groupedAssets = groupAssetsByRootEntity(assets); + const batches: ShareBatch[] = []; + let current = emptyBatch(); + + for (const asset of groupedAssets) { + const assetRoots = [asset.rootEntity]; + const assetQuads = asset.quads.map((quad) => ({ ...quad, graph: quad.graph ?? '' })); + const estimatedBytes = estimatePayloadBytes(assetQuads); + + if (estimatedBytes > maxBatchBytes) { + batches.push({ + assets: [asset], + roots: assetRoots, + quads: assetQuads, + estimatedBytes, + }); + continue; + } + + if (current.assets.length > 0 && current.estimatedBytes + estimatedBytes > maxBatchBytes) { + batches.push(current); + current = emptyBatch(); + } + + current = { + assets: [...current.assets, asset], + roots: [...current.roots, ...assetRoots], + quads: [...current.quads, ...assetQuads], + estimatedBytes: current.estimatedBytes + estimatedBytes, + }; + } + + if (current.assets.length > 0) { + batches.push(current); + } + + return batches; +} + +function emptyBatch(): ShareBatch { + return { + assets: [], + roots: [], + quads: [], + estimatedBytes: 0, + }; +} + +function estimatePayloadBytes(quads: readonly ShareBatchQuad[]): number { + return Buffer.byteLength(JSON.stringify({ quads }), 'utf8'); +} diff --git a/packages/publisher/test/fencing-and-kc-anchor-extra.test.ts b/packages/publisher/test/fencing-and-kc-anchor-extra.test.ts index d3c58f05a..d8678bb85 100644 --- a/packages/publisher/test/fencing-and-kc-anchor-extra.test.ts +++ b/packages/publisher/test/fencing-and-kc-anchor-extra.test.ts @@ -184,7 +184,7 @@ describe('P-2 (CRITICAL): fencing token — stale worker after health-check rese }, ); - it('PROD-BUG: update() accepts an in-memory LiftJob even when the on-disk claim token has been overwritten by a different worker', async () => { + it('rejects update() when the on-disk claim token or wallet lock no longer matches the in-memory job claim', async () => { const publisher = createPublisher(); const jobId = await publisher.lift(request()); const claimedByA = await publisher.claimNext('wallet-A'); @@ -225,25 +225,11 @@ describe('P-2 (CRITICAL): fencing token — stale worker after health-check rese } catch (err) { caughtStale = err; } - // PROD-BUG: update() signs off on this mutation with no fence check, - // and the publisher even rewrites the wallet lock for wallet-A - // (re-acquiring a lease that the control plane had explicitly - // invalidated). Observe: lock came back. - expect(caughtStale).toBeNull(); - expect(await walletLockRowCount('wallet-A')).toBeGreaterThan(0); - - // Make the spec expectation explicit: under a correct fencing - // implementation, either the update or the lock recreation would - // fail. The two assertions below codify "at least one of these - // must be false". - const staleWriteAccepted = caughtStale === null; - const lockSilentlyRecreated = (await walletLockRowCount('wallet-A')) > 0; - // PROD-BUG evidence: BOTH are currently true. - expect( - staleWriteAccepted && lockSilentlyRecreated, - 'PROD-BUG: stale worker was allowed to write AND silently regained ' + - 'a wallet lock the control plane had invalidated. See BUGS_FOUND.md P-2.', - ).toBe(false); + expect(caughtStale).toBeInstanceOf(Error); + if (caughtStale instanceof Error) { + expect(caughtStale.message).toMatch(/fenc|stale|lock|claim/i); + } + expect(await walletLockRowCount('wallet-A')).toBe(0); }); }); diff --git a/packages/publisher/test/share-batching.test.ts b/packages/publisher/test/share-batching.test.ts new file mode 100644 index 000000000..d4e84e345 --- /dev/null +++ b/packages/publisher/test/share-batching.test.ts @@ -0,0 +1,39 @@ +import { describe, expect, it } from 'vitest'; +import { batchAssetsByEstimatedBytes, DEFAULT_MAX_SHARE_BATCH_BYTES, groupAssetsByRootEntity } from '../src/share-batching.js'; + +describe('share batching helpers', () => { + it('groups assets by root entity before batching', () => { + const grouped = groupAssetsByRootEntity([ + { rootEntity: 'urn:root:1', quads: [{ subject: 'urn:root:1', predicate: 'urn:p1', object: '"a"' }] }, + { rootEntity: 'urn:root:1', quads: [{ subject: 'urn:child:1', predicate: 'urn:p2', object: '"b"' }] }, + ]); + + expect(grouped).toEqual([ + { + rootEntity: 'urn:root:1', + quads: [ + { subject: 'urn:root:1', predicate: 'urn:p1', object: '"a"', graph: '' }, + { subject: 'urn:child:1', predicate: 'urn:p2', object: '"b"', graph: '' }, + ], + }, + ]); + }); + + it('uses a conservative default below the 512KB SWM gossip cap', () => { + expect(DEFAULT_MAX_SHARE_BATCH_BYTES).toBe(450 * 1024); + expect(DEFAULT_MAX_SHARE_BATCH_BYTES).toBeLessThan(512 * 1024); + }); + + it('splits grouped assets when batch byte estimate exceeds threshold', () => { + const asset = (id: string, size: number) => ({ + rootEntity: id, + quads: [{ subject: id, predicate: 'urn:test:data', object: JSON.stringify('x'.repeat(size)), graph: '' }], + }); + const batches = batchAssetsByEstimatedBytes([ + asset('urn:test:1', 200), + asset('urn:test:2', 200), + asset('urn:test:3', 200), + ], 350); + expect(batches).toHaveLength(3); + }); +}); diff --git a/scripts/devnet.sh b/scripts/devnet.sh index 2a12e00e1..4b96f40d7 100755 --- a/scripts/devnet.sh +++ b/scripts/devnet.sh @@ -353,6 +353,11 @@ create_node_config() { ${relay_value} ${store_block} "contextGraphs": ["devnet-test", "devnet-isolation"], + "publisher": { + "enabled": true, + "pollIntervalMs": 12000, + "errorBackoffMs": 5000 + }, ${devnet_auth_block} "chain": { "type": "evm", @@ -743,7 +748,7 @@ cmd_start() { reg_resp=$(curl -sS --max-time 30 -X POST \ -H "$register_auth_header" \ -H "Content-Type: application/json" \ - -d "{\"id\":\"$cg\"}" \ + -d "{\"id\":\"$cg\",\"accessPolicy\":0}" \ "$register_endpoint" 2>&1 || true) if echo "$reg_resp" | grep -q '"onChainId"'; then on_chain_id=$(echo "$reg_resp" | python3 -c "import sys,json;print(json.load(sys.stdin).get('onChainId',''))" 2>/dev/null || echo '') diff --git a/scripts/import-profile.mjs b/scripts/import-profile.mjs index 08e14123f..006a4c720 100644 --- a/scripts/import-profile.mjs +++ b/scripts/import-profile.mjs @@ -288,7 +288,51 @@ for (const c of chips) { for (const v of c.values) emit(uri(id), uri(Profile.P.chipValue), lit(v)); } -// ── Saved SPARQL queries ────────────────────────────────────── +// ── Query catalogs + saved SPARQL queries ───────────────────── +// Query catalogs let any project group generic DKG-native queries into +// named sets without hardcoding domain logic into the UI. Each catalog is +// declared in the profile data, scoped to a sub-graph, and contains one or +// more SavedQuery entries. The UI renders catalogs as grouped query pills. +const queryCatalogs = [ + { + slug: 'decision-review', + sg: 'decisions', + name: 'Decision review', + description: 'Queries that help reviewers inspect architectural choices and their downstream impact.', + rank: 1, + }, + { + slug: 'task-triage', + sg: 'tasks', + name: 'Task triage', + description: 'Queries for finding urgent, blocked, or dependency-heavy work items.', + rank: 1, + }, + { + slug: 'change-impact', + sg: 'github', + name: 'Change impact', + description: 'Queries for spotting high-signal pull requests and code hotspots.', + rank: 1, + }, + { + slug: 'collaboration', + sg: 'chat', + name: 'Collaboration', + description: 'Queries for surfacing shared working context from people and agents.', + rank: 1, + }, +]; +for (const c of queryCatalogs) { + const id = Profile.uri.catalog(PROJECT_ID, c.slug); + emit(uri(id), uri(Common.type), uri(Profile.T.QueryCatalog)); + emit(uri(id), uri(Profile.P.ofProfile), uri(profileId)); + emit(uri(id), uri(Profile.P.forSubGraph), lit(c.sg)); + emit(uri(id), uri(Profile.P.displayName), lit(c.name)); + emit(uri(id), uri(Common.description), lit(c.description)); + emit(uri(id), uri(Profile.P.rank), lit(c.rank, XSD.int)); +} + // Rendered as pills above the entity list. Clicking a pill runs the query // against /api/sparql/query and displays the result set as the filtered // entity list. `resultColumn` tells the UI which SELECT var holds the @@ -296,9 +340,11 @@ for (const c of chips) { const savedQueries = [ { slug: 'decisions-touching-node-ui', + catalog: 'decision-review', sg: 'decisions', name: 'Decisions affecting node-ui', description: 'Every decision whose `affects` reaches a file in packages/node-ui.', + rank: 1, resultColumn: 'decision', sparql: ` SELECT DISTINCT ?decision WHERE { @@ -309,9 +355,11 @@ SELECT DISTINCT ?decision WHERE { }, { slug: 'p0-p1-tasks-in-flight', + catalog: 'task-triage', sg: 'tasks', name: 'P0 / P1 tasks in flight', description: 'High-priority tasks currently `in_progress` or `blocked`.', + rank: 1, resultColumn: 'task', sparql: ` SELECT DISTINCT ?task WHERE { @@ -326,9 +374,11 @@ SELECT DISTINCT ?task WHERE { }, { slug: 'prs-that-affected-vm-packages', + catalog: 'change-impact', sg: 'github', name: 'PRs that touched flagship packages', description: 'Closed PRs that changed node-ui or graph-viz — likely VM candidates.', + rank: 1, resultColumn: 'pr', sparql: ` SELECT DISTINCT ?pr WHERE { @@ -340,9 +390,11 @@ SELECT DISTINCT ?pr WHERE { }, { slug: 'chat-shared-with-me', + catalog: 'collaboration', sg: 'chat', name: 'Chat shared with me', description: 'Recent SWM-visible chat sessions from other participants — what are your teammates\' assistants working on?', + rank: 1, resultColumn: 'session', sparql: ` SELECT DISTINCT ?session WHERE { @@ -355,9 +407,11 @@ SELECT DISTINCT ?session WHERE { }, { slug: 'decisions-with-open-tasks', + catalog: 'decision-review', sg: 'decisions', name: 'Decisions with open tasks', description: 'Decisions still tracked by at least one non-done task.', + rank: 2, resultColumn: 'decision', sparql: ` SELECT DISTINCT ?decision WHERE { @@ -377,6 +431,8 @@ for (const q of savedQueries) { emit(uri(id), uri(Profile.P.forSubGraph), lit(q.sg)); emit(uri(id), uri(Profile.P.displayName), lit(q.name)); emit(uri(id), uri(Common.description), lit(q.description)); + if (q.catalog) emit(uri(id), uri(Profile.P.inCatalog), uri(Profile.uri.catalog(PROJECT_ID, q.catalog))); + if (q.rank !== undefined) emit(uri(id), uri(Profile.P.rank), lit(q.rank, XSD.int)); emit(uri(id), uri(Profile.P.sparqlQuery), lit(q.sparql)); emit(uri(id), uri(Profile.P.resultColumn), lit(q.resultColumn)); } diff --git a/scripts/lib/ontology.mjs b/scripts/lib/ontology.mjs index 888e1273c..b7519da52 100644 --- a/scripts/lib/ontology.mjs +++ b/scripts/lib/ontology.mjs @@ -353,6 +353,7 @@ export const Profile = { SubGraphBinding: NS.profile + 'SubGraphBinding', EntityTypeBinding: NS.profile + 'EntityTypeBinding', ViewConfig: NS.profile + 'ViewConfig', + QueryCatalog: NS.profile + 'QueryCatalog', // A FilterChip declares an interactive filter for an entity type in a // sub-graph page (e.g. status chips for decisions, priority chips for // tasks). The UI reads these and renders a chip row above the entity @@ -392,6 +393,7 @@ export const Profile = { // predicate and renders a time-sorted ribbon of the sub-graph's entities. timelinePredicate: NS.profile + 'timelinePredicate', // SubGraphBinding -> predicate IRI (a date/dateTime) // ── Saved SPARQL queries (ViewConfig extension) ────────────── + inCatalog: NS.profile + 'inCatalog', // SavedQuery -> QueryCatalog // A SavedQuery ViewConfig renders as a pill above the entity list. On // click the UI runs the query against the project's SPARQL endpoint // and displays the result set as the filtered entity list. @@ -438,6 +440,8 @@ export const Profile = { `urn:dkg:profile:${encodeURIComponent(projectId)}:view:${encodeURIComponent(slug)}`, chip: (projectId, slug) => `urn:dkg:profile:${encodeURIComponent(projectId)}:chip:${encodeURIComponent(slug)}`, + catalog: (projectId, slug) => + `urn:dkg:profile:${encodeURIComponent(projectId)}:catalog:${encodeURIComponent(slug)}`, query: (projectId, slug) => `urn:dkg:profile:${encodeURIComponent(projectId)}:query:${encodeURIComponent(slug)}`, }, From bfe8ad94169209a1235e028e23de86b85ed69299 Mon Sep 17 00:00:00 2001 From: Viktor Pelle Date: Thu, 30 Apr 2026 16:00:51 +0200 Subject: [PATCH 2/2] FE for query catalog --- packages/cli/src/daemon/lifecycle.ts | 84 +-- packages/cli/src/daemon/routes/memory.ts | 71 ++- packages/node-ui/src/ui/api.ts | 21 + packages/node-ui/src/ui/styles.css | 88 +++- packages/node-ui/src/ui/views/ProjectView.tsx | 5 + .../src/ui/views/project/components.tsx | 478 ++++++++++++++++++ .../node-ui/src/ui/views/project/helpers.ts | 2 +- 7 files changed, 713 insertions(+), 36 deletions(-) diff --git a/packages/cli/src/daemon/lifecycle.ts b/packages/cli/src/daemon/lifecycle.ts index fd33096ab..6e735d65a 100644 --- a/packages/cli/src/daemon/lifecycle.ts +++ b/packages/cli/src/daemon/lifecycle.ts @@ -644,7 +644,6 @@ export async function runDaemonInner( }); await agent.start(); - await agent.publishProfile(); const publisherChainBase = chainBase?.rpcUrl && chainBase?.hubAddress ? { @@ -653,38 +652,56 @@ export async function runDaemonInner( chainId: chainBase.chainId, } : undefined; - publisherRuntime = await startPublisherRuntimeIfEnabled({ - dataDir: dkgDir(), - config, - store: agent.store, - keypair: agent.wallet.keypair, - chainBase: publisherChainBase, - ackTransportFactory: () => ({ - publisherPeerId: agent.peerId, - gossipPublish: async (topic: string, data: Uint8Array) => { - await agent.gossip.publish(topic, data); - }, - sendP2P: async (peerId: string, protocol: string, data: Uint8Array) => { - return agent.router.send(peerId, protocol, data); - }, - getConnectedCorePeers: () => { - const allPeers = agent.node.libp2p - .getPeers() - .map((p) => p.toString()) - .filter((id) => id !== agent.peerId); - const knownCorePeerIds = (agent as any).knownCorePeerIds as - | Set - | undefined; - if (knownCorePeerIds && knownCorePeerIds.size > 0) { - const filtered = allPeers.filter((id) => knownCorePeerIds.has(id)); - if (filtered.length > 0) return filtered; - } - return allPeers; - }, - log, - }), - log, - }); + const startPostApiPublishing = () => { + const profileTimer = setTimeout(() => { + void agent.publishProfile().catch((err: any) => { + log(`Agent profile publish failed: ${err?.message ?? String(err)}`); + }); + }, 0); + if (profileTimer.unref) profileTimer.unref(); + + const publisherTimer = setTimeout(() => { + void startPublisherRuntimeIfEnabled({ + dataDir: dkgDir(), + config, + store: agent.store, + keypair: agent.wallet.keypair, + chainBase: publisherChainBase, + ackTransportFactory: () => ({ + publisherPeerId: agent.peerId, + gossipPublish: async (topic: string, data: Uint8Array) => { + await agent.gossip.publish(topic, data); + }, + sendP2P: async (peerId: string, protocol: string, data: Uint8Array) => { + return agent.router.send(peerId, protocol, data); + }, + getConnectedCorePeers: () => { + const allPeers = agent.node.libp2p + .getPeers() + .map((p) => p.toString()) + .filter((id) => id !== agent.peerId); + const knownCorePeerIds = (agent as any).knownCorePeerIds as + | Set + | undefined; + if (knownCorePeerIds && knownCorePeerIds.size > 0) { + const filtered = allPeers.filter((id) => knownCorePeerIds.has(id)); + if (filtered.length > 0) return filtered; + } + return allPeers; + }, + log, + }), + log, + }) + .then((runtime) => { + publisherRuntime = runtime; + }) + .catch((err: any) => { + log(`Async publisher startup failed: ${err?.message ?? String(err)}`); + }); + }, 0); + if (publisherTimer.unref) publisherTimer.unref(); + }; log(`PeerId: ${agent.peerId}`); for (const a of agent.multiaddrs) log(` ${a}`); @@ -1632,6 +1649,7 @@ export async function runDaemonInner( log(`API listening on http://${apiHost}:${boundPort}`); log(`Node UI: http://${apiHost}:${boundPort}/ui`); log('Node is running. Use "dkg status" or "dkg peers" to interact.'); + startPostApiPublishing(); // Graceful shutdown let shuttingDown = false; diff --git a/packages/cli/src/daemon/routes/memory.ts b/packages/cli/src/daemon/routes/memory.ts index b1b71f467..37e5a472d 100644 --- a/packages/cli/src/daemon/routes/memory.ts +++ b/packages/cli/src/daemon/routes/memory.ts @@ -57,7 +57,7 @@ const execAsync = promisify(exec); const execFileAsync = promisify(execFile); import { enrichEvmError, MockChainAdapter } from '@origintrail-official/dkg-chain'; import { DKGAgent, loadOpWallets } from '@origintrail-official/dkg-agent'; -import { computeNetworkId, createOperationContext, DKGEvent, Logger, PayloadTooLargeError, GET_VIEWS, TrustLevel, validateSubGraphName, validateAssertionName, validateContextGraphId, isSafeIri, assertSafeIri, sparqlIri, contextGraphSharedMemoryUri, contextGraphAssertionUri, contextGraphMetaUri } from '@origintrail-official/dkg-core'; +import { computeNetworkId, createOperationContext, DKGEvent, Logger, PayloadTooLargeError, GET_VIEWS, TrustLevel, validateSubGraphName, validateAssertionName, validateContextGraphId, isSafeIri, assertSafeIri, assertSafeRdfTerm, sparqlIri, contextGraphSharedMemoryUri, contextGraphAssertionUri, contextGraphMetaUri } from '@origintrail-official/dkg-core'; import { findReservedSubjectPrefix, isSkolemizedUri } from '@origintrail-official/dkg-publisher'; import { DashboardDB, @@ -363,6 +363,75 @@ export async function handleMemoryRoutes(ctx: RequestContext): Promise { } = ctx; + // POST /api/profile/query-catalog/write + // + // UI profile metadata intentionally lives in unregistered `.../meta/...` + // graphs. Do not route this through shared-memory sub-graph writes: that + // path correctly enforces registered sub-graphs, which is wrong for this + // local profile/catalog namespace. + if (req.method === "POST" && path === "/api/profile/query-catalog/write") { + const body = await readBody(req, SMALL_BODY_BYTES); + const parsed = safeParseJson(body, res); + if (!parsed) return; + + const contextGraphId = parsed.contextGraphId ?? parsed.paranetId; + if (!validateRequiredContextGraphId(contextGraphId, res)) return; + + const { quads } = parsed; + if (!Array.isArray(quads) || quads.length === 0) { + return jsonResponse(res, 400, { + error: 'Missing or invalid "quads" (must be a non-empty array)', + }); + } + + const graph = `did:dkg:context-graph:${contextGraphId}/meta/query-catalog`; + try { + assertSafeIri(graph); + const normalized = quads.map((quad: unknown, index: number) => { + if (!quad || typeof quad !== "object" || Array.isArray(quad)) { + throw new Error(`quads[${index}] must be an object`); + } + const q = quad as Record; + if (typeof q.subject !== "string" || q.subject.length === 0) { + throw new Error(`quads[${index}].subject must be a non-empty string`); + } + if (typeof q.predicate !== "string" || q.predicate.length === 0) { + throw new Error(`quads[${index}].predicate must be a non-empty string`); + } + if (typeof q.object !== "string" || q.object.length === 0) { + throw new Error(`quads[${index}].object must be a non-empty string`); + } + + assertSafeIri(q.subject); + assertSafeIri(q.predicate); + if (q.object.startsWith('"')) { + assertSafeRdfTerm(q.object); + } else { + assertSafeIri(q.object); + } + + return { + subject: q.subject, + predicate: q.predicate, + object: q.object, + graph, + }; + }); + + await agent.store.insert(normalized); + return jsonResponse(res, 200, { + ok: true, + contextGraphId, + graph, + triplesWritten: normalized.length, + }); + } catch (err: any) { + return jsonResponse(res, 400, { + error: err?.message ?? "Invalid query catalog write", + }); + } + } + // POST /api/shared-memory/write (V10) or /api/workspace/write (legacy) if ( req.method === "POST" && diff --git a/packages/node-ui/src/ui/api.ts b/packages/node-ui/src/ui/api.ts index bc71a0258..89d11aeaa 100644 --- a/packages/node-ui/src/ui/api.ts +++ b/packages/node-ui/src/ui/api.ts @@ -396,6 +396,27 @@ export const publishTriples = async (contextGraphId: string, quads: any[]) => { return post('/api/shared-memory/publish', { contextGraphId, selection: 'all', clearAfter: true }); }; +export const writeSharedMemory = ( + contextGraphId: string, + quads: Array<{ subject: string; predicate: string; object: string; graph?: string }>, + opts: { subGraphName?: string; localOnly?: boolean } = {}, +) => + post('/api/shared-memory/write', { + contextGraphId, + quads, + ...(opts.subGraphName ? { subGraphName: opts.subGraphName } : {}), + ...(opts.localOnly !== undefined ? { localOnly: opts.localOnly } : {}), + }); + +export const writeProfileQueryCatalog = ( + contextGraphId: string, + quads: Array<{ subject: string; predicate: string; object: string; graph?: string }>, +) => + post('/api/profile/query-catalog/write', { + contextGraphId, + quads, + }); + // --- Assertions (WM objects) --- export interface AssertionInfo { diff --git a/packages/node-ui/src/ui/styles.css b/packages/node-ui/src/ui/styles.css index da7dea476..f193501f8 100644 --- a/packages/node-ui/src/ui/styles.css +++ b/packages/node-ui/src/ui/styles.css @@ -1554,6 +1554,41 @@ button:focus:not(:focus-visible) { outline: none; } color: var(--text-primary); } .v10-mlv-query-input:focus { border-color: var(--border-strong); } +.v10-cg-query-view { max-width: 1180px; } +.v10-cg-query-editor { + display: grid; + grid-template-columns: minmax(0, 1fr) auto; + gap: 8px; + align-items: stretch; + margin-bottom: 16px; +} +.v10-cg-query-textarea { + min-height: 156px; + resize: vertical; + padding: 10px 12px; + font-family: var(--font-mono); + font-size: 12px; + line-height: 1.5; + background: var(--bg-surface); + border: 1px solid var(--border-default); + border-radius: 6px; + color: var(--text-primary); +} +.v10-cg-query-textarea:focus { border-color: var(--border-strong); outline: none; } +.v10-cg-query-actions { + display: flex; + flex-direction: column; + gap: 8px; +} +.v10-cg-query-catalog { + display: flex; + align-items: center; + gap: 6px; + flex-wrap: wrap; + padding: 8px 0 14px; + margin-bottom: 14px; + border-bottom: 1px solid var(--border-subtle); +} .v10-mlv-run-btn { padding: 8px 16px; border-radius: 6px; @@ -1565,6 +1600,52 @@ button:focus:not(:focus-visible) { outline: none; } transition: all 0.15s; } .v10-mlv-run-btn:hover { border-color: var(--border-strong); color: var(--text-primary); } +.v10-mlv-save-btn { + padding: 8px 14px; + border-radius: 6px; + background: rgba(56, 189, 248, 0.08); + border: 1px solid rgba(56, 189, 248, 0.35); + font-size: 12px; + font-weight: 500; + color: #7dd3fc; + transition: all 0.15s; +} +.v10-mlv-save-btn:hover { + border-color: rgba(56, 189, 248, 0.65); + color: #bae6fd; +} +.v10-cg-query-save-panel { + display: grid; + grid-template-columns: minmax(160px, 0.6fr) minmax(220px, 1fr) auto; + gap: 10px; + align-items: end; + padding: 12px; + margin: -4px 0 16px; + border: 1px solid var(--border-subtle); + border-radius: 6px; + background: var(--bg-panel); +} +.v10-cg-query-save-field { + display: flex; + flex-direction: column; + gap: 5px; +} +.v10-cg-query-save-field span { + font-size: 10px; + font-weight: 700; + letter-spacing: 0.08em; + text-transform: uppercase; + color: var(--text-tertiary); +} +.v10-cg-query-save-field input { + height: 34px; + font-family: var(--font-sans); +} +.v10-cg-query-save-actions { + display: flex; + gap: 8px; + align-items: center; +} .v10-mlv-status { font-size: 12px; color: var(--text-tertiary); padding: 8px 0; } .v10-mlv-empty { padding: 32px 0; text-align: center; } .v10-mlv-empty p { font-size: 12px; color: var(--text-tertiary); } @@ -1596,7 +1677,11 @@ button:focus:not(:focus-visible) { outline: none; } @media (max-width: 900px) { .v10-vm-search-header { flex-direction: column; align-items: stretch; } .v10-vm-search-controls, - .v10-mlv-query-bar { grid-template-columns: 1fr; display: flex; flex-direction: column; } + .v10-mlv-query-bar, + .v10-cg-query-editor { grid-template-columns: 1fr; display: flex; flex-direction: column; } + .v10-cg-query-actions { flex-direction: row; } + .v10-cg-query-save-panel { grid-template-columns: 1fr; } + .v10-cg-query-save-actions { justify-content: flex-end; } } /* ── Assertion List (WM → SWM) ── */ @@ -2942,6 +3027,7 @@ body.light .v10-me-graph-legend { background: rgba(255,255,255,0.8); } .v10-layer-switch-btn:hover { color: var(--text-secondary); background: var(--bg-hover); } .v10-layer-switch-btn.active { color: var(--text-primary); } .v10-layer-switch-btn.active[data-layer="overview"] { border-bottom-color: var(--text-secondary); } +.v10-layer-switch-btn.active[data-layer="query"] { border-bottom-color: #38bdf8; } .v10-layer-switch-btn.active[data-layer="wm"] { border-bottom-color: #64748b; } .v10-layer-switch-btn.active[data-layer="swm"] { border-bottom-color: #f59e0b; } .v10-layer-switch-btn.active[data-layer="vm"] { border-bottom-color: #22c55e; } diff --git a/packages/node-ui/src/ui/views/ProjectView.tsx b/packages/node-ui/src/ui/views/ProjectView.tsx index 749faea66..ec01323f5 100644 --- a/packages/node-ui/src/ui/views/ProjectView.tsx +++ b/packages/node-ui/src/ui/views/ProjectView.tsx @@ -20,6 +20,7 @@ import { PendingJoinRequestsBar, MemoryStrip, SubGraphOverviewGrid, + ContextGraphQueryView, LayerDetailView, ProvenanceBar, } from './project/components.js'; @@ -250,6 +251,10 @@ export function ProjectView({ contextGraphId }: ProjectViewProps) { /> )} + {!activeSubGraph && activeLayer === 'query' && !selectedEntity && ( + + )} + {/* Layer Detail Views */} {!activeSubGraph && (activeLayer === 'wm' || activeLayer === 'swm' || activeLayer === 'vm') && !selectedEntity && ( <> diff --git a/packages/node-ui/src/ui/views/project/components.tsx b/packages/node-ui/src/ui/views/project/components.tsx index c81ff109c..017b2ab1e 100644 --- a/packages/node-ui/src/ui/views/project/components.tsx +++ b/packages/node-ui/src/ui/views/project/components.tsx @@ -6,6 +6,7 @@ import { listJoinRequests, approveJoinRequest, rejectJoinRequest, listParticipants, listAssertions, promoteAssertion, publishSharedMemory, executeQuery, + writeProfileQueryCatalog, fetchSubGraphs, type PendingJoinRequest, type PublishResult, type SubGraphInfo, } from '../../api.js'; @@ -17,6 +18,7 @@ import { } from '../../hooks/useMemoryEntities.js'; import { useProjectProfile, ProjectProfileContext, useProjectProfileContext, + type QueryCatalog, } from '../../hooks/useProjectProfile.js'; import { useAgents, AgentsContext, useAgentsContext, @@ -82,6 +84,13 @@ export function LayerSwitcher({ active, counts, onSwitch, onShare, onImport, onR > Graph Overview + + ); + })} + + ); + })} + + +
+