Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6c5f921
docs: add OpenTelemetry instrumentation design (issue #34)
SokratisVidros May 21, 2026
bd62d41
docs: add OTel instrumentation implementation plan (issue #34)
SokratisVidros May 21, 2026
f930809
build: add OpenTelemetry deps for otelPlugin
SokratisVidros May 21, 2026
3cb3bec
feat(types): add wrap hook and context arg to WorkflowPlugin
SokratisVidros May 21, 2026
1c54bdc
feat(engine): compose plugin.wrap middleware around handler
SokratisVidros May 21, 2026
1d62ef0
test: add OTel test bootstrap helper
SokratisVidros May 21, 2026
a97b13b
feat(otel): plugin skeleton
SokratisVidros May 21, 2026
6c50c3e
feat(types): expose resourceId and attempt on WorkflowContext
SokratisVidros May 21, 2026
aae27e5
feat(otel): emit workflow.run span via wrap hook
SokratisVidros May 21, 2026
33eeb34
feat(otel): record exception on workflow.run span on failure
SokratisVidros May 21, 2026
98f2493
feat(otel): wrap step.run with span, cache-hit suppression, error path
SokratisVidros May 21, 2026
57b291c
fix(otel): preserve step.run span duration and original error throw
SokratisVidros May 21, 2026
3111e16
feat(otel): wrap waitFor, delay, waitUntil, pause with spans
SokratisVidros May 21, 2026
21a2669
feat(otel): wrap step.poll with span
SokratisVidros May 21, 2026
1d3e41d
feat(otel): wrap step.invokeChildWorkflow with binding-aware cache check
SokratisVidros May 21, 2026
4312549
test(otel): direct coverage for isCachedHit predicate
SokratisVidros May 22, 2026
d2352bb
test(otel): verify plugin composition order with another wrap
SokratisVidros May 22, 2026
399d19c
feat(otel): export otelPlugin and document usage
SokratisVidros May 22, 2026
0573c9f
fix(otel): wrap step.sleep alias and align span names with implementa…
SokratisVidros May 22, 2026
a93ace7
docs: replace internal spec/plan with public observability page
SokratisVidros May 22, 2026
3b5eef8
build: sync bun.lock with new OpenTelemetry deps
SokratisVidros May 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@ await engine.stop(); // graceful shutdown (also closes pool if engine

**Dependencies**: `pg` is a peer dependency (you install it); `pg-boss` is a regular dependency (bundled, no install needed).

### `otelPlugin(options?)` - OpenTelemetry tracing

```typescript
import { workflow, otelPlugin } from 'pg-workflows';

// Optional peer dep: install `@opentelemetry/api` and an OTel SDK (e.g. NodeSDK).
// One `pg_workflows.workflow.run` span per worker execution, with child spans
// per step kind. Spans replayed from cache after a pause are suppressed.
const tracedWorkflow = workflow.use(otelPlugin({
// tracer?: Tracer // default: trace.getTracer('pg-workflows')
// spanNamePrefix?: string // default: 'pg_workflows'
// attributes?: (ctx) => Record<string, AttributeValue>
}));
```

### Step Types (available on `context.step`)

#### `step.run(stepId, handler)` - Execute a durable step
Expand Down
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,49 @@ See [runnable examples](https://github.com/SokratisVidros/pg-workflows/tree/main
- **[Examples](docs/examples.md)** - conditional steps, batch loops, scheduled reminders, retries, monitoring
- **[API Reference](docs/api-reference.md)** - `WorkflowEngine`, `WorkflowClient`, `WorkflowRef`, types
- **[Configuration](docs/configuration.md)** - env vars, database setup, requirements
- **[Observability](docs/observability.md)** - OpenTelemetry tracing via `otelPlugin`

---

## Observability with OpenTelemetry

pg-workflows ships a first-party plugin that emits OTel spans for workflow and step execution. `@opentelemetry/api` is an optional peer dependency — install it only if you want tracing.

```bash
npm install @opentelemetry/api @opentelemetry/sdk-node
```

```ts
import { NodeSDK } from '@opentelemetry/sdk-node'
import { trace } from '@opentelemetry/api'
import { workflow, otelPlugin } from 'pg-workflows'

// Initialize your OTel SDK however you normally do — for Node apps the
// NodeSDK registers an AsyncHooks context manager, which is required for
// hierarchical (parent/child) spans across async boundaries.
new NodeSDK({ /* exporters, resource, ... */ }).start()

const tracedWorkflow = workflow.use(otelPlugin())

const myWorkflow = tracedWorkflow('checkout', async ({ step }) => {
await step.run('charge', async () => { /* ... */ })
await step.waitFor('await-shipment', { eventName: 'shipped' })
})
```

The plugin emits a `pg_workflows.workflow.run` span per worker execution (one per resume cycle), with child spans per step kind (`pg_workflows.step.run`, `pg_workflows.step.waitFor`, etc.). Spans carry `workflow.id`, `workflow.run_id`, `workflow.attempt` and, where set, `workflow.resource_id`. Steps replayed from cache after a pause emit no spans.

**Options:**

```ts
otelPlugin({
tracer: trace.getTracer('my-app'), // default: trace.getTracer('pg-workflows')
spanNamePrefix: 'pg_workflows', // default shown
attributes: (ctx) => ({ tenant: ctx.resourceId }), // extra static attrs on workflow.run
})
```

Metrics, distributed trace context propagation across child workflows, and HTTP-caller context propagation are not in v1 — see [the observability docs](docs/observability.md#not-in-v1) for the deferral rationale.

---

Expand Down
19 changes: 19 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

117 changes: 117 additions & 0 deletions docs/observability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Observability with OpenTelemetry

pg-workflows ships a first-party `otelPlugin` that emits OpenTelemetry spans for workflow and step execution. `@opentelemetry/api` is an **optional peer dependency** — users who don't import the plugin pay zero runtime cost.

## Quick start

```bash
npm install @opentelemetry/api @opentelemetry/sdk-node
```

```ts
import { NodeSDK } from '@opentelemetry/sdk-node';
import { workflow, otelPlugin } from 'pg-workflows';

// Initialize your OTel SDK however you normally do. NodeSDK registers an
// AsyncHooks context manager, which is required for hierarchical (parent/child)
// spans across `await` boundaries inside workflow handlers.
new NodeSDK({ /* exporters, resource, ... */ }).start();

const tracedWorkflow = workflow.use(otelPlugin());

const checkout = tracedWorkflow('checkout', async ({ step }) => {
await step.run('charge', async () => { /* ... */ });
await step.waitFor('await-shipment', { eventName: 'shipped' });
});
```

## Span hierarchy

Each worker execution of a workflow run produces one trace. A workflow that pauses (`step.waitFor`, `step.delay`, etc.) and resumes later produces a **new trace per resume cycle**. Traces are stitched together via the shared `workflow.id` and `workflow.run_id` attributes.

```
pg_workflows.workflow.run
├── pg_workflows.step.run
├── pg_workflows.step.waitFor
├── pg_workflows.step.delay
├── pg_workflows.step.waitUntil
├── pg_workflows.step.pause
├── pg_workflows.step.poll
└── pg_workflows.step.invokeChildWorkflow
```

`step.sleep` is an alias for `step.delay`; calls to it emit a `pg_workflows.step.delay` span (semantic consistency — both represent a sleep).

## Attributes

| Span | Attributes |
| ------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------- |
| `pg_workflows.workflow.run` | `workflow.id`, `workflow.run_id`, `workflow.attempt` (= `run.retryCount`), `workflow.resource_id` (when set), plus any user-supplied attrs |
| `pg_workflows.step.<kind>` | `step.id`, `step.type` (matches the `StepType` enum value) |
| Any span on error | `recordException(err)`, `status.code = ERROR`, `status.message = err.message` |
| Any span on success | `status.code = OK` |

## Cache-hit suppression

When a workflow resumes after a pause, the handler re-runs from the top. Steps that completed in a prior execution return their cached output instantly. The plugin detects these cache-hit replays and **does not emit a span** for them.

Detection is based on `context.timeline`:

- A step has an output cached in the timeline (`timeline[stepId].output !== undefined`) → cache hit.
- `step.invokeChildWorkflow` additionally checks for the in-flight binding key (`__invokeChildWorkflow:<stepId>`) — a parent run that re-enters this step during a resume-while-child-still-running cycle is also treated as a cache hit.

Exception: `step.poll` does not use the cache-hit guard. Each handler invocation that reaches `step.poll` represents a meaningful poll attempt worth tracing.

## Plugin composition

The OTel plugin uses the same `wrap(context, next)` middleware hook that any plugin can implement. If you register multiple plugins via `workflow.use(...)`, their wraps compose in registration order — the first plugin's wrap is outermost.

```ts
const w = workflow
.use(loggingPlugin) // outermost wrap
.use(otelPlugin()) // inner wrap (workflow.run span opens inside loggingPlugin)
('checkout', async ({ step }) => { /* ... */ });
```

## Options

```ts
otelPlugin({
// Tracer to use. Defaults to `trace.getTracer('pg-workflows')`.
tracer: trace.getTracer('my-app'),

// Span name prefix. Defaults to 'pg_workflows'.
spanNamePrefix: 'pg_workflows',

// Optional callback returning extra attributes for the workflow.run span.
// Receives the WorkflowContext so you can extract values from the input
// or the run's resourceId.
attributes: (ctx) => ({ tenant: ctx.resourceId }),
});
```

## Error semantics

When a step or workflow handler throws:

1. The span's exception is recorded via `span.recordException(error)`.
2. The span status is set to `ERROR` with the error's message.
3. The **original error** is re-thrown — engine retry/DLQ behaviour is unaffected.

Non-`Error` throws (e.g., `throw 'msg'`) are coerced to an `Error` for the OTel API only; the original value is preserved on the re-throw path.

## Not in v1

These are deliberately out of scope for the initial release. They share a common requirement (durable storage of trace context) and will likely land together when the underlying schema work is done.

- **Metrics** (counters, histograms, gauges) — different OTel API surface; layers onto the same plugin hooks.
- **Cross-execution trace context propagation** — paused workflows resume as a fresh root trace today. Linking the resume to the prior execution requires persisting the `traceparent` header.
- **`step.invokeChildWorkflow` parent-trace continuation** — child runs start a fresh root trace. Same persistence question.
- **Caller context propagation into `engine.startWorkflow`** — an incoming HTTP trace does not currently propagate into the workflow run.
- **DLQ span emission** — `handleWorkflowRunDlq` runs outside the workflow's plugin chain. DLQ-induced FAILED states therefore don't produce a `workflow.run` span. The precipitating error is already recorded on the last per-execution span via the catch path.
- **Sampling controls** — the plugin defers to your configured `TracerProvider` for sampling.

## Requirements

- `@opentelemetry/api` ^1.9.0 (optional peer)
- An OTel SDK that registers an AsyncHooks context manager. `@opentelemetry/sdk-node`'s `NodeSDK` does this automatically. If you're wiring OTel manually, install `@opentelemetry/context-async-hooks` and call `context.setGlobalContextManager(new AsyncHooksContextManager().enable())`.
93 changes: 93 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,20 @@
"typescript": "^5.9.3"
},
"peerDependencies": {
"@opentelemetry/api": "^1.9.0",
"pg": "^8.0.0"
},
"peerDependenciesMeta": {
"@opentelemetry/api": {
"optional": true
}
},
"devDependencies": {
"@biomejs/biome": "^2.3.10",
"@electric-sql/pglite": "^0.3.14",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/context-async-hooks": "^1.27.0",
"@opentelemetry/sdk-trace-base": "^1.27.0",
"@types/node": "^22.10.2",
"@types/pg": "^8.11.10",
"bunup": "^0.16.11",
Expand Down
Loading
Loading