diff --git a/AGENTS.md b/AGENTS.md index c3a66d3..9d20167 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -118,6 +118,21 @@ await engine.stop(); // graceful shutdown (also closes pool if engine **Dependencies**: `pg` is a peer dependency (you install it); `pg-boss` is a regular dependency (bundled, no install needed). +### `otelPlugin(options?)` - OpenTelemetry tracing + +```typescript +import { workflow, otelPlugin } from 'pg-workflows'; + +// Optional peer dep: install `@opentelemetry/api` and an OTel SDK (e.g. NodeSDK). +// One `pg_workflows.workflow.run` span per worker execution, with child spans +// per step kind. Spans replayed from cache after a pause are suppressed. +const tracedWorkflow = workflow.use(otelPlugin({ + // tracer?: Tracer // default: trace.getTracer('pg-workflows') + // spanNamePrefix?: string // default: 'pg_workflows' + // attributes?: (ctx) => Record +})); +``` + ### Step Types (available on `context.step`) #### `step.run(stepId, handler)` - Execute a durable step diff --git a/README.md b/README.md index a5a8993..f461c69 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,49 @@ See [runnable examples](https://github.com/SokratisVidros/pg-workflows/tree/main - **[Examples](docs/examples.md)** - conditional steps, batch loops, scheduled reminders, retries, monitoring - **[API Reference](docs/api-reference.md)** - `WorkflowEngine`, `WorkflowClient`, `WorkflowRef`, types - **[Configuration](docs/configuration.md)** - env vars, database setup, requirements +- **[Observability](docs/observability.md)** - OpenTelemetry tracing via `otelPlugin` + +--- + +## Observability with OpenTelemetry + +pg-workflows ships a first-party plugin that emits OTel spans for workflow and step execution. `@opentelemetry/api` is an optional peer dependency — install it only if you want tracing. + +```bash +npm install @opentelemetry/api @opentelemetry/sdk-node +``` + +```ts +import { NodeSDK } from '@opentelemetry/sdk-node' +import { trace } from '@opentelemetry/api' +import { workflow, otelPlugin } from 'pg-workflows' + +// Initialize your OTel SDK however you normally do — for Node apps the +// NodeSDK registers an AsyncHooks context manager, which is required for +// hierarchical (parent/child) spans across async boundaries. +new NodeSDK({ /* exporters, resource, ... */ }).start() + +const tracedWorkflow = workflow.use(otelPlugin()) + +const myWorkflow = tracedWorkflow('checkout', async ({ step }) => { + await step.run('charge', async () => { /* ... */ }) + await step.waitFor('await-shipment', { eventName: 'shipped' }) +}) +``` + +The plugin emits a `pg_workflows.workflow.run` span per worker execution (one per resume cycle), with child spans per step kind (`pg_workflows.step.run`, `pg_workflows.step.waitFor`, etc.). Spans carry `workflow.id`, `workflow.run_id`, `workflow.attempt` and, where set, `workflow.resource_id`. Steps replayed from cache after a pause emit no spans. + +**Options:** + +```ts +otelPlugin({ + tracer: trace.getTracer('my-app'), // default: trace.getTracer('pg-workflows') + spanNamePrefix: 'pg_workflows', // default shown + attributes: (ctx) => ({ tenant: ctx.resourceId }), // extra static attrs on workflow.run +}) +``` + +Metrics, distributed trace context propagation across child workflows, and HTTP-caller context propagation are not in v1 — see [the observability docs](docs/observability.md#not-in-v1) for the deferral rationale. --- diff --git a/bun.lock b/bun.lock index 4897864..6f93878 100644 --- a/bun.lock +++ b/bun.lock @@ -15,6 +15,9 @@ "devDependencies": { "@biomejs/biome": "^2.3.10", "@electric-sql/pglite": "^0.3.14", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/context-async-hooks": "^1.27.0", + "@opentelemetry/sdk-trace-base": "^1.27.0", "@types/node": "^22.10.2", "@types/pg": "^8.11.10", "bunup": "^0.16.11", @@ -24,8 +27,12 @@ "zod": "^3.24.0", }, "peerDependencies": { + "@opentelemetry/api": "^1.9.0", "pg": "^8.0.0", }, + "optionalPeers": [ + "@opentelemetry/api", + ], }, }, "packages": { @@ -123,6 +130,18 @@ "@napi-rs/wasm-runtime": ["@napi-rs/wasm-runtime@1.1.0", "", { "dependencies": { "@emnapi/core": "^1.7.1", "@emnapi/runtime": "^1.7.1", "@tybys/wasm-util": "^0.10.1" } }, "sha512-Fq6DJW+Bb5jaWE69/qOE0D1TUN9+6uWhCeZpdnSBk14pjLcCWR7Q8n49PTSPHazM37JqrsdpEthXy2xn6jWWiA=="], + "@opentelemetry/api": ["@opentelemetry/api@1.9.1", "", {}, "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q=="], + + "@opentelemetry/context-async-hooks": ["@opentelemetry/context-async-hooks@1.30.1", "", { "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-s5vvxXPVdjqS3kTLKMeBMvop9hbWkwzBpu+mUO2M7sZtlkyDJGwFe33wRKnbaYDo8ExRVBIIdwIGrqpxHuKttA=="], + + "@opentelemetry/core": ["@opentelemetry/core@1.30.1", "", { "dependencies": { "@opentelemetry/semantic-conventions": "1.28.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-OOCM2C/QIURhJMuKaekP3TRBxBKxG/TWWA0TL2J6nXUtDnuCtccy49LUJF8xPFXMX+0LMcxFpCo8M9cGY1W6rQ=="], + + "@opentelemetry/resources": ["@opentelemetry/resources@1.30.1", "", { "dependencies": { "@opentelemetry/core": "1.30.1", "@opentelemetry/semantic-conventions": "1.28.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-5UxZqiAgLYGFjS4s9qm5mBVo433u+dSPUFWVWXmLAD4wB65oMCoXaJP1KJa9DIYYMeHu3z4BZcStG3LC593cWA=="], + + "@opentelemetry/sdk-trace-base": ["@opentelemetry/sdk-trace-base@1.30.1", "", { "dependencies": { "@opentelemetry/core": "1.30.1", "@opentelemetry/resources": "1.30.1", "@opentelemetry/semantic-conventions": "1.28.0" }, "peerDependencies": { "@opentelemetry/api": ">=1.0.0 <1.10.0" } }, "sha512-jVPgBbH1gCy2Lb7X0AVQ8XAfgg0pJ4nvl8/IiQA6nxOsPvS+0zMJaFSs2ltXe0J6C8dqjcnpyqINDJmU30+uOg=="], + + "@opentelemetry/semantic-conventions": ["@opentelemetry/semantic-conventions@1.28.0", "", {}, "sha512-lp4qAiMTD4sNWW4DbKLBkfiMZ4jbAboJIGOQr5DvciMRI494OapieI9qiODpOt0XBr1LjIDy1xAGAnVs5supTA=="], + "@oxc-minify/binding-android-arm64": ["@oxc-minify/binding-android-arm64@0.93.0", "", { "os": "android", "cpu": "arm64" }, "sha512-N3j/JoK4hXwQbnyOJoEltM8MEkddWV3XtfYimO6jsMjr5R6QdauKaSVeQHO/lSezB7SFkrMPqr6X7tBfghHiXA=="], "@oxc-minify/binding-darwin-arm64": ["@oxc-minify/binding-darwin-arm64@0.93.0", "", { "os": "darwin", "cpu": "arm64" }, "sha512-kLJJe7uBE+a9ql6eLGAtJ1g1LuEXi4aHbsiu342wGe+wRieSPi/Cx0aeDsQjdetwK5mqJWjWS2FO/n03jiw+IQ=="], diff --git a/docs/observability.md b/docs/observability.md new file mode 100644 index 0000000..ed631ae --- /dev/null +++ b/docs/observability.md @@ -0,0 +1,117 @@ +# Observability with OpenTelemetry + +pg-workflows ships a first-party `otelPlugin` that emits OpenTelemetry spans for workflow and step execution. `@opentelemetry/api` is an **optional peer dependency** — users who don't import the plugin pay zero runtime cost. + +## Quick start + +```bash +npm install @opentelemetry/api @opentelemetry/sdk-node +``` + +```ts +import { NodeSDK } from '@opentelemetry/sdk-node'; +import { workflow, otelPlugin } from 'pg-workflows'; + +// Initialize your OTel SDK however you normally do. NodeSDK registers an +// AsyncHooks context manager, which is required for hierarchical (parent/child) +// spans across `await` boundaries inside workflow handlers. +new NodeSDK({ /* exporters, resource, ... */ }).start(); + +const tracedWorkflow = workflow.use(otelPlugin()); + +const checkout = tracedWorkflow('checkout', async ({ step }) => { + await step.run('charge', async () => { /* ... */ }); + await step.waitFor('await-shipment', { eventName: 'shipped' }); +}); +``` + +## Span hierarchy + +Each worker execution of a workflow run produces one trace. A workflow that pauses (`step.waitFor`, `step.delay`, etc.) and resumes later produces a **new trace per resume cycle**. Traces are stitched together via the shared `workflow.id` and `workflow.run_id` attributes. + +``` +pg_workflows.workflow.run +├── pg_workflows.step.run +├── pg_workflows.step.waitFor +├── pg_workflows.step.delay +├── pg_workflows.step.waitUntil +├── pg_workflows.step.pause +├── pg_workflows.step.poll +└── pg_workflows.step.invokeChildWorkflow +``` + +`step.sleep` is an alias for `step.delay`; calls to it emit a `pg_workflows.step.delay` span (semantic consistency — both represent a sleep). + +## Attributes + +| Span | Attributes | +| ------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------- | +| `pg_workflows.workflow.run` | `workflow.id`, `workflow.run_id`, `workflow.attempt` (= `run.retryCount`), `workflow.resource_id` (when set), plus any user-supplied attrs | +| `pg_workflows.step.` | `step.id`, `step.type` (matches the `StepType` enum value) | +| Any span on error | `recordException(err)`, `status.code = ERROR`, `status.message = err.message` | +| Any span on success | `status.code = OK` | + +## Cache-hit suppression + +When a workflow resumes after a pause, the handler re-runs from the top. Steps that completed in a prior execution return their cached output instantly. The plugin detects these cache-hit replays and **does not emit a span** for them. + +Detection is based on `context.timeline`: + +- A step has an output cached in the timeline (`timeline[stepId].output !== undefined`) → cache hit. +- `step.invokeChildWorkflow` additionally checks for the in-flight binding key (`__invokeChildWorkflow:`) — a parent run that re-enters this step during a resume-while-child-still-running cycle is also treated as a cache hit. + +Exception: `step.poll` does not use the cache-hit guard. Each handler invocation that reaches `step.poll` represents a meaningful poll attempt worth tracing. + +## Plugin composition + +The OTel plugin uses the same `wrap(context, next)` middleware hook that any plugin can implement. If you register multiple plugins via `workflow.use(...)`, their wraps compose in registration order — the first plugin's wrap is outermost. + +```ts +const w = workflow + .use(loggingPlugin) // outermost wrap + .use(otelPlugin()) // inner wrap (workflow.run span opens inside loggingPlugin) + ('checkout', async ({ step }) => { /* ... */ }); +``` + +## Options + +```ts +otelPlugin({ + // Tracer to use. Defaults to `trace.getTracer('pg-workflows')`. + tracer: trace.getTracer('my-app'), + + // Span name prefix. Defaults to 'pg_workflows'. + spanNamePrefix: 'pg_workflows', + + // Optional callback returning extra attributes for the workflow.run span. + // Receives the WorkflowContext so you can extract values from the input + // or the run's resourceId. + attributes: (ctx) => ({ tenant: ctx.resourceId }), +}); +``` + +## Error semantics + +When a step or workflow handler throws: + +1. The span's exception is recorded via `span.recordException(error)`. +2. The span status is set to `ERROR` with the error's message. +3. The **original error** is re-thrown — engine retry/DLQ behaviour is unaffected. + +Non-`Error` throws (e.g., `throw 'msg'`) are coerced to an `Error` for the OTel API only; the original value is preserved on the re-throw path. + +## Not in v1 + +These are deliberately out of scope for the initial release. They share a common requirement (durable storage of trace context) and will likely land together when the underlying schema work is done. + +- **Metrics** (counters, histograms, gauges) — different OTel API surface; layers onto the same plugin hooks. +- **Cross-execution trace context propagation** — paused workflows resume as a fresh root trace today. Linking the resume to the prior execution requires persisting the `traceparent` header. +- **`step.invokeChildWorkflow` parent-trace continuation** — child runs start a fresh root trace. Same persistence question. +- **Caller context propagation into `engine.startWorkflow`** — an incoming HTTP trace does not currently propagate into the workflow run. +- **DLQ span emission** — `handleWorkflowRunDlq` runs outside the workflow's plugin chain. DLQ-induced FAILED states therefore don't produce a `workflow.run` span. The precipitating error is already recorded on the last per-execution span via the catch path. +- **Sampling controls** — the plugin defers to your configured `TracerProvider` for sampling. + +## Requirements + +- `@opentelemetry/api` ^1.9.0 (optional peer) +- An OTel SDK that registers an AsyncHooks context manager. `@opentelemetry/sdk-node`'s `NodeSDK` does this automatically. If you're wiring OTel manually, install `@opentelemetry/context-async-hooks` and call `context.setGlobalContextManager(new AsyncHooksContextManager().enable())`. diff --git a/package-lock.json b/package-lock.json index be8a2c6..4d4faa3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,9 @@ "devDependencies": { "@biomejs/biome": "^2.3.10", "@electric-sql/pglite": "^0.3.14", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/context-async-hooks": "^1.27.0", + "@opentelemetry/sdk-trace-base": "^1.27.0", "@types/node": "^22.10.2", "@types/pg": "^8.11.10", "bunup": "^0.16.11", @@ -31,7 +34,13 @@ "node": ">=18.0.0" }, "peerDependencies": { + "@opentelemetry/api": "^1.9.0", "pg": "^8.0.0" + }, + "peerDependenciesMeta": { + "@opentelemetry/api": { + "optional": true + } } }, "node_modules/@babel/helper-string-parser": { @@ -360,6 +369,90 @@ "@emnapi/runtime": "^1.7.1" } }, + "node_modules/@opentelemetry/api": { + "version": "1.9.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.1.tgz", + "integrity": "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": ">=8.0.0" + } + }, + "node_modules/@opentelemetry/context-async-hooks": { + "version": "1.30.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/context-async-hooks/-/context-async-hooks-1.30.1.tgz", + "integrity": "sha512-s5vvxXPVdjqS3kTLKMeBMvop9hbWkwzBpu+mUO2M7sZtlkyDJGwFe33wRKnbaYDo8ExRVBIIdwIGrqpxHuKttA==", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/core": { + "version": "1.30.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-1.30.1.tgz", + "integrity": "sha512-OOCM2C/QIURhJMuKaekP3TRBxBKxG/TWWA0TL2J6nXUtDnuCtccy49LUJF8xPFXMX+0LMcxFpCo8M9cGY1W6rQ==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/semantic-conventions": "1.28.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/resources": { + "version": "1.30.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/resources/-/resources-1.30.1.tgz", + "integrity": "sha512-5UxZqiAgLYGFjS4s9qm5mBVo433u+dSPUFWVWXmLAD4wB65oMCoXaJP1KJa9DIYYMeHu3z4BZcStG3LC593cWA==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/core": "1.30.1", + "@opentelemetry/semantic-conventions": "1.28.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/sdk-trace-base": { + "version": "1.30.1", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-base/-/sdk-trace-base-1.30.1.tgz", + "integrity": "sha512-jVPgBbH1gCy2Lb7X0AVQ8XAfgg0pJ4nvl8/IiQA6nxOsPvS+0zMJaFSs2ltXe0J6C8dqjcnpyqINDJmU30+uOg==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/core": "1.30.1", + "@opentelemetry/resources": "1.30.1", + "@opentelemetry/semantic-conventions": "1.28.0" + }, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/semantic-conventions": { + "version": "1.28.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.28.0.tgz", + "integrity": "sha512-lp4qAiMTD4sNWW4DbKLBkfiMZ4jbAboJIGOQr5DvciMRI494OapieI9qiODpOt0XBr1LjIDy1xAGAnVs5supTA==", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": ">=14" + } + }, "node_modules/@oxc-minify/binding-android-arm64": { "version": "0.93.0", "resolved": "https://registry.npmjs.org/@oxc-minify/binding-android-arm64/-/binding-android-arm64-0.93.0.tgz", diff --git a/package.json b/package.json index d5183aa..cee1f6f 100644 --- a/package.json +++ b/package.json @@ -85,11 +85,20 @@ "typescript": "^5.9.3" }, "peerDependencies": { + "@opentelemetry/api": "^1.9.0", "pg": "^8.0.0" }, + "peerDependenciesMeta": { + "@opentelemetry/api": { + "optional": true + } + }, "devDependencies": { "@biomejs/biome": "^2.3.10", "@electric-sql/pglite": "^0.3.14", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/context-async-hooks": "^1.27.0", + "@opentelemetry/sdk-trace-base": "^1.27.0", "@types/node": "^22.10.2", "@types/pg": "^8.11.10", "bunup": "^0.16.11", diff --git a/src/engine.test.ts b/src/engine.test.ts index f0c78b2..ed06663 100644 --- a/src/engine.test.ts +++ b/src/engine.test.ts @@ -279,6 +279,61 @@ describe('WorkflowEngine', () => { await engine.stop(); }); + + it('should call plugin.wrap around the handler and compose multiple wraps in registration order', async () => { + const calls: string[] = []; + + const outerPlugin: WorkflowPlugin = { + name: 'outer', + methods: () => ({}), + wrap: async (_ctx, next) => { + calls.push('outer:before'); + const result = await next(); + calls.push('outer:after'); + return result; + }, + }; + + const innerPlugin: WorkflowPlugin = { + name: 'inner', + methods: () => ({}), + wrap: async (_ctx, next) => { + calls.push('inner:before'); + const result = await next(); + calls.push('inner:after'); + return result; + }, + }; + + const engine = new WorkflowEngine({ workflows: [], pool: testPool, boss: testBoss }); + await engine.start(); + + const wrapped = workflow.use(outerPlugin).use(innerPlugin)( + 'wrap-order-workflow', + async ({ step }) => { + calls.push('handler'); + await step.run('only-step', async () => 'ok'); + return 'done'; + }, + ); + + await engine.registerWorkflow(wrapped); + const run = await engine.startWorkflow({ workflowId: 'wrap-order-workflow', input: {} }); + + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + expect(calls).toEqual([ + 'outer:before', + 'inner:before', + 'handler', + 'inner:after', + 'outer:after', + ]); + + await engine.stop(); + }); }); describe('unregisterWorkflow()', () => { diff --git a/src/engine.ts b/src/engine.ts index 2c06058..8c3f14a 100644 --- a/src/engine.ts +++ b/src/engine.ts @@ -1123,15 +1123,13 @@ export class WorkflowEngine { let step = { ...baseStep }; const plugins = workflow.plugins ?? []; - for (const plugin of plugins) { - const extra = plugin.methods(step); - step = { ...step, ...extra }; - } const context: WorkflowContext = { input: run.input as InferInputParameters, workflowId: run.workflowId, runId: run.id, + resourceId: run.resourceId ?? undefined, + attempt: run.retryCount, get timeline() { // Read through to the live run so callers see entries written by // previously completed steps within the same handler invocation. @@ -1141,7 +1139,22 @@ export class WorkflowEngine { step, }; - const result = await workflow.handler(context); + for (const plugin of plugins) { + const extra = plugin.methods(step, context); + step = { ...step, ...extra }; + context.step = step; + } + + let next: () => Promise = () => workflow.handler(context); + for (const plugin of [...plugins].reverse()) { + if (plugin.wrap) { + const inner = next; + const wrap = plugin.wrap; + next = () => wrap(context, inner); + } + } + + const result = await next(); run = await this.getRun({ runId, resourceId: scopedResourceId }); diff --git a/src/index.ts b/src/index.ts index 4090c25..f3e6aaa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,6 +7,7 @@ export { createWorkflowRef, workflow } from './definition'; export type { Duration } from './duration'; export { WorkflowEngine, type WorkflowEngineOptions } from './engine'; export { WorkflowEngineError, WorkflowRunNotFoundError } from './error'; +export { type OtelPluginOptions, otelPlugin } from './plugins/otel'; export type { InferInputParameters, InputParameters, diff --git a/src/plugins/otel-test-helpers.ts b/src/plugins/otel-test-helpers.ts new file mode 100644 index 0000000..1524543 --- /dev/null +++ b/src/plugins/otel-test-helpers.ts @@ -0,0 +1,44 @@ +import { context, type Tracer, trace } from '@opentelemetry/api'; +import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; +import { + BasicTracerProvider, + InMemorySpanExporter, + type ReadableSpan, + SimpleSpanProcessor, +} from '@opentelemetry/sdk-trace-base'; + +/** + * Build a fresh tracer + in-memory exporter for a single test. + * Callers MUST invoke `teardown()` in `afterEach`. + */ +export function setupOtel(): { + tracer: Tracer; + getSpans: () => ReadableSpan[]; + getSpansByName: (name: string) => ReadableSpan[]; + teardown: () => Promise; +} { + const exporter = new InMemorySpanExporter(); + const provider = new BasicTracerProvider({ + spanProcessors: [new SimpleSpanProcessor(exporter)], + }); + + // AsyncHooks context manager is required for nested step spans to attach + // to the workflow.run span across `await` boundaries. We register it + // globally because OTel's context API reads from the global manager. + const contextManager = new AsyncHooksContextManager().enable(); + context.setGlobalContextManager(contextManager); + + const tracer = provider.getTracer('pg-workflows-test'); + + return { + tracer, + getSpans: () => exporter.getFinishedSpans(), + getSpansByName: (name: string) => exporter.getFinishedSpans().filter((s) => s.name === name), + teardown: async () => { + await provider.shutdown(); + contextManager.disable(); + context.disable(); + trace.disable(); + }, + }; +} diff --git a/src/plugins/otel.test.ts b/src/plugins/otel.test.ts new file mode 100644 index 0000000..9615631 --- /dev/null +++ b/src/plugins/otel.test.ts @@ -0,0 +1,388 @@ +import type pg from 'pg'; +import type { PgBoss } from 'pg-boss'; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest'; +import { workflow } from '../definition'; +import { WorkflowEngine } from '../engine'; +import { getBoss } from '../tests/pgboss'; +import { closeTestDatabase, createTestDatabase } from '../tests/test-db'; +import type { StepBaseContext, WorkflowPlugin } from '../types'; +import { WorkflowStatus } from '../types'; +import { otelPlugin } from './otel'; +import { setupOtel } from './otel-test-helpers'; + +let testBoss: PgBoss; +let testPool: pg.Pool; + +beforeAll(async () => { + testPool = await createTestDatabase(); + testBoss = await getBoss(testPool); +}); + +afterAll(async () => { + await closeTestDatabase(); +}); + +describe('otelPlugin', () => { + let otel: ReturnType; + let engine: WorkflowEngine; + + beforeEach(async () => { + otel = setupOtel(); + engine = new WorkflowEngine({ workflows: [], pool: testPool, boss: testBoss }); + await engine.start(); + }); + + afterEach(async () => { + await engine.stop(); + await otel.teardown(); + }); + + it('registers and lets a workflow complete', async () => { + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))('otel-smoke', async ({ step }) => { + return await step.run('only', async () => 'ok'); + }); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-smoke', input: {} }); + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.COMPLETED, output: 'ok' }); + }); + + it('emits a workflow.run span on successful completion', async () => { + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))('otel-wf-span', async () => 'done'); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ + resourceId: 'tenant-1', + workflowId: 'otel-wf-span', + input: {}, + }); + await expect + .poll(async () => await engine.getRun({ runId: run.id, resourceId: 'tenant-1' })) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + const spans = otel.getSpansByName('pg_workflows.workflow.run'); + expect(spans).toHaveLength(1); + expect(spans[0].attributes).toMatchObject({ + 'workflow.id': 'otel-wf-span', + 'workflow.run_id': run.id, + 'workflow.resource_id': 'tenant-1', + 'workflow.attempt': 0, + }); + expect(spans[0].status.code).toBe(1); // SpanStatusCode.OK + }); + + it('records exception and ERROR status on workflow.run when handler throws', async () => { + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))( + 'otel-wf-throw', + async ({ step }) => { + await step.run('boom', async () => { + throw new Error('kaboom'); + }); + }, + { retries: 0 }, + ); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-wf-throw', input: {} }); + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.FAILED }); + + const wfSpan = otel.getSpansByName('pg_workflows.workflow.run')[0]; + expect(wfSpan.status.code).toBe(2); // SpanStatusCode.ERROR + expect(wfSpan.status.message).toBe('kaboom'); + expect(wfSpan.events.some((e) => e.name === 'exception')).toBe(true); + }); + + it('emits step.run span as a child of workflow.run', async () => { + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))( + 'otel-step-run-child', + async ({ step }) => { + return await step.run('foo', async () => 'bar'); + }, + ); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-step-run-child', input: {} }); + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + const wfSpan = otel.getSpansByName('pg_workflows.workflow.run')[0]; + const stepSpan = otel.getSpansByName('pg_workflows.step.run')[0]; + expect(stepSpan).toBeDefined(); + expect(stepSpan.attributes).toMatchObject({ 'step.id': 'foo', 'step.type': 'run' }); + expect(stepSpan.parentSpanId).toBe(wfSpan.spanContext().spanId); + }); + + it('skips step.run span on cache-hit replay', async () => { + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))( + 'otel-cache-skip', + async ({ step }) => { + const a = await step.run('first', async () => 'A'); + await step.waitFor('gate', { eventName: 'go' }); + const b = await step.run('second', async () => 'B'); + return { a, b }; + }, + ); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-cache-skip', input: {} }); + + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.PAUSED }); + + // First execution: workflow.run + step.run('first') + step.waitFor('gate') + expect( + otel.getSpansByName('pg_workflows.step.run').map((s) => s.attributes['step.id']), + ).toEqual(['first']); + + await engine.triggerEvent({ runId: run.id, eventName: 'go' }); + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + // Second execution: NEW workflow.run + step.run('second') only. + // 'first' is a cache hit and emits no span. + const stepRunSpans = otel.getSpansByName('pg_workflows.step.run'); + const ids = stepRunSpans.map((s) => s.attributes['step.id']); + expect(ids).toEqual(['first', 'second']); + expect(otel.getSpansByName('pg_workflows.workflow.run')).toHaveLength(2); + }); + + it('records exception and ERROR status on step.run when handler throws', async () => { + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))( + 'otel-step-throw', + async ({ step }) => { + await step.run('explode', async () => { + throw new Error('nope'); + }); + }, + { retries: 0 }, + ); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-step-throw', input: {} }); + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.FAILED }); + + const stepSpan = otel.getSpansByName('pg_workflows.step.run')[0]; + expect(stepSpan.status.code).toBe(2); + expect(stepSpan.status.message).toBe('nope'); + expect(stepSpan.events.some((e) => e.name === 'exception')).toBe(true); + }); + + it('step.run span has non-zero duration matching the step handler runtime', async () => { + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))( + 'otel-step-duration', + async ({ step }) => { + return await step.run('slow', async () => { + await new Promise((resolve) => setTimeout(resolve, 50)); + return 'done'; + }); + }, + ); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-step-duration', input: {} }); + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + const stepSpan = otel.getSpansByName('pg_workflows.step.run')[0]; + expect(stepSpan).toBeDefined(); + // Span duration = endTime - startTime in nanoseconds. With a 50ms sleep + // inside the handler, we expect at least ~30ms (allow generous margin). + const startNs = stepSpan.startTime[0] * 1_000_000_000 + stepSpan.startTime[1]; + const endNs = stepSpan.endTime[0] * 1_000_000_000 + stepSpan.endTime[1]; + const durationMs = (endNs - startNs) / 1_000_000; + expect(durationMs).toBeGreaterThan(30); + }); + + it('emits spans for waitFor, delay, waitUntil, pause', async () => { + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))( + 'otel-other-steps', + async ({ step }) => { + await step.waitFor('wf', { eventName: 'evt' }); + await step.delay('d', '1ms'); + await step.waitUntil('wu', new Date(Date.now() + 1)); + await step.pause('p'); + return 'ok'; + }, + ); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-other-steps', input: {} }); + + // Workflow pauses immediately on first waitFor; drive it through completion. + const drive = async (stepId?: string) => { + for (let i = 0; i < 40; i++) { + const r = await engine.getRun({ runId: run.id }); + if (r.status === WorkflowStatus.PAUSED && (!stepId || r.currentStepId === stepId)) break; + await new Promise((res) => setTimeout(res, 50)); + } + }; + await drive('wf'); + await engine.triggerEvent({ runId: run.id, eventName: 'evt' }); + // delay + waitUntil resolve themselves; wait until paused at the explicit pause step. + await drive('p'); + await engine.resumeWorkflow({ runId: run.id }); + await expect + .poll(async () => await engine.getRun({ runId: run.id }), { timeout: 5000 }) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + const stepNames = otel + .getSpans() + .map((s) => s.name) + .filter((n) => n.startsWith('pg_workflows.step.')); + expect(stepNames).toEqual( + expect.arrayContaining([ + 'pg_workflows.step.waitFor', + 'pg_workflows.step.delay', + 'pg_workflows.step.waitUntil', + 'pg_workflows.step.pause', + ]), + ); + const waitForSpan = otel.getSpansByName('pg_workflows.step.waitFor')[0]; + expect(waitForSpan.attributes).toMatchObject({ 'step.id': 'wf', 'step.type': 'waitFor' }); + }); + + it('emits invokeChildWorkflow span on creation and skips on cache-hit resume', async () => { + const child = workflow('otel-child', async ({ step }) => + step.run('done', async () => 'child-done'), + ); + await engine.registerWorkflow(child); + + const parent = workflow.use(otelPlugin({ tracer: otel.tracer }))( + 'otel-parent', + async ({ step }) => { + const r = await step.invokeChildWorkflow('call-child', { + workflowId: child.id, + input: {}, + }); + return r; + }, + ); + await engine.registerWorkflow(parent); + const run = await engine.startWorkflow({ workflowId: 'otel-parent', input: {} }); + + await expect + .poll(async () => await engine.getRun({ runId: run.id }), { timeout: 5000 }) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + const invokeSpans = otel.getSpansByName('pg_workflows.step.invokeChildWorkflow'); + expect(invokeSpans).toHaveLength(1); + expect(invokeSpans[0].attributes).toMatchObject({ + 'step.id': 'call-child', + 'step.type': 'invokeChildWorkflow', + }); + }); + + it('emits step.poll span on each poll attempt', async () => { + let attempt = 0; + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))('otel-poll', async ({ step }) => { + const result = await step.poll( + 'poller', + async () => { + attempt += 1; + return attempt >= 2 ? { value: attempt } : false; + }, + { interval: '30s', timeout: '60s' }, + ); + return result; + }); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-poll', input: {} }); + + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.PAUSED }); + + // First execution emitted exactly one step.poll span + const firstPolls = otel.getSpansByName('pg_workflows.step.poll'); + expect(firstPolls).toHaveLength(1); + expect(firstPolls[0].attributes).toMatchObject({ 'step.id': 'poller', 'step.type': 'poll' }); + + // Simulate the poll-interval re-fire via fastForwardWorkflow + await engine.fastForwardWorkflow({ runId: run.id }); + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + // Second execution emits a new poll span (the previous one is not a cache hit + // because the step's *output* is not yet in timeline, only a poll-state entry) + expect(otel.getSpansByName('pg_workflows.step.poll').length).toBeGreaterThanOrEqual(2); + }); + + it('wraps step.sleep (alias for step.delay) with a span', async () => { + const w = workflow.use(otelPlugin({ tracer: otel.tracer }))('otel-sleep', async ({ step }) => { + await step.sleep('napping', '1ms'); + return 'ok'; + }); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-sleep', input: {} }); + await expect + .poll(async () => await engine.getRun({ runId: run.id }), { timeout: 5000 }) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + // sleep is an alias for delay — the span name should be pg_workflows.step.delay + // so users can search uniformly for "delay" spans regardless of the alias used. + const delaySpans = otel.getSpansByName('pg_workflows.step.delay'); + expect(delaySpans.some((s) => s.attributes['step.id'] === 'napping')).toBe(true); + }); + + it('composes wrap with another plugin in registration order', async () => { + const calls: string[] = []; + const trackerPlugin: WorkflowPlugin = { + name: 'tracker', + methods: () => ({}), + wrap: async (_ctx, next) => { + calls.push('tracker:before'); + const r = await next(); + calls.push('tracker:after'); + return r; + }, + }; + + const w = workflow.use(trackerPlugin).use(otelPlugin({ tracer: otel.tracer }))( + 'otel-compose', + async () => 'ok', + ); + await engine.registerWorkflow(w); + const run = await engine.startWorkflow({ workflowId: 'otel-compose', input: {} }); + await expect + .poll(async () => await engine.getRun({ runId: run.id })) + .toMatchObject({ status: WorkflowStatus.COMPLETED }); + + // tracker registered first, so its wrap is outermost — its before runs + // before the workflow.run span opens, and its after runs after the span ends. + const wfSpan = otel.getSpansByName('pg_workflows.workflow.run')[0]; + expect(wfSpan).toBeDefined(); + expect(calls).toEqual(['tracker:before', 'tracker:after']); + }); +}); + +import { invokeChildWorkflowTimelineKey } from '../constants'; +import { isCachedHit } from './otel'; + +describe('isCachedHit', () => { + it('returns true when output is recorded for stepId', () => { + expect(isCachedHit({ s: { output: 'x', timestamp: new Date() } }, 's', 'run')).toBe(true); + }); + + it('returns false when output is undefined', () => { + expect(isCachedHit({ s: { output: undefined, timestamp: new Date() } }, 's', 'run')).toBe( + false, + ); + }); + + it('returns false when timeline has no entry for stepId', () => { + expect(isCachedHit({}, 's', 'run')).toBe(false); + }); + + it('returns false for non-object entry', () => { + expect(isCachedHit({ s: 'not-an-object' }, 's', 'run')).toBe(false); + }); + + it('returns true for invokeChildWorkflow when only the binding key is present', () => { + const timeline = { [invokeChildWorkflowTimelineKey('s')]: { invokeChildWorkflow: {} } }; + expect(isCachedHit(timeline, 's', 'invokeChildWorkflow')).toBe(true); + expect(isCachedHit(timeline, 's', 'run')).toBe(false); + }); +}); diff --git a/src/plugins/otel.ts b/src/plugins/otel.ts new file mode 100644 index 0000000..334c189 --- /dev/null +++ b/src/plugins/otel.ts @@ -0,0 +1,272 @@ +import { + type AttributeValue, + context as otelContext, + SpanStatusCode, + type Tracer, + trace, +} from '@opentelemetry/api'; +import { invokeChildWorkflowTimelineKey } from '../constants'; +import type { StepBaseContext, WorkflowContext, WorkflowPlugin } from '../types'; + +export type OtelPluginOptions = { + /** Tracer to use. Defaults to `trace.getTracer('pg-workflows')`. */ + tracer?: Tracer; + /** Prefix for all span names. Defaults to `pg_workflows`. */ + spanNamePrefix?: string; + /** Extra attributes merged onto the workflow.run span. */ + attributes?: (context: WorkflowContext) => Record; +}; + +const DEFAULT_PREFIX = 'pg_workflows'; + +type StepKind = + | 'run' + | 'waitFor' + | 'delay' + | 'waitUntil' + | 'pause' + | 'poll' + | 'invokeChildWorkflow'; + +export function isCachedHit( + timeline: Record, + stepId: string, + kind: StepKind, +): boolean { + const entry = timeline[stepId]; + if ( + entry && + typeof entry === 'object' && + 'output' in entry && + (entry as { output: unknown }).output !== undefined + ) { + return true; + } + if (kind === 'invokeChildWorkflow' && timeline[invokeChildWorkflowTimelineKey(stepId)]) { + return true; + } + return false; +} + +export function otelPlugin( + options: OtelPluginOptions = {}, +): WorkflowPlugin { + const tracer = options.tracer ?? trace.getTracer('pg-workflows'); + const prefix = options.spanNamePrefix ?? DEFAULT_PREFIX; + const extraAttrs = options.attributes; + + return { + name: 'opentelemetry', + + methods: (step, context) => { + const wrapVoidish = ( + kind: 'waitFor' | 'delay' | 'waitUntil' | 'pause', + base: (stepId: string, ...args: Args) => Promise, + ) => { + return async (stepId: string, ...args: Args): Promise => { + if (isCachedHit(context.timeline, stepId, kind)) { + return base(stepId, ...args); + } + const capturedCtx = otelContext.active(); + const startTime = new Date(); + let result: R; + let originalErr: unknown; + let thrownError: Error | undefined; + try { + result = await base(stepId, ...args); + } catch (err) { + originalErr = err; + thrownError = err instanceof Error ? err : new Error(String(err)); + } + const span = tracer.startSpan( + `${prefix}.step.${kind}`, + { + startTime, + attributes: { 'step.id': stepId, 'step.type': kind }, + }, + capturedCtx, + ); + if (thrownError) { + span.recordException(thrownError); + span.setStatus({ code: SpanStatusCode.ERROR, message: thrownError.message }); + span.end(); + throw originalErr; + } + span.setStatus({ code: SpanStatusCode.OK }); + span.end(); + // biome-ignore lint/style/noNonNullAssertion: result is assigned in try when not thrown + return result!; + }; + }; + + return { + run: async (stepId: string, handler: () => Promise) => { + if (isCachedHit(context.timeline, stepId, 'run')) { + return step.run(stepId, handler); + } + + // Capture the active context (workflow.run span) and the start time + // BEFORE running the step, so the emitted span has correct timing. + // We materialise the span only if the step actually ran or threw — + // skipped steps (engine short-circuit on paused/cancelled runs) return + // undefined and produce no span. + const capturedCtx = otelContext.active(); + const startTime = new Date(); + let result: T | undefined; + let originalErr: unknown; + let thrownError: Error | undefined; + + try { + result = await step.run(stepId, handler); + } catch (err) { + originalErr = err; + thrownError = err instanceof Error ? err : new Error(String(err)); + } + + if (result === undefined && !thrownError) { + return undefined as T; + } + + const span = tracer.startSpan( + `${prefix}.step.run`, + { + startTime, + attributes: { 'step.id': stepId, 'step.type': 'run' }, + }, + capturedCtx, + ); + + if (thrownError) { + span.recordException(thrownError); + span.setStatus({ code: SpanStatusCode.ERROR, message: thrownError.message }); + span.end(); + throw originalErr; + } + + span.setStatus({ code: SpanStatusCode.OK }); + span.end(); + return result as T; + }, + waitFor: wrapVoidish('waitFor', step.waitFor as never) as StepBaseContext['waitFor'], + delay: wrapVoidish('delay', step.delay as never) as StepBaseContext['delay'], + sleep: wrapVoidish('delay', step.delay as never) as StepBaseContext['sleep'], + waitUntil: wrapVoidish( + 'waitUntil', + step.waitUntil as never, + ) as StepBaseContext['waitUntil'], + pause: wrapVoidish('pause', step.pause as never) as StepBaseContext['pause'], + poll: (async ( + stepId: string, + conditionFn: () => Promise, + pollOptions?: Parameters[2], + ) => { + const capturedCtx = otelContext.active(); + const startTime = new Date(); + let result: Awaited> | undefined; + let originalErr: unknown; + let thrownError: Error | undefined; + try { + result = await step.poll(stepId, conditionFn, pollOptions); + } catch (err) { + originalErr = err; + thrownError = err instanceof Error ? err : new Error(String(err)); + } + const span = tracer.startSpan( + `${prefix}.step.poll`, + { + startTime, + attributes: { 'step.id': stepId, 'step.type': 'poll' }, + }, + capturedCtx, + ); + if (thrownError) { + span.recordException(thrownError); + span.setStatus({ code: SpanStatusCode.ERROR, message: thrownError.message }); + span.end(); + throw originalErr; + } + span.setStatus({ code: SpanStatusCode.OK }); + span.end(); + // biome-ignore lint/style/noNonNullAssertion: result is assigned in try when not thrown + return result!; + }) as StepBaseContext['poll'], + invokeChildWorkflow: (async ( + stepId: string, + refOrParams: Parameters[1], + inputArg?: unknown, + optionsArg?: unknown, + ) => { + if (isCachedHit(context.timeline, stepId, 'invokeChildWorkflow')) { + return (step.invokeChildWorkflow as (...args: unknown[]) => Promise)( + stepId, + refOrParams, + inputArg, + optionsArg, + ); + } + const capturedCtx = otelContext.active(); + const startTime = new Date(); + let result: unknown; + let originalErr: unknown; + let thrownError: Error | undefined; + try { + result = await (step.invokeChildWorkflow as (...args: unknown[]) => Promise)( + stepId, + refOrParams, + inputArg, + optionsArg, + ); + } catch (err) { + originalErr = err; + thrownError = err instanceof Error ? err : new Error(String(err)); + } + const span = tracer.startSpan( + `${prefix}.step.invokeChildWorkflow`, + { + startTime, + attributes: { 'step.id': stepId, 'step.type': 'invokeChildWorkflow' }, + }, + capturedCtx, + ); + if (thrownError) { + span.recordException(thrownError); + span.setStatus({ code: SpanStatusCode.ERROR, message: thrownError.message }); + span.end(); + throw originalErr; + } + span.setStatus({ code: SpanStatusCode.OK }); + span.end(); + return result; + }) as StepBaseContext['invokeChildWorkflow'], + }; + }, + + wrap: (context, next) => + tracer.startActiveSpan( + `${prefix}.workflow.run`, + { + attributes: { + 'workflow.id': context.workflowId, + 'workflow.run_id': context.runId, + 'workflow.attempt': context.attempt, + ...(context.resourceId ? { 'workflow.resource_id': context.resourceId } : {}), + ...(extraAttrs ? extraAttrs(context) : {}), + }, + }, + async (span) => { + try { + const result = await next(); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (err) { + const error = err instanceof Error ? err : new Error(String(err)); + span.recordException(error); + span.setStatus({ code: SpanStatusCode.ERROR, message: error.message }); + throw err; + } finally { + span.end(); + } + }, + ), + }; +} diff --git a/src/types.ts b/src/types.ts index 618aa4b..48452ae 100644 --- a/src/types.ts +++ b/src/types.ts @@ -96,7 +96,13 @@ export type StepBaseContext = { */ export interface WorkflowPlugin { name: string; - methods: (step: TStepBase) => TStepExt; + methods: (step: TStepBase, context: WorkflowContext) => TStepExt; + /** + * Optional middleware around the workflow handler call. Composes in + * registration order — the first plugin passed to `.use()` wraps everything + * inside. Implementations MUST call `next()` exactly once. + */ + wrap?: (context: WorkflowContext, next: () => Promise) => Promise; } export type WorkflowContext< @@ -107,6 +113,10 @@ export type WorkflowContext< step: TStep; workflowId: string; runId: string; + /** Tenant/scope identifier set when the run was started, if any. */ + resourceId?: string; + /** Zero-based retry attempt number (= `run.retryCount`). */ + attempt: number; timeline: Record; logger: WorkflowLogger; };