diff --git a/.clinerules b/.clinerules index 01d42e6..5bf22f2 100644 --- a/.clinerules +++ b/.clinerules @@ -35,7 +35,7 @@ import { z } from 'zod'; const myWorkflow = workflow( 'workflow-id', - async ({ step, input, runId, workflowId, timeline, logger }) => { + async ({ step, input, runId, workflowId, timeline, logger, schedule }) => { const result = await step.run('step-1', async () => { return { processed: input.data }; }); @@ -45,6 +45,8 @@ const myWorkflow = workflow( inputSchema: z.object({ data: z.string() }), timeout: 60000, retries: 3, + cron: '*/15 * * * *', // cron expression string (UTC) + // or: cron: { expression: '*/15 * * * *', timezone: 'America/New_York' }, } ); ``` @@ -67,7 +69,7 @@ await engine.start(); - `step.run(stepId, handler)` - Durable step, runs exactly once, result persisted in PostgreSQL - `step.waitFor(stepId, { eventName, timeout?, schema? })` - Pause and wait for an external event - `step.pause(stepId)` - Manual pause; resume with `engine.resumeWorkflow()` -- `step.waitUntil(stepId, { date })` - Wait until a date (not yet implemented) +- `step.waitUntil(stepId, { date })` - Wait until a specific date ### Engine methods @@ -94,6 +96,17 @@ enum StepType { PAUSE = 'pause', RUN = 'run', WAIT_FOR = 'waitFor', WAIT_UNTIL = - `WorkflowEngineError(message, workflowId?, runId?, cause?)` - Base error - `WorkflowRunNotFoundError(runId?, workflowId?)` - Run not found +## Cron Workflows + +Workflows can run on a cron schedule via the `cron` option on `workflow()`. Uses pg-boss `schedule()` under the hood. + +- `cron` option accepts a string (`'*/15 * * * *'`) or object (`{ expression, timezone? }`) +- `CronConfig`: `{ expression: string, timezone?: string }` +- `ScheduleContext`: `{ timestamp: Date, lastTimestamp: Date | undefined, timezone: string }` — available as `schedule` on workflow context for cron-triggered runs. Built at runtime from `run.createdAt` and latest completed run. +- `WorkflowRun` stores `cron: string | null` and `timezone: string | null` as flat columns (no nested JSON) +- Cron-triggered runs are identified by `cron !== null` on the `WorkflowRun` +- Post-start registration: `registerWorkflow()` after `start()` automatically sets up cron schedule + ## AI & Agent Workflows pg-workflows is ideal for AI agents and LLM pipelines. Key patterns: diff --git a/.cursorrules b/.cursorrules index b7de09e..cc17ec3 100644 --- a/.cursorrules +++ b/.cursorrules @@ -39,7 +39,7 @@ import { z } from 'zod'; const myWorkflow = workflow( 'workflow-id', - async ({ step, input, runId, workflowId, timeline, logger }) => { + async ({ step, input, runId, workflowId, timeline, logger, schedule }) => { const result = await step.run('step-1', async () => { return { processed: input.data }; }); @@ -49,6 +49,8 @@ const myWorkflow = workflow( inputSchema: z.object({ data: z.string() }), timeout: 60000, retries: 3, + cron: '*/15 * * * *', // cron expression string (UTC) + // or: cron: { expression: '*/15 * * * *', timezone: 'America/New_York' }, } ); ``` @@ -71,7 +73,7 @@ await engine.start(); - `step.run(stepId, handler)` - Durable step, runs exactly once, result persisted in PostgreSQL - `step.waitFor(stepId, { eventName, timeout?, schema? })` - Pause and wait for an external event - `step.pause(stepId)` - Manual pause; resume with `engine.resumeWorkflow()` -- `step.waitUntil(stepId, { date })` - Wait until a date (not yet implemented) +- `step.waitUntil(stepId, { date })` - Wait until a specific date ### Engine methods @@ -98,6 +100,17 @@ enum StepType { PAUSE = 'pause', RUN = 'run', WAIT_FOR = 'waitFor', WAIT_UNTIL = - `WorkflowEngineError(message, workflowId?, runId?, cause?)` - Base error - `WorkflowRunNotFoundError(runId?, workflowId?)` - Run not found +## Cron Workflows + +Workflows can run on a cron schedule via the `cron` option on `workflow()`. Uses pg-boss `schedule()` under the hood. + +- `cron` option accepts a string (`'*/15 * * * *'`) or object (`{ expression, timezone? }`) +- `CronConfig`: `{ expression: string, timezone?: string }` +- `ScheduleContext`: `{ timestamp: Date, lastTimestamp: Date | undefined, timezone: string }` — available as `schedule` on workflow context for cron-triggered runs. Built at runtime from `run.createdAt` and latest completed run. +- `WorkflowRun` stores `cron: string | null` and `timezone: string | null` as flat columns (no nested JSON) +- Cron-triggered runs are identified by `cron !== null` on the `WorkflowRun` +- Post-start registration: `registerWorkflow()` after `start()` automatically sets up cron schedule + ## AI & Agent Workflows pg-workflows is ideal for AI agents and LLM pipelines. Key patterns: diff --git a/.windsurfrules b/.windsurfrules index 90bdb3f..214035e 100644 --- a/.windsurfrules +++ b/.windsurfrules @@ -35,7 +35,7 @@ import { z } from 'zod'; const myWorkflow = workflow( 'workflow-id', - async ({ step, input, runId, workflowId, timeline, logger }) => { + async ({ step, input, runId, workflowId, timeline, logger, schedule }) => { const result = await step.run('step-1', async () => { return { processed: input.data }; }); @@ -45,6 +45,8 @@ const myWorkflow = workflow( inputSchema: z.object({ data: z.string() }), timeout: 60000, retries: 3, + cron: '*/15 * * * *', // cron expression string (UTC) + // or: cron: { expression: '*/15 * * * *', timezone: 'America/New_York' }, } ); ``` @@ -67,7 +69,7 @@ await engine.start(); - `step.run(stepId, handler)` - Durable step, runs exactly once, result persisted in PostgreSQL - `step.waitFor(stepId, { eventName, timeout?, schema? })` - Pause and wait for an external event - `step.pause(stepId)` - Manual pause; resume with `engine.resumeWorkflow()` -- `step.waitUntil(stepId, { date })` - Wait until a date (not yet implemented) +- `step.waitUntil(stepId, { date })` - Wait until a specific date ### Engine methods @@ -94,6 +96,17 @@ enum StepType { PAUSE = 'pause', RUN = 'run', WAIT_FOR = 'waitFor', WAIT_UNTIL = - `WorkflowEngineError(message, workflowId?, runId?, cause?)` - Base error - `WorkflowRunNotFoundError(runId?, workflowId?)` - Run not found +## Cron Workflows + +Workflows can run on a cron schedule via the `cron` option on `workflow()`. Uses pg-boss `schedule()` under the hood. + +- `cron` option accepts a string (`'*/15 * * * *'`) or object (`{ expression, timezone? }`) +- `CronConfig`: `{ expression: string, timezone?: string }` +- `ScheduleContext`: `{ timestamp: Date, lastTimestamp: Date | undefined, timezone: string }` — available as `schedule` on workflow context for cron-triggered runs. Built at runtime from `run.createdAt` and latest completed run. +- `WorkflowRun` stores `cron: string | null` and `timezone: string | null` as flat columns (no nested JSON) +- Cron-triggered runs are identified by `cron !== null` on the `WorkflowRun` +- Post-start registration: `registerWorkflow()` after `start()` automatically sets up cron schedule + ## AI & Agent Workflows pg-workflows is ideal for AI agents and LLM pipelines. Key patterns: diff --git a/AGENTS.md b/AGENTS.md index a064a90..3756617 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -29,7 +29,8 @@ src/ │ └── migration.ts # Schema migrations └── tests/ # Test utilities examples/ -└── basic.ts # Example usage +├── basic.ts # Example usage +└── cron.ts # Cron workflow example ``` ## Commands @@ -66,13 +67,16 @@ import { z } from 'zod'; const myWorkflow = workflow( 'workflow-id', // unique string ID - async ({ step, input, runId, workflowId, timeline, logger }) => { + async ({ step, input, runId, workflowId, timeline, logger, schedule }) => { // workflow body with step calls + // `schedule` is populated for cron-triggered runs }, { inputSchema: z.object({ /* ... */ }), // optional Zod schema timeout: 60000, // optional, milliseconds retries: 3, // optional, max retry count + cron: '*/15 * * * *', // cron expression string (UTC) + // or: cron: { expression: '*/15 * * * *', timezone: 'America/New_York' }, } ); ``` @@ -202,6 +206,8 @@ type WorkflowRun = { retryCount: number; maxRetries: number; jobId: string | null; + cron: string | null; + timezone: string | null; }; type WorkflowRunProgress = WorkflowRun & { @@ -227,6 +233,57 @@ class WorkflowRunNotFoundError extends WorkflowEngineError {} | `WORKFLOW_RUN_WORKERS` | Number of worker processes | `3` | | `WORKFLOW_RUN_EXPIRE_IN_SECONDS` | Job expiration time in seconds | `300` | +## Cron Workflows + +Workflows can be scheduled to run on a cron expression using the `cron` option. The engine uses pg-boss `schedule()` under the hood. + +### CronConfig + +The `cron` option accepts a string or an object: + +```typescript +// String shorthand (defaults to UTC) +cron: '*/15 * * * *' + +// Object with explicit timezone +cron: { expression: '*/15 * * * *', timezone: 'America/New_York' } + +type CronConfig = { + expression: string; // standard cron expression + timezone?: string; // IANA timezone, defaults to 'UTC' +}; +``` + +### ScheduleContext + +Cron-triggered runs receive a `schedule` object on the workflow context. The `timestamp` is derived from `run.createdAt` and `lastTimestamp` is queried from the latest completed run at execution time. + +```typescript +type ScheduleContext = { + timestamp: Date; // when this cron trigger fired + lastTimestamp: Date | undefined; // when the last successful cron run completed + timezone: string; // the configured timezone +}; +``` + +### Cron workflow example + +```typescript +const sync = workflow('sync-data', async ({ step, schedule, logger }) => { + const since = schedule?.lastTimestamp ?? new Date(0); + const data = await step.run('fetch', async () => fetchSince(since)); + await step.run('write', async () => writeToDB(data)); + return { synced: data.length }; +}, { + cron: '*/15 * * * *', // every 15 minutes, UTC + retries: 3, +}); +``` + +### Post-start registration + +Calling `engine.registerWorkflow()` after `engine.start()` will automatically set up the cron schedule if the workflow has a `cron` config. + ## AI & Agent Workflow Patterns pg-workflows is well-suited for AI agents and LLM pipelines because LLM calls are slow, expensive, and unreliable - exactly the work that benefits most from durable execution. diff --git a/CLAUDE.md b/CLAUDE.md index 2b1c8bf..f6db7cb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -29,7 +29,8 @@ src/ │ └── migration.ts # Schema migrations └── tests/ # Test utilities examples/ -└── basic.ts # Example usage +├── basic.ts # Example usage +└── cron.ts # Cron workflow example ``` ## Commands @@ -67,13 +68,16 @@ import { z } from 'zod'; const myWorkflow = workflow( 'workflow-id', // unique string ID - async ({ step, input, runId, workflowId, timeline, logger }) => { + async ({ step, input, runId, workflowId, timeline, logger, schedule }) => { // workflow body with step calls + // `schedule` is populated for cron-triggered runs }, { inputSchema: z.object({ /* ... */ }), // optional Zod schema timeout: 60000, // optional, milliseconds retries: 3, // optional, max retry count + cron: '*/15 * * * *', // cron expression string (UTC) + // or: cron: { expression: '*/15 * * * *', timezone: 'America/New_York' }, } ); ``` @@ -239,6 +243,8 @@ type WorkflowRun = { retryCount: number; maxRetries: number; jobId: string | null; + cron: string | null; + timezone: string | null; }; type WorkflowRunProgress = WorkflowRun & { @@ -264,6 +270,57 @@ class WorkflowRunNotFoundError extends WorkflowEngineError {} | `WORKFLOW_RUN_WORKERS` | Number of worker processes | `3` | | `WORKFLOW_RUN_EXPIRE_IN_SECONDS` | Job expiration time in seconds | `300` | +## Cron Workflows + +Workflows can be scheduled to run on a cron expression using the `cron` option. The engine uses pg-boss `schedule()` under the hood. + +### CronConfig + +The `cron` option accepts a string or an object: + +```typescript +// String shorthand (defaults to UTC) +cron: '*/15 * * * *' + +// Object with explicit timezone +cron: { expression: '*/15 * * * *', timezone: 'America/New_York' } + +type CronConfig = { + expression: string; // standard cron expression + timezone?: string; // IANA timezone, defaults to 'UTC' +}; +``` + +### ScheduleContext + +Cron-triggered runs receive a `schedule` object on the workflow context. The `timestamp` is derived from `run.createdAt` and `lastTimestamp` is queried from the latest completed run at execution time. + +```typescript +type ScheduleContext = { + timestamp: Date; // when this cron trigger fired + lastTimestamp: Date | undefined; // when the last successful cron run completed + timezone: string; // the configured timezone +}; +``` + +### Cron workflow example + +```typescript +const sync = workflow('sync-data', async ({ step, schedule, logger }) => { + const since = schedule?.lastTimestamp ?? new Date(0); + const data = await step.run('fetch', async () => fetchSince(since)); + await step.run('write', async () => writeToDB(data)); + return { synced: data.length }; +}, { + cron: '*/15 * * * *', // every 15 minutes, UTC + retries: 3, +}); +``` + +### Post-start registration + +Calling `engine.registerWorkflow()` after `engine.start()` will automatically set up the cron schedule if the workflow has a `cron` config. + ## AI & Agent Workflow Patterns pg-workflows is well-suited for AI agents and LLM pipelines because LLM calls are slow, expensive, and unreliable - exactly the work that benefits most from durable execution. diff --git a/GEMINI.md b/GEMINI.md index 24b28d9..6b6dd50 100644 --- a/GEMINI.md +++ b/GEMINI.md @@ -29,7 +29,8 @@ src/ │ └── migration.ts # Schema migrations └── tests/ # Test utilities examples/ -└── basic.ts # Example usage +├── basic.ts # Example usage +└── cron.ts # Cron workflow example ``` ## Commands @@ -67,13 +68,16 @@ import { z } from 'zod'; const myWorkflow = workflow( 'workflow-id', // unique string ID - async ({ step, input, runId, workflowId, timeline, logger }) => { + async ({ step, input, runId, workflowId, timeline, logger, schedule }) => { // workflow body with step calls + // `schedule` is populated for cron-triggered runs }, { inputSchema: z.object({ /* ... */ }), // optional Zod schema timeout: 60000, // optional, milliseconds retries: 3, // optional, max retry count + cron: '*/15 * * * *', // cron expression string (UTC) + // or: cron: { expression: '*/15 * * * *', timezone: 'America/New_York' }, } ); ``` @@ -239,6 +243,8 @@ type WorkflowRun = { retryCount: number; maxRetries: number; jobId: string | null; + cron: string | null; + timezone: string | null; }; type WorkflowRunProgress = WorkflowRun & { @@ -264,6 +270,57 @@ class WorkflowRunNotFoundError extends WorkflowEngineError {} | `WORKFLOW_RUN_WORKERS` | Number of worker processes | `3` | | `WORKFLOW_RUN_EXPIRE_IN_SECONDS` | Job expiration time in seconds | `300` | +## Cron Workflows + +Workflows can be scheduled to run on a cron expression using the `cron` option. The engine uses pg-boss `schedule()` under the hood. + +### CronConfig + +The `cron` option accepts a string or an object: + +```typescript +// String shorthand (defaults to UTC) +cron: '*/15 * * * *' + +// Object with explicit timezone +cron: { expression: '*/15 * * * *', timezone: 'America/New_York' } + +type CronConfig = { + expression: string; // standard cron expression + timezone?: string; // IANA timezone, defaults to 'UTC' +}; +``` + +### ScheduleContext + +Cron-triggered runs receive a `schedule` object on the workflow context. The `timestamp` is derived from `run.createdAt` and `lastTimestamp` is queried from the latest completed run at execution time. + +```typescript +type ScheduleContext = { + timestamp: Date; // when this cron trigger fired + lastTimestamp: Date | undefined; // when the last successful cron run completed + timezone: string; // the configured timezone +}; +``` + +### Cron workflow example + +```typescript +const sync = workflow('sync-data', async ({ step, schedule, logger }) => { + const since = schedule?.lastTimestamp ?? new Date(0); + const data = await step.run('fetch', async () => fetchSince(since)); + await step.run('write', async () => writeToDB(data)); + return { synced: data.length }; +}, { + cron: '*/15 * * * *', // every 15 minutes, UTC + retries: 3, +}); +``` + +### Post-start registration + +Calling `engine.registerWorkflow()` after `engine.start()` will automatically set up the cron schedule if the workflow has a `cron` config. + ## AI & Agent Workflow Patterns pg-workflows is well-suited for AI agents and LLM pipelines because LLM calls are slow, expensive, and unreliable - exactly the work that benefits most from durable execution. diff --git a/bun.lock b/bun.lock index c3d221a..2942b67 100644 --- a/bun.lock +++ b/bun.lock @@ -5,6 +5,7 @@ "": { "name": "pg-workflows", "dependencies": { + "cron-parser": "^5.5.0", "es-toolkit": "^1.44.0", "ksuid": "^3.0.0", "parse-duration": "^2.1.5", diff --git a/examples/cron.ts b/examples/cron.ts new file mode 100644 index 0000000..81782f6 --- /dev/null +++ b/examples/cron.ts @@ -0,0 +1,84 @@ +import pg from 'pg'; +import { PgBoss } from 'pg-boss'; +import { WorkflowEngine, workflow } from '../src/'; + +// A cron workflow that syncs data on a schedule. +// The `schedule` context tells you when this run was triggered +// and when the last successful run completed — useful for +// incremental syncs ("give me everything since last time"). + +const syncOrders = workflow( + 'sync-orders', + async ({ step, schedule, logger }) => { + const since = schedule?.lastTimestamp ?? new Date(0); + logger.log(`Syncing orders changed since ${since.toISOString()}`); + + const orders = await step.run('fetch-new-orders', async () => { + // In a real app, query your source system: + // SELECT * FROM orders WHERE updated_at > $1 + logger.log(`Fetching orders updated after ${since.toISOString()}...`); + return [ + { id: 'ord_1', total: 99.0 }, + { id: 'ord_2', total: 149.5 }, + ]; + }); + + await step.run('write-to-warehouse', async () => { + logger.log(`Writing ${orders.length} orders to data warehouse...`); + return { written: orders.length }; + }); + + await step.run('update-metrics', async () => { + const total = orders.reduce((sum, o) => sum + o.total, 0); + logger.log(`Updated revenue metric: +$${total.toFixed(2)}`); + return { revenue: total }; + }); + + return { + synced: orders.length, + since: since.toISOString(), + triggeredAt: schedule?.timestamp.toISOString(), + timezone: schedule?.timezone, + }; + }, + { + cron: { expression: '* * * * *', timezone: 'Europe/Athens' }, + retries: 3, + }, +); + +async function main() { + const DATABASE_URL = process.env.DATABASE_URL ?? 'postgres://localhost:5432/pg_workflows_example'; + + const pool = new pg.Pool({ connectionString: DATABASE_URL }); + const boss = new PgBoss({ + db: { executeSql: (text, values) => pool.query(text, values) }, + }); + + const engine = new WorkflowEngine({ + boss, + workflows: [syncOrders], + }); + + // engine.start() registers the cron schedule with pg-boss. + // On each trigger, the engine automatically populates + // schedule.timestamp, schedule.lastTimestamp, and schedule.timezone + // before invoking the workflow handler. + await engine.start(); + + console.log('Cron workflow registered. Waiting for triggers...'); + console.log('Press Ctrl+C to stop.\n'); + + // Graceful shutdown + process.on('SIGINT', async () => { + console.log('\nShutting down...'); + await engine.stop(); + await pool.end(); + process.exit(0); + }); +} + +main().catch((err) => { + console.error('Example failed:', err); + process.exit(1); +}); diff --git a/examples/package.json b/examples/package.json index 724d034..7273e0c 100644 --- a/examples/package.json +++ b/examples/package.json @@ -9,7 +9,8 @@ "example:basic": "npm run build:lib && dotenvx run -- tsx basic.ts", "example:approval-flow": "npm run build:lib && dotenvx run -- tsx approval-flow.ts", "example:timeout": "npm run build:lib && dotenvx run -- tsx timeout.ts", - "example:polling": "npm run build:lib && dotenvx run -- tsx polling.ts" + "example:polling": "npm run build:lib && dotenvx run -- tsx polling.ts", + "example:cron": "npm run build:lib && dotenvx run -- tsx cron.ts" }, "dependencies": { "pg": "^8.13.1", diff --git a/package.json b/package.json index 2ef7ebf..c83bda6 100644 --- a/package.json +++ b/package.json @@ -67,6 +67,7 @@ "registry": "https://registry.npmjs.org/" }, "dependencies": { + "cron-parser": "^5.5.0", "es-toolkit": "^1.44.0", "ksuid": "^3.0.0", "parse-duration": "^2.1.5", diff --git a/src/db/migration.ts b/src/db/migration.ts index cb29d05..834368c 100644 --- a/src/db/migration.ts +++ b/src/db/migration.ts @@ -5,7 +5,7 @@ export async function runMigrations(db: Db): Promise { ` SELECT EXISTS ( SELECT FROM information_schema.tables - WHERE table_schema = 'public' + WHERE table_schema = current_schema() AND table_name = 'workflow_runs' ); `, @@ -33,7 +33,9 @@ export async function runMigrations(db: Db): Promise { timeout_at timestamp with time zone, retry_count integer DEFAULT 0 NOT NULL, max_retries integer DEFAULT 0 NOT NULL, - job_id varchar(256) + job_id varchar(256), + cron text, + timezone text ); `, [], @@ -59,5 +61,46 @@ export async function runMigrations(db: Db): Promise { `, [], ); + + await db.executeSql( + `CREATE INDEX idx_workflow_runs_cron_completed + ON workflow_runs (workflow_id, completed_at DESC) + WHERE cron IS NOT NULL AND status = 'completed';`, + [], + ); + } + + // Migration: add cron and timezone columns for existing tables + const cronColumnExists = await db.executeSql( + `SELECT EXISTS ( + SELECT FROM information_schema.columns + WHERE table_schema = current_schema() + AND table_name = 'workflow_runs' + AND column_name = 'cron' + );`, + [], + ); + + if (!cronColumnExists.rows[0]?.exists) { + await db.executeSql(`ALTER TABLE workflow_runs ADD COLUMN cron text;`, []); + await db.executeSql(`ALTER TABLE workflow_runs ADD COLUMN timezone text;`, []); + } + + // Migration: add cron-specific indexes + const cronCompletedIndexExists = await db.executeSql( + `SELECT EXISTS ( + SELECT FROM pg_indexes + WHERE indexname = 'idx_workflow_runs_cron_completed' + );`, + [], + ); + + if (!cronCompletedIndexExists.rows[0]?.exists) { + await db.executeSql( + `CREATE INDEX idx_workflow_runs_cron_completed + ON workflow_runs (workflow_id, completed_at DESC) + WHERE cron IS NOT NULL AND status = 'completed';`, + [], + ); } } diff --git a/src/db/queries.ts b/src/db/queries.ts index fbb298b..8d4433b 100644 --- a/src/db/queries.ts +++ b/src/db/queries.ts @@ -25,6 +25,8 @@ type WorkflowRunRow = { retry_count: number; max_retries: number; job_id: string | null; + cron: string | null; + timezone: string | null; }; function mapRowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { @@ -52,6 +54,8 @@ function mapRowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { retryCount: row.retry_count, maxRetries: row.max_retries, jobId: row.job_id, + cron: row.cron, + timezone: row.timezone, }; } @@ -64,6 +68,8 @@ export async function insertWorkflowRun( input, maxRetries, timeoutAt, + cron, + timezone, }: { resourceId?: string; workflowId: string; @@ -72,6 +78,8 @@ export async function insertWorkflowRun( input: unknown; maxRetries: number; timeoutAt: Date | null; + cron?: string; + timezone?: string; }, db: Db, ): Promise { @@ -80,20 +88,22 @@ export async function insertWorkflowRun( const result = await db.executeSql( `INSERT INTO workflow_runs ( - id, - resource_id, - workflow_id, - current_step_id, - status, - input, - max_retries, + id, + resource_id, + workflow_id, + current_step_id, + status, + input, + max_retries, timeout_at, created_at, updated_at, timeline, - retry_count + retry_count, + cron, + timezone ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) RETURNING *`, [ runId, @@ -108,6 +118,8 @@ export async function insertWorkflowRun( now, '{}', 0, + cron ?? null, + timezone ?? null, ], ); @@ -155,6 +167,26 @@ export async function getWorkflowRun( return mapRowToWorkflowRun(run); } +export async function getWorkflowLastRun( + workflowId: string, + db: Db, +): Promise { + const result = await db.executeSql( + `SELECT * FROM workflow_runs + WHERE workflow_id = $1 AND status = 'completed' + ORDER BY completed_at DESC + LIMIT 1`, + [workflowId], + ); + + const row = result.rows[0]; + if (!row) { + return undefined; + } + + return mapRowToWorkflowRun(row); +} + export async function updateWorkflowRun( { runId, diff --git a/src/db/types.ts b/src/db/types.ts index e0a5db3..43da05c 100644 --- a/src/db/types.ts +++ b/src/db/types.ts @@ -17,4 +17,6 @@ export type WorkflowRun = { retryCount: number; maxRetries: number; jobId: string | null; + cron: string | null; + timezone: string | null; }; diff --git a/src/definition.ts b/src/definition.ts index 4cc6af8..351f433 100644 --- a/src/definition.ts +++ b/src/definition.ts @@ -14,13 +14,14 @@ function createWorkflowFactory( const factory = (( id: string, handler: (context: WorkflowContext) => Promise, - { inputSchema, timeout, retries }: WorkflowOptions = {}, + { inputSchema, timeout, retries, cron }: WorkflowOptions = {}, ): WorkflowDefinition => ({ id, handler, inputSchema, timeout, retries, + cron: typeof cron === 'string' ? { expression: cron } : cron, plugins: plugins.length > 0 ? (plugins as WorkflowPlugin[]) : undefined, })) as WorkflowFactory; diff --git a/src/engine.test.ts b/src/engine.test.ts index 2f33166..d7b23af 100644 --- a/src/engine.test.ts +++ b/src/engine.test.ts @@ -7,7 +7,7 @@ import { WorkflowEngine } from './engine'; import { WorkflowEngineError, WorkflowRunNotFoundError } from './error'; import { getBoss } from './tests/pgboss'; import { closeTestDatabase, createTestDatabase } from './tests/test-db'; -import type { StepBaseContext, WorkflowPlugin } from './types'; +import type { ScheduleContext, StepBaseContext, WorkflowPlugin } from './types'; import { WorkflowStatus } from './types'; let testBoss: PgBoss; @@ -413,6 +413,31 @@ describe('WorkflowEngine', () => { ).rejects.toThrow(WorkflowEngineError); }); + it('should run a workflow without resourceId', async () => { + const noResourceWf = workflow('no-resource-wf', async ({ step }) => { + return await step.run('step-1', async () => { + return { result: 'no-resource' }; + }); + }); + await engine.registerWorkflow(noResourceWf); + + const run = await engine.startWorkflow({ + workflowId: 'no-resource-wf', + input: {}, + }); + + await expect + .poll(async () => { + const r = await engine.getRun({ runId: run.id }); + return r.status; + }) + .toBe('completed'); + + const completedRun = await engine.getRun({ runId: run.id }); + expect(completedRun.output).toEqual({ result: 'no-resource' }); + expect(completedRun.resourceId).toBeNull(); + }); + it('should throw error for workflow without steps', async () => { const emptyWorkflow = workflow('empty-workflow', async () => {}); @@ -1359,6 +1384,552 @@ describe('WorkflowEngine', () => { }); }); + describe('cron workflows', () => { + it('should throw error when registering workflow with invalid cron expression', async () => { + const engine = new WorkflowEngine({ + pool: testPool, + boss: testBoss, + }); + await engine.start(false); + + const invalidCronWf = workflow( + 'invalid-cron-wf', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: { expression: '60 * * * *' }, + }, + ); + + await expect(engine.registerWorkflow(invalidCronWf)).rejects.toThrow(WorkflowEngineError); + await expect(engine.registerWorkflow(invalidCronWf)).rejects.toThrow(/invalid cron/i); + + await engine.stop(); + }); + + it('should normalize string cron shorthand to CronConfig', async () => { + const engine = new WorkflowEngine({ + pool: testPool, + boss: testBoss, + }); + await engine.start(false); + + const stringWf = workflow( + 'string-cron-wf', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: '*/5 * * * *', + }, + ); + + await engine.registerWorkflow(stringWf); + const registered = engine.workflows.get('string-cron-wf'); + expect(registered?.cron?.expression).toBe('*/5 * * * *'); + expect(registered?.cron?.timezone).toBeUndefined(); + + await engine.stop(); + }); + + it('should pass through standard cron expressions unchanged', async () => { + const engine = new WorkflowEngine({ + pool: testPool, + boss: testBoss, + }); + await engine.start(false); + + const standardWf = workflow( + 'standard-cron-wf', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: '*/15 * * * *', + }, + ); + + await engine.registerWorkflow(standardWf); + const registered = engine.workflows.get('standard-cron-wf'); + expect(registered?.cron?.expression).toBe('*/15 * * * *'); + + await engine.stop(); + }); + + it('should throw error when cron workflow has inputSchema that rejects empty input', async () => { + const engine = new WorkflowEngine({ + pool: testPool, + boss: testBoss, + }); + await engine.start(false); + + const cronWithSchema = workflow( + 'cron-bad-schema-wf', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: { expression: '* * * * *' }, + inputSchema: z.object({ required: z.string() }), + }, + ); + + await expect(engine.registerWorkflow(cronWithSchema)).rejects.toThrow(WorkflowEngineError); + await expect(engine.registerWorkflow(cronWithSchema)).rejects.toThrow(/rejects empty input/i); + + await engine.stop(); + }); + + it('should include cron fields in workflow definition', async () => { + const cronWf = workflow( + 'cron-test', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: { expression: '*/5 * * * *', timezone: 'America/New_York' }, + }, + ); + + expect(cronWf.cron).toEqual({ expression: '*/5 * * * *', timezone: 'America/New_York' }); + }); + + it('should register cron schedule on engine start', async () => { + const cronWf = workflow( + 'cron-schedule-test', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: { expression: '*/5 * * * *', timezone: 'America/Chicago' }, + }, + ); + + const engine = new WorkflowEngine({ + workflows: [cronWf], + pool: testPool, + boss: testBoss, + }); + + const scheduleSpy = vi.spyOn(testBoss, 'schedule'); + const createQueueSpy = vi.spyOn(testBoss, 'createQueue'); + + await engine.start(); + + expect(createQueueSpy).toHaveBeenCalledWith('cron-schedule-test'); + expect(scheduleSpy).toHaveBeenCalledWith('cron-schedule-test', '*/5 * * * *', null, { + tz: 'America/Chicago', + }); + + scheduleSpy.mockRestore(); + createQueueSpy.mockRestore(); + await engine.stop(); + }); + + it('should default cronTimezone to UTC', async () => { + const cronWf = workflow( + 'cron-utc-test', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: { expression: '0 * * * *' }, + }, + ); + + const engine = new WorkflowEngine({ + workflows: [cronWf], + pool: testPool, + boss: testBoss, + }); + + const scheduleSpy = vi.spyOn(testBoss, 'schedule'); + + await engine.start(); + + expect(scheduleSpy).toHaveBeenCalledWith('cron-utc-test', '0 * * * *', null, { tz: 'UTC' }); + + scheduleSpy.mockRestore(); + await engine.stop(); + }); + + it('should default timezone to undefined when not provided', async () => { + const cronWf = workflow( + 'cron-test-no-tz', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: { expression: '0 * * * *' }, + }, + ); + + expect(cronWf.cron?.expression).toBe('0 * * * *'); + expect(cronWf.cron?.timezone).toBeUndefined(); + }); + + it('should unschedule cron workflows on engine stop', async () => { + const cronWf = workflow( + 'cron-stop-test', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: { expression: '0 3 * * *' }, + }, + ); + + const engine = new WorkflowEngine({ + workflows: [cronWf], + pool: testPool, + boss: testBoss, + }); + await engine.start(); + + const unscheduleSpy = vi.spyOn(testBoss, 'unschedule'); + + await engine.stop(); + + expect(unscheduleSpy).toHaveBeenCalledWith('cron-stop-test'); + + unscheduleSpy.mockRestore(); + }); + + it('should execute a cron workflow when triggered', async () => { + let executionCount = 0; + + const cronWf = workflow( + 'cron-e2e-test', + async ({ step }) => { + return await step.run('increment', async () => { + executionCount++; + return { count: executionCount }; + }); + }, + { + cron: { expression: '* * * * *' }, // every minute (won't actually wait — we trigger manually) + }, + ); + + const engine = new WorkflowEngine({ + workflows: [cronWf], + pool: testPool, + boss: testBoss, + }); + await engine.start(); + + // Manually start the workflow as if cron triggered it + const run = await engine.startWorkflow({ + workflowId: 'cron-e2e-test', + input: {}, + }); + + await expect + .poll(async () => { + const r = await engine.getRun({ runId: run.id }); + return r.status; + }) + .toBe('completed'); + + const completedRun = await engine.getRun({ runId: run.id }); + expect(completedRun.output).toEqual({ count: 1 }); + expect(completedRun.resourceId).toBeNull(); + + await engine.stop(); + }); + + it('should have schedule as undefined for non-cron workflow runs', async () => { + let capturedSchedule: unknown = 'not-set'; + + const regularWf = workflow('regular-no-schedule-wf', async ({ step, schedule }) => { + capturedSchedule = schedule; + return await step.run('work', async () => ({ done: true })); + }); + + const engine = new WorkflowEngine({ + workflows: [regularWf], + pool: testPool, + boss: testBoss, + }); + await engine.start(); + + const run = await engine.startWorkflow({ + workflowId: 'regular-no-schedule-wf', + input: {}, + }); + + await expect + .poll(async () => { + const r = await engine.getRun({ runId: run.id }); + return r.status; + }) + .toBe('completed'); + + expect(capturedSchedule).toBeUndefined(); + + await engine.stop(); + }); + + it('should not expose schedule context on public startWorkflow API', async () => { + let capturedSchedule: ScheduleContext | undefined; + + const cronWf = workflow( + 'cron-no-public-schedule-test', + async ({ step, schedule }) => { + capturedSchedule = schedule; + return await step.run('work', async () => ({ done: true })); + }, + { + cron: { expression: '* * * * *', timezone: 'America/Chicago' }, + }, + ); + + const engine = new WorkflowEngine({ + workflows: [cronWf], + pool: testPool, + boss: testBoss, + }); + await engine.start(); + + // Public startWorkflow should NOT pass schedule context + const run = await engine.startWorkflow({ + workflowId: 'cron-no-public-schedule-test', + input: {}, + }); + + await expect + .poll(async () => { + const r = await engine.getRun({ runId: run.id }); + return r.status; + }) + .toBe('completed'); + + // Schedule should be undefined when started via public API + expect(capturedSchedule).toBeUndefined(); + + await engine.stop(); + }); + + it('should preserve schedule context across retries', async () => { + let attemptCount = 0; + let capturedSchedule: ScheduleContext | undefined; + + const cronRetryWf = workflow( + 'cron-retry-schedule-test', + async ({ step, schedule }) => { + capturedSchedule = schedule; + return await step.run('work', async () => { + attemptCount++; + if (attemptCount < 2) { + throw new Error('transient failure'); + } + return { done: true }; + }); + }, + { + cron: { expression: '* * * * *', timezone: 'America/Chicago' }, + }, + ); + + const engine = new WorkflowEngine({ + workflows: [cronRetryWf], + pool: testPool, + boss: testBoss, + }); + await engine.start(); + + const run = await engine.startWorkflow({ + workflowId: 'cron-retry-schedule-test', + input: {}, + options: { retries: 3 }, + }); + + await expect + .poll( + async () => { + const r = await engine.getRun({ runId: run.id }); + return r.status; + }, + { timeout: 10000 }, + ) + .toBe('completed'); + + // API-triggered run: schedule should be undefined, confirming + // it's derived from the run record (which has null cron column for API runs) + expect(capturedSchedule).toBeUndefined(); + + // Verify the run completed after retry (attemptCount should be 2) + expect(attemptCount).toBe(2); + + await engine.stop(); + }); + + it('should not pollute lastTimestamp with API-triggered runs', async () => { + const capturedSchedules: Array = []; + + const cronWf = workflow( + 'cron-lasttimestamp-v2-test', + async ({ step, schedule }) => { + capturedSchedules.push(schedule); + return await step.run('work', async () => ({ done: true })); + }, + { + cron: { expression: '* * * * *', timezone: 'UTC' }, + }, + ); + + const engine = new WorkflowEngine({ + workflows: [cronWf], + pool: testPool, + boss: testBoss, + }); + await engine.start(); + + // Start via public API — no cron field + const run1 = await engine.startWorkflow({ + workflowId: 'cron-lasttimestamp-v2-test', + input: {}, + }); + await expect + .poll(async () => { + const r = await engine.getRun({ runId: run1.id }); + return r.status; + }) + .toBe('completed'); + + // API-triggered run should have no schedule + expect(capturedSchedules[0]).toBeUndefined(); + + await engine.stop(); + }); + + it('should set up cron schedule when registering workflow after engine start', async () => { + const engine = new WorkflowEngine({ + pool: testPool, + boss: testBoss, + }); + await engine.start(); + + const scheduleSpy = vi.spyOn(testBoss, 'schedule'); + const createQueueSpy = vi.spyOn(testBoss, 'createQueue'); + + const cronWf = workflow( + 'cron-post-start-test', + async ({ step }) => { + await step.run('step-1', async () => 'done'); + }, + { + cron: { expression: '0 */2 * * *', timezone: 'Europe/London' }, + }, + ); + + await engine.registerWorkflow(cronWf); + + expect(createQueueSpy).toHaveBeenCalledWith('cron-post-start-test'); + expect(scheduleSpy).toHaveBeenCalledWith('cron-post-start-test', '0 */2 * * *', null, { + tz: 'Europe/London', + }); + + scheduleSpy.mockRestore(); + createQueueSpy.mockRestore(); + await engine.stop(); + }); + + it('should distinguish api and cron runs via cron field', async () => { + const cronFilterWf = workflow( + 'cron-filter-test', + async ({ step }) => { + return await step.run('work', async () => ({ done: true })); + }, + { + cron: { expression: '* * * * *' }, + }, + ); + + const engine = new WorkflowEngine({ + workflows: [cronFilterWf], + pool: testPool, + boss: testBoss, + }); + await engine.start(); + + // Start via public API — cron field should be null + const apiRun = await engine.startWorkflow({ + workflowId: 'cron-filter-test', + input: {}, + }); + + await expect + .poll(async () => { + const r = await engine.getRun({ runId: apiRun.id }); + return r.status; + }) + .toBe('completed'); + + const completedRun = await engine.getRun({ runId: apiRun.id }); + expect(completedRun.cron).toBeNull(); + + await engine.stop(); + }); + + it('should execute cron workflow through actual worker path with schedule context', async () => { + let capturedSchedule: ScheduleContext | undefined; + + const cronIntegrationWf = workflow( + 'cron-integration-test', + async ({ step, schedule }) => { + capturedSchedule = schedule; + return await step.run('work', async () => ({ done: true })); + }, + { + cron: { expression: '* * * * *', timezone: 'America/New_York' }, + }, + ); + + const engine = new WorkflowEngine({ + workflows: [cronIntegrationWf], + pool: testPool, + boss: testBoss, + }); + await engine.start(); + + // Simulate what pg-boss schedule does: send a job to the workflow's own queue + await testBoss.send('cron-integration-test', {}); + + // Wait for a cron-triggered run to complete + await expect + .poll( + async () => { + const runs = await engine.getRuns({ + workflowId: 'cron-integration-test', + statuses: [WorkflowStatus.COMPLETED], + }); + return runs.items.length; + }, + { timeout: 10000 }, + ) + .toBeGreaterThanOrEqual(1); + + // Verify schedule context was populated + expect(capturedSchedule).toBeDefined(); + expect(capturedSchedule?.timestamp).toBeInstanceOf(Date); + expect(capturedSchedule?.timezone).toBe('America/New_York'); + // First cron run — no previous completion + expect(capturedSchedule?.lastTimestamp).toBeUndefined(); + + // Verify the run record has cron and timezone columns + const cronRuns = await engine.getRuns({ + workflowId: 'cron-integration-test', + }); + const cronRun = cronRuns.items.find((r) => r.cron !== null); + expect(cronRun?.cron).toBe('* * * * *'); + expect(cronRun?.timezone).toBe('America/New_York'); + + await engine.stop(); + }); + }); + describe('getRun() and getRuns()', () => { let engine: WorkflowEngine; diff --git a/src/engine.ts b/src/engine.ts index da6e2a5..b099d71 100644 --- a/src/engine.ts +++ b/src/engine.ts @@ -1,10 +1,12 @@ +import { CronExpressionParser } from 'cron-parser'; import { merge } from 'es-toolkit'; import pg from 'pg'; -import { type Db, type Job, PgBoss } from 'pg-boss'; +import { type Db, type Job, type JobWithMetadata, PgBoss } from 'pg-boss'; import type { z } from 'zod'; import { parseWorkflowHandler } from './ast-parser'; import { runMigrations } from './db/migration'; import { + getWorkflowLastRun, getWorkflowRun, getWorkflowRuns, insertWorkflowRun, @@ -18,6 +20,7 @@ import { WorkflowEngineError, WorkflowRunNotFoundError } from './error'; import { type InferInputParameters, type InputParameters, + type ScheduleContext, type StepBaseContext, StepType, type WorkflowContext, @@ -163,6 +166,19 @@ export class WorkflowEngine { `Worker ${i + 1}/${numWorkers} started for queue ${WORKFLOW_RUN_QUEUE_NAME}`, ); } + + for (const wf of this.workflows.values()) { + if (wf.cron) { + try { + await this.scheduleCronWorkflow(wf); + } catch (error) { + this.logger.error( + `Failed to set up cron schedule for "${wf.id}", skipping`, + error instanceof Error ? error : new Error(String(error)), + ); + } + } + } } this._started = true; @@ -171,6 +187,12 @@ export class WorkflowEngine { } async stop(): Promise { + for (const wf of this.workflows.values()) { + if (wf.cron) { + await this.boss.unschedule(wf.id); + } + } + await this.boss.stop(); if (this._ownsPool) { @@ -196,6 +218,27 @@ export class WorkflowEngine { definition.handler as (context: WorkflowContext) => Promise, ); + if (definition.cron) { + try { + CronExpressionParser.parse(definition.cron.expression, { tz: definition.cron.timezone }); + } catch (e) { + throw new WorkflowEngineError( + `Invalid cron expression "${definition.cron.expression}" for workflow "${definition.id}": ${e instanceof Error ? e.message : String(e)}`, + definition.id, + ); + } + + if (definition.inputSchema) { + const result = definition.inputSchema.safeParse({}); + if (!result.success) { + throw new WorkflowEngineError( + `Cron workflow "${definition.id}" has an inputSchema that rejects empty input. Cron-triggered runs always use {} as input.`, + definition.id, + ); + } + } + } + this.workflows.set(definition.id, { ...definition, steps, @@ -210,6 +253,13 @@ export class WorkflowEngine { this.logger.log(` └─ (${StepTypeToIcon[step.type]}) ${step.id} ${tags.join(' ')}`); } + if (this._started && definition.cron) { + const internalDef = this.workflows.get(definition.id); + if (internalDef) { + await this.scheduleCronWorkflow(internalDef); + } + } + return this; } @@ -223,6 +273,41 @@ export class WorkflowEngine { return this; } + private async buildScheduleContext(run: WorkflowRun): Promise { + const lastRun = await getWorkflowLastRun(run.workflowId, this.boss.getDb()); + return { + timestamp: run.createdAt, + lastTimestamp: lastRun?.completedAt ?? undefined, + timezone: run.timezone ?? 'UTC', + }; + } + + private async scheduleCronWorkflow(wf: WorkflowInternalDefinition): Promise { + if (!wf.cron) return; + + await this.boss.createQueue(wf.id); + await this.boss.schedule(wf.id, wf.cron.expression, null, { + tz: wf.cron.timezone ?? 'UTC', + }); + await this.boss.work(wf.id, { includeMetadata: true }, async ([_job]: JobWithMetadata[]) => { + try { + await this._createWorkflowRun({ + workflowId: wf.id, + input: {}, + cron: wf.cron?.expression, + timezone: wf.cron?.timezone, + }); + } catch (error) { + this.logger.error( + `Cron trigger failed for workflow "${wf.id}"`, + error instanceof Error ? error : new Error(String(error)), + ); + throw error; + } + }); + this.logger.log(`Cron schedule registered for ${wf.id}: ${wf.cron.expression}`); + } + async startWorkflow({ resourceId, workflowId, @@ -238,6 +323,34 @@ export class WorkflowEngine { expireInSeconds?: number; batchSize?: number; }; + }): Promise { + return this._createWorkflowRun({ + resourceId, + workflowId, + input, + options, + }); + } + + private async _createWorkflowRun({ + resourceId, + workflowId, + input, + options, + cron, + timezone, + }: { + resourceId?: string; + workflowId: string; + input: unknown; + cron?: string; + timezone?: string; + options?: { + timeout?: number; + retries?: number; + expireInSeconds?: number; + batchSize?: number; + }; }): Promise { if (!this._started) { await this.start(false, { batchSize: options?.batchSize ?? 1 }); @@ -280,6 +393,8 @@ export class WorkflowEngine { input, maxRetries: options?.retries ?? workflow.retries ?? 0, timeoutAt, + cron, + timezone, }, _db, ); @@ -408,7 +523,7 @@ export class WorkflowEngine { }, }; - this.boss.send(WORKFLOW_RUN_QUEUE_NAME, job, { + await this.boss.send(WORKFLOW_RUN_QUEUE_NAME, job, { expireInSeconds: options?.expireInSeconds ?? defaultExpireInSeconds, }); @@ -508,10 +623,6 @@ export class WorkflowEngine { throw new WorkflowEngineError('Invalid workflow run job, missing runId', workflowId); } - if (!resourceId) { - throw new WorkflowEngineError('Invalid workflow run job, missing resourceId', workflowId); - } - if (!workflowId) { throw new WorkflowEngineError( 'Invalid workflow run job, missing workflowId', @@ -532,6 +643,10 @@ export class WorkflowEngine { let run = await this.getRun({ runId, resourceId }); + const schedule: ScheduleContext | undefined = run.cron + ? await this.buildScheduleContext(run) + : undefined; + try { if (run.status === WorkflowStatus.CANCELLED) { this.logger.log(`Workflow run ${runId} is cancelled, skipping`); @@ -687,6 +802,7 @@ export class WorkflowEngine { runId: run.id, timeline: run.timeline, logger: this.logger, + schedule, step, }; diff --git a/src/types.ts b/src/types.ts index e482364..5c3ff93 100644 --- a/src/types.ts +++ b/src/types.ts @@ -25,10 +25,22 @@ export type InferInputParameters

= P extends z.ZodTyp ? z.infer

: never; +export type ScheduleContext = { + timestamp: Date; + lastTimestamp: Date | undefined; + timezone: string; +}; + +export type CronConfig = { + expression: string; + timezone?: string; +}; + export type WorkflowOptions = { timeout?: number; retries?: number; inputSchema?: I; + cron?: string | CronConfig; }; export type StepBaseContext = { @@ -80,6 +92,7 @@ export type WorkflowContext< runId: string; timeline: Record; logger: WorkflowLogger; + schedule?: ScheduleContext; }; export type WorkflowDefinition< @@ -91,6 +104,7 @@ export type WorkflowDefinition< inputSchema?: TInput; timeout?: number; // milliseconds retries?: number; + cron?: CronConfig; plugins?: WorkflowPlugin[]; };