Skip to content

Add generic query-catalog and source-worker publishing flow#326

Open
lupuszr wants to merge 2 commits intomainfrom
feat/query-catalog-dkg
Open

Add generic query-catalog and source-worker publishing flow#326
lupuszr wants to merge 2 commits intomainfrom
feat/query-catalog-dkg

Conversation

@lupuszr
Copy link
Copy Markdown
Contributor

@lupuszr lupuszr commented Apr 29, 2026

Summary

  • add generic QueryCatalog support to v9 profile loading and UI rendering so saved queries can be grouped and displayed as first-class profile resources
  • introduce reusable transducer, share-batching, source-registry, and source-worker plumbing, plus dkg source-worker run --config ... [--once], to support generic async source ingestion workflows
  • harden async publishing with stale-worker fencing coverage and related targeted tests/build validation

CLI Usage
The new source-worker entrypoint runs a generic ingestion handler from a config file and pushes work through the existing async publish flow.
Run continuously:

dkg source-worker run --config /path/to/worker.config.json

Run once for cron/manual execution:

dkg source-worker run --config /path/to/worker.config.json --once

The worker config points the CLI at:

  • a handler module and export to load source-specific ingestion logic
  • a daemon URL/token for SWM writes and async job polling
  • a state file for persistence across runs
    This keeps the worker runtime generic in dkg, while allowing source-specific adapters to live outside the core repo.
{
  "daemonUrl": "http://127.0.0.1:9200",
  "daemonToken": "your-daemon-token",
  "stateFile": "./.tmp/source-worker-state.json",
  "pollIntervalMs": 30000,
  "handlerModule": "/absolute/path/to/source-worker-handler.js",
  "handlerExport": "createSourceHandler",
  "sources": [
    {
      "id": "example-source",
      "kind": "custom",
      "enabled": true,
      "config": {
        "inputPath": "/data/example.csv",
        "dataset": "example-dataset"
      }
    }
  ]
}

@lupuszr lupuszr force-pushed the feat/query-catalog-dkg branch from f9975ae to b45a1de Compare April 30, 2026 08:54
await this.ensureGraph();
const next = this.refreshActiveLease(this.mergeJob(await this.getRequiredJob(jobId), status, data));
const current = await this.getRequiredJob(jobId);
await this.assertActiveClaimFence(current);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: This only fences the update() path. A worker that loses its claim while publishExecutor is running can still mutate the job through recordPublishResult() / recordPublishFailure(), because those methods still write state without re-checking the wallet lock / claim token. Please apply the same fence validation before every post-claim mutation and add regression coverage for the publish-result/failure paths.

continue;
}

const nextAttemptCount = current?.fingerprint === fingerprint ? (current.attemptCount ?? 0) + 1 : 1;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: no-matching-rows is treated as a success state, but this loop only derives success from lastJobIds. If a handler returns { lastStatus: 'no-matching-rows' } with no jobs, the next poll sees aggregate === '', falls through here, and keeps retrying until the source is incorrectly forced into manual review. Short-circuit same-fingerprint no-matching-rows before incrementing retries (or persist a synthetic terminal status into the aggregate calculation).

provider,
);
const tokenAddr = await hub.getContractAddress("Token").catch(() => null);
const tokenAddr = config.chain?.tokenAddress
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: This reads only config.chain?.tokenAddress, so a token override coming from the merged network config is ignored here. In that case /api/wallets/balances still falls back to Hub.Token, which can make the UI show the wrong token or fail entirely while dkg wallet succeeds. Use the resolved chain/network token address in this route too.

@lupuszr lupuszr force-pushed the feat/query-catalog-dkg branch from b45a1de to bfe8ad9 Compare April 30, 2026 14:02

try {
const result = await deps.processSource(source, fingerprint, current);
nextState.sources[source.id] = result.nextState;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: processSource() gets the computed fingerprint, but the worker persists result.nextState verbatim. If a handler omits that field from nextState (which is easy with the current API), the next poll sees current.fingerprint as missing and reprocesses the same source again, potentially creating duplicate share/publish jobs forever. Merge the framework-owned fields here (fingerprint, lastRunAt, and any retry/manual-review resets) instead of requiring every handler to remember them.

{ subject: queryUri, predicate: `${PROFILE_NS}inCatalog`, object: catalogUri, graph: '' },
{ subject: queryUri, predicate: `${PROFILE_NS}displayName`, object: literal(name), graph: '' },
{ subject: queryUri, predicate: `${PROFILE_NS}sparqlQuery`, object: literal(sparql), graph: '' },
{ subject: queryUri, predicate: `${PROFILE_NS}rank`, object: intLiteral(rank), graph: '' },
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: rank comes from Date.now() above, but this writes it as xsd:int. Millisecond timestamps are already around 1.7e12, far beyond the 32-bit range of xsd:int, so the stored literal is invalid/overflow-prone and query ordering can become store-dependent. Persist this as xsd:integer/xsd:long, or use a smaller bounded rank value.

const [saveMessage, setSaveMessage] = useState<string | null>(null);
const builtInCatalog = useMemo(() => contextGraphBuiltInCatalog(contextGraphId), [contextGraphId]);
const queryCatalogs = useMemo(
() => [builtInCatalog, ...localSavedCatalogs, ...(profile?.queryCatalogs ?? [])],
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Issue: This concatenates localSavedCatalogs and profile.queryCatalogs without merging by catalog identity. If the persisted profile already contains ui-saved-queries and the user saves another query, the UI renders duplicate catalog sections with the same React key and splits the queries across them. Merge catalogs by subGraph|slug before rendering.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant