From 9461c1c208b4fa10d26d8bf7b0fe07eabded7286 Mon Sep 17 00:00:00 2001 From: Sokratis Vidros Date: Thu, 28 May 2026 13:13:06 +0300 Subject: [PATCH] feat: recurring workflow schedules (closes #7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a `schedule` option to `workflow()` that accepts a cron expression, a `parse-duration` string, or a `DurationObject`. Detection is automatic: 5–6 cron-charset tokens are treated as cron; anything else is parsed as a duration and translated into cron (only when the interval divides cleanly). The engine registers each scheduled workflow with pg-boss `schedule()` on start, unschedules on stop/unregister, and stamps the fire timestamp into the new `workflow_runs.scheduled_for` column. Schedule-triggered handlers receive `ctx.schedule.timestamp`; manual runs have `ctx.schedule` undefined. New helper `engine.getWorkflowLastRun({ workflowId, resourceId? })` returns the most recent run — useful as a cursor for incremental syncs without denormalizing previous-run state into context. Supersedes #7 — keeps the cron-via-pg-boss approach but reshapes the API per review: single `schedule` option (no nested `{ expression, timezone }`), flat top-level `timezone` sibling, `ctx.schedule` trimmed to `{ timestamp }`, no `triggerSource` column, history exposed via a generic helper instead of denormalized into the run row. Co-Authored-By: Marcelo Mollaj Co-Authored-By: Claude Opus 4.7 (1M context) --- AGENTS.md | 43 +++++++++- examples/cron.ts | 64 ++++++++++++++ examples/package.json | 3 +- package-lock.json | 1 + package.json | 1 + src/constants.ts | 5 ++ src/db/migration.ts | 8 +- src/db/queries.ts | 43 +++++++++- src/db/types.ts | 2 + src/definition.ts | 4 +- src/duration.ts | 10 +-- src/engine.test.ts | 102 ++++++++++++++++++++++ src/engine.ts | 113 +++++++++++++++++++++++++ src/index.ts | 2 + src/migration-lock.integration.test.ts | 2 +- src/schedule.test.ts | 93 ++++++++++++++++++++ src/schedule.ts | 77 +++++++++++++++++ src/types.ts | 18 ++++ 18 files changed, 579 insertions(+), 12 deletions(-) create mode 100644 examples/cron.ts create mode 100644 src/schedule.test.ts create mode 100644 src/schedule.ts diff --git a/AGENTS.md b/AGENTS.md index ff234fa..d137d24 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -67,17 +67,57 @@ import { z } from 'zod'; const myWorkflow = workflow( 'workflow-id', // unique string ID - async ({ step, input, runId, workflowId, timeline, logger }) => { + async ({ step, input, runId, workflowId, timeline, logger, schedule }) => { // workflow body with step calls + // `schedule` is populated only for runs triggered by a recurring schedule }, { inputSchema: z.object({ /* ... */ }), // optional Zod schema timeout: 60000, // optional, milliseconds retries: 3, // optional, max retry count + schedule: '*/5 * * * *', // optional, recurring schedule (cron or duration) + timezone: 'America/New_York', // optional, only meaningful for cron (default: UTC) } ); ``` +### Recurring workflows + +Workflows can run on a recurring schedule. `schedule` accepts three forms — a +cron expression, a duration string, or a `DurationObject`: + +```typescript +workflow('cron-style', handler, { schedule: '0 9 * * 1-5', timezone: 'America/New_York' }); +workflow('every-5-min', handler, { schedule: '5m' }); // duration string +workflow('every-hour', handler, { schedule: '1 hour' }); // natural-language duration +workflow('every-day', handler, { schedule: { days: 1 } }); // DurationObject +``` + +**Detection rule.** A string that splits into 5 or 6 whitespace tokens of +cron-charset (`0-9 * / , - ? L W #`) is treated as a cron expression and +validated by `cron-parser`. Otherwise it's parsed as a duration via +`parse-duration` and translated into a cron expression — but only if the +interval divides cleanly (whole minutes that divide 60, whole hours that divide +24, or 1 day). Non-divisible intervals (`'23m'`, `'7h'`) throw with a clear +message; use an explicit cron expression for those. + +**Schedule context.** Schedule-triggered runs receive `ctx.schedule.timestamp` +— the time the schedule fired. Manual runs from `engine.startWorkflow()` have +`ctx.schedule === undefined`. Use that as the "this is a scheduled fire" flag. + +```typescript +async ({ step, schedule, workflowId }) => { + // For cursor-style incremental syncs, fetch the previous run separately: + const lastRun = await engine.getWorkflowLastRun({ workflowId }); + const since = lastRun?.completedAt ?? new Date(0); + // ... fetch data updated since `since` +} +``` + +**Overlap policy.** Scheduled runs are singletons via pg-boss — if a fire is +queued while the previous run is still executing, pg-boss handles it +(configurable overlap policies may be added later). + ### `WorkflowEngine` - Main orchestrator ```typescript @@ -190,6 +230,7 @@ await engine.triggerEvent({ // Query runs const run = await engine.getRun({ runId, resourceId }); +const lastRun = await engine.getWorkflowLastRun({ workflowId, resourceId }); // null if none const progress = await engine.checkProgress({ runId, resourceId }); const { items, nextCursor, hasMore } = await engine.getRuns({ resourceId: 'user-123', diff --git a/examples/cron.ts b/examples/cron.ts new file mode 100644 index 0000000..17bc630 --- /dev/null +++ b/examples/cron.ts @@ -0,0 +1,64 @@ +import { WorkflowEngine, workflow } from '../src'; + +// A recurring workflow. +// +// `schedule` accepts: +// - a cron expression: '0 9 * * 1-5' (weekdays at 9am) +// - a duration string: '5m', '1 hour', '1 day' +// - a DurationObject: { minutes: 5 } +// +// `timezone` is optional and only meaningful for cron expressions (UTC by default). +// `ctx.schedule.timestamp` is the time this fire was scheduled — present only on +// schedule-triggered runs. Use `engine.getWorkflowLastRun(...)` to fetch the +// previous run when you need a cursor for incremental syncs. + +const syncOrders = workflow( + 'sync-orders', + async ({ step, schedule, workflowId, logger }) => { + logger.log( + schedule + ? `Cron fire at ${schedule.timestamp.toISOString()}` + : 'Manual run (no schedule context)', + ); + + const lastRun = await engine.getWorkflowLastRun({ workflowId }); + const since = lastRun?.completedAt ?? new Date(0); + logger.log(`Syncing orders changed since ${since.toISOString()}`); + + const orders = await step.run('fetch-new-orders', async () => { + return [ + { id: 'ord_1', total: 99.0 }, + { id: 'ord_2', total: 149.5 }, + ]; + }); + + await step.run('write-to-warehouse', async () => ({ written: orders.length })); + + return { synced: orders.length, since: since.toISOString() }; + }, + { + schedule: '5m', + retries: 3, + }, +); + +const engine = new WorkflowEngine({ + connectionString: process.env.DATABASE_URL ?? 'postgres://localhost:5432/pg_workflows_example', + workflows: [syncOrders], +}); + +async function main() { + await engine.start(); + console.warn('Schedule registered. Waiting for triggers (Ctrl+C to stop)...'); + + process.on('SIGINT', async () => { + console.warn('Shutting down...'); + await engine.stop(); + process.exit(0); + }); +} + +main().catch((err) => { + console.error('Example failed:', err); + process.exit(1); +}); diff --git a/examples/package.json b/examples/package.json index a50d6d3..9cbe3d7 100644 --- a/examples/package.json +++ b/examples/package.json @@ -11,7 +11,8 @@ "example:timeout": "npm run build:lib && dotenvx run -- tsx timeout.ts", "example:polling": "npm run build:lib && dotenvx run -- tsx polling.ts", "example:microservices:worker": "npm run build:lib && dotenvx run -- tsx microservices/worker-service.ts", - "example:microservices:api": "npm run build:lib && dotenvx run -- tsx microservices/api-service.ts" + "example:microservices:api": "npm run build:lib && dotenvx run -- tsx microservices/api-service.ts", + "example:cron": "npm run build:lib && dotenvx run -- tsx cron.ts" }, "dependencies": { "pg": "^8.13.1", diff --git a/package-lock.json b/package-lock.json index 28d186d..2e1aa3a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "license": "MIT", "dependencies": { "@standard-schema/spec": "^1.1.0", + "cron-parser": "^5.5.0", "es-toolkit": "^1.44.0", "ksuid": "^3.0.0", "parse-duration": "^2.1.5", diff --git a/package.json b/package.json index cf13244..9aa0e47 100644 --- a/package.json +++ b/package.json @@ -78,6 +78,7 @@ }, "dependencies": { "@standard-schema/spec": "^1.1.0", + "cron-parser": "^5.5.0", "es-toolkit": "^1.44.0", "ksuid": "^3.0.0", "parse-duration": "^2.1.5", diff --git a/src/constants.ts b/src/constants.ts index 0455fad..097cc6e 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -1,6 +1,11 @@ export const PAUSE_EVENT_NAME = '__internal_pause'; export const WORKFLOW_RUN_QUEUE_NAME = 'workflow-run'; export const WORKFLOW_RUN_DLQ_QUEUE_NAME = 'workflow_run_dlq'; +// pg-boss queue names allow only alphanumeric, _, -, ., or / — keep the +// prefix in that character set so any valid workflow id stays addressable. +const SCHEDULE_QUEUE_PREFIX = '__pgw_schedule_'; +export const scheduleQueueNameFor = (workflowId: string): string => + `${SCHEDULE_QUEUE_PREFIX}${workflowId}`; export const DEFAULT_PGBOSS_SCHEMA = 'pgboss_v12_pgworkflow'; export const MAX_WORKFLOW_ID_LENGTH = 256; export const MAX_RESOURCE_ID_LENGTH = 256; diff --git a/src/db/migration.ts b/src/db/migration.ts index d749698..f71dbe8 100644 --- a/src/db/migration.ts +++ b/src/db/migration.ts @@ -5,7 +5,7 @@ export const MIGRATION_LOCK_ID = 738291645; // Bump this when adding new migrations. The engine stores the current version // in a `workflow_schema_version` table so migrations only run once per version. -const CURRENT_SCHEMA_VERSION = 4; +const CURRENT_SCHEMA_VERSION = 5; export async function runMigrations(db: Db): Promise { // Fast path: skip the advisory lock if schema is already current. @@ -87,6 +87,12 @@ export async function runMigrations(db: Db): Promise { ); } + if (currentVersion < 5) { + commands.push( + 'ALTER TABLE workflow_runs ADD COLUMN IF NOT EXISTS scheduled_for timestamp with time zone', + ); + } + // Upsert the schema version if (currentVersion === 0) { commands.push( diff --git a/src/db/queries.ts b/src/db/queries.ts index 4c0018a..138c566 100644 --- a/src/db/queries.ts +++ b/src/db/queries.ts @@ -29,6 +29,7 @@ type WorkflowRunRow = { parent_run_id: string | null; parent_step_id: string | null; parent_resource_id: string | null; + scheduled_for: string | Date | null; }; function mapRowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { @@ -60,6 +61,7 @@ function mapRowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { parentRunId: row.parent_run_id, parentStepId: row.parent_step_id, parentResourceId: row.parent_resource_id, + scheduledFor: row.scheduled_for ? new Date(row.scheduled_for) : null, }; } @@ -76,6 +78,7 @@ export async function insertWorkflowRun( parentRunId, parentStepId, parentResourceId, + scheduledFor, }: { resourceId?: string; workflowId: string; @@ -88,6 +91,7 @@ export async function insertWorkflowRun( parentRunId?: string; parentStepId?: string; parentResourceId?: string; + scheduledFor?: Date; }, db: Db, ): Promise<{ run: WorkflowRun; created: boolean }> { @@ -111,9 +115,10 @@ export async function insertWorkflowRun( idempotency_key, parent_run_id, parent_step_id, - parent_resource_id + parent_resource_id, + scheduled_for ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING RETURNING *`, [ @@ -133,6 +138,7 @@ export async function insertWorkflowRun( parentRunId ?? null, parentStepId ?? null, parentResourceId ?? null, + scheduledFor ?? null, ], ); @@ -187,6 +193,39 @@ export async function getWorkflowRun( return mapRowToWorkflowRun(run); } +export async function getWorkflowLastRun( + { + workflowId, + resourceId, + }: { + workflowId: string; + resourceId?: string; + }, + db: Db, +): Promise { + const result = resourceId + ? await db.executeSql( + `SELECT * FROM workflow_runs + WHERE workflow_id = $1 AND resource_id = $2 + ORDER BY created_at DESC + LIMIT 1`, + [workflowId, resourceId], + ) + : await db.executeSql( + `SELECT * FROM workflow_runs + WHERE workflow_id = $1 + ORDER BY created_at DESC + LIMIT 1`, + [workflowId], + ); + + const run = result.rows[0]; + if (!run) { + return null; + } + return mapRowToWorkflowRun(run); +} + export async function updateWorkflowRun( { runId, diff --git a/src/db/types.ts b/src/db/types.ts index bd3ee58..1df15e3 100644 --- a/src/db/types.ts +++ b/src/db/types.ts @@ -21,4 +21,6 @@ export type WorkflowRun = { parentRunId: string | null; parentStepId: string | null; parentResourceId: string | null; + /** Set when the run was started by a recurring schedule; the timestamp the schedule fired. */ + scheduledFor: Date | null; }; diff --git a/src/definition.ts b/src/definition.ts index 2f26e6b..93b90f5 100644 --- a/src/definition.ts +++ b/src/definition.ts @@ -45,7 +45,7 @@ function createWorkflowFactory( const factory = (( id: string, handler: (context: WorkflowContext) => Promise, - { inputSchema, timeout, retries }: WorkflowOptions = {}, + { inputSchema, timeout, retries, schedule, timezone }: WorkflowOptions = {}, ): WorkflowDefinition => ({ id, handler: handler as ( @@ -54,6 +54,8 @@ function createWorkflowFactory( inputSchema, timeout, retries, + schedule, + timezone, plugins: plugins.length > 0 ? (plugins as WorkflowPlugin[]) : undefined, })) as WorkflowFactory; diff --git a/src/duration.ts b/src/duration.ts index 79b6c97..8123395 100644 --- a/src/duration.ts +++ b/src/duration.ts @@ -11,11 +11,11 @@ export type DurationObject = { export type Duration = string | DurationObject; -const MS_PER_SECOND = 1000; -const MS_PER_MINUTE = 60 * MS_PER_SECOND; -const MS_PER_HOUR = 60 * MS_PER_MINUTE; -const MS_PER_DAY = 24 * MS_PER_HOUR; -const MS_PER_WEEK = 7 * MS_PER_DAY; +const MS_PER_SECOND: number = 1000; +export const MS_PER_MINUTE: number = 60 * MS_PER_SECOND; +export const MS_PER_HOUR: number = 60 * MS_PER_MINUTE; +export const MS_PER_DAY: number = 24 * MS_PER_HOUR; +const MS_PER_WEEK: number = 7 * MS_PER_DAY; export function parseDuration(duration: Duration): number { if (typeof duration === 'string') { diff --git a/src/engine.test.ts b/src/engine.test.ts index ed06663..ad41ffc 100644 --- a/src/engine.test.ts +++ b/src/engine.test.ts @@ -154,6 +154,108 @@ describe('WorkflowEngine', () => { await expect(engine.registerWorkflow(invalidWorkflow)).rejects.toThrow(WorkflowEngineError); }); + + describe('schedule option', () => { + it('accepts a cron schedule with explicit timezone', async () => { + const wf = workflow( + 'scheduled-cron-wf', + async ({ step }) => step.run('s1', async () => 'ok'), + { schedule: '0 9 * * 1-5', timezone: 'America/New_York' }, + ); + + await engine.registerWorkflow(wf); + const stored = engine.workflows.get('scheduled-cron-wf'); + expect(stored?.schedule).toBe('0 9 * * 1-5'); + expect(stored?.timezone).toBe('America/New_York'); + }); + + it('accepts a duration-string schedule', async () => { + const wf = workflow( + 'scheduled-duration-wf', + async ({ step }) => step.run('s1', async () => 'ok'), + { schedule: '5m' }, + ); + + await engine.registerWorkflow(wf); + expect(engine.workflows.get('scheduled-duration-wf')?.schedule).toBe('5m'); + }); + + it('accepts a DurationObject schedule', async () => { + const wf = workflow( + 'scheduled-object-wf', + async ({ step }) => step.run('s1', async () => 'ok'), + { schedule: { hours: 6 } }, + ); + + await engine.registerWorkflow(wf); + expect(engine.workflows.get('scheduled-object-wf')?.schedule).toEqual({ hours: 6 }); + }); + + it('throws when schedule is an invalid cron expression', async () => { + const wf = workflow( + 'scheduled-bad-cron-wf', + async ({ step }) => step.run('s1', async () => 'ok'), + { schedule: '99 * * * *' }, + ); + + await expect(engine.registerWorkflow(wf)).rejects.toThrow(WorkflowEngineError); + }); + + it('throws when schedule duration does not divide cleanly', async () => { + const wf = workflow( + 'scheduled-bad-duration-wf', + async ({ step }) => step.run('s1', async () => 'ok'), + { schedule: '23m' }, + ); + + await expect(engine.registerWorkflow(wf)).rejects.toThrow(WorkflowEngineError); + }); + }); + }); + + describe('getWorkflowLastRun()', () => { + let engine: WorkflowEngine; + const workflowId = 'last-run-wf'; + + beforeEach(async () => { + engine = new WorkflowEngine({ + workflows: [ + workflow(workflowId, async ({ step, input }) => + step.run('s1', async () => ({ echo: input })), + ), + ], + pool: testPool, + boss: testBoss, + }); + await engine.start(false); + }); + + afterEach(async () => { + await engine.stop(); + }); + + it('returns null when no runs exist for the workflow', async () => { + const result = await engine.getWorkflowLastRun({ workflowId }); + expect(result).toBeNull(); + }); + + it('returns the most recently created run for the workflow', async () => { + const first = await engine.startWorkflow({ workflowId, input: { n: 1 } }); + const second = await engine.startWorkflow({ workflowId, input: { n: 2 } }); + + const result = await engine.getWorkflowLastRun({ workflowId }); + expect(result?.id).toBe(second.id); + expect(result?.id).not.toBe(first.id); + }); + + it('scopes by resourceId when provided', async () => { + const a = await engine.startWorkflow({ workflowId, input: { n: 1 }, resourceId: 'tenant-a' }); + await engine.startWorkflow({ workflowId, input: { n: 2 }, resourceId: 'tenant-b' }); + + const result = await engine.getWorkflowLastRun({ workflowId, resourceId: 'tenant-a' }); + expect(result?.id).toBe(a.id); + expect(result?.resourceId).toBe('tenant-a'); + }); }); describe('workflow.use(plugin)', () => { diff --git a/src/engine.ts b/src/engine.ts index 8069f42..b507067 100644 --- a/src/engine.ts +++ b/src/engine.ts @@ -7,12 +7,14 @@ import { invokeChildWorkflowTimelineKey, isInvokeChildWorkflowTimelineEntry, PAUSE_EVENT_NAME, + scheduleQueueNameFor, WORKFLOW_RUN_DLQ_QUEUE_NAME, WORKFLOW_RUN_QUEUE_NAME, waitForTimelineKey, } from './constants'; import { runMigrations } from './db/migration'; import { + getWorkflowLastRun, getWorkflowRun, getWorkflowRuns, insertWorkflowRun, @@ -28,6 +30,7 @@ import { WorkflowEngineError, WorkflowRunNotFoundError, } from './error'; +import { resolveSchedule } from './schedule'; import { type InferInputParameters, type InputParameters, @@ -246,12 +249,81 @@ export class WorkflowEngine { this.logger.log(`Worker started for queue ${WORKFLOW_RUN_DLQ_QUEUE_NAME}`); } + if (asEngine) { + const scheduled = Array.from(this.workflows.values()).flatMap((wf) => + wf.schedule == null + ? [] + : [{ id: wf.id, resolved: resolveSchedule(wf.schedule, wf.timezone) }], + ); + await Promise.allSettled( + scheduled.map(({ id, resolved }) => + this.registerWorkflowSchedule(id, resolved).catch((error: unknown) => { + this.logger.error( + `Failed to register schedule for "${id}", skipping`, + error instanceof Error ? error : new Error(String(error)), + { workflowId: id }, + ); + }), + ), + ); + } + this._started = true; this.logger.log('Workflow engine started!'); } + private async registerWorkflowSchedule( + workflowId: string, + resolvedSchedule: { cron: string; timezone: string }, + ): Promise { + const scheduleQueueName = scheduleQueueNameFor(workflowId); + await this.boss.createQueue(scheduleQueueName); + await this.boss.schedule(scheduleQueueName, resolvedSchedule.cron, null, { + tz: resolvedSchedule.timezone, + }); + await this.boss.work( + scheduleQueueName, + { batchSize: 1, includeMetadata: true }, + async (jobs: JobWithMetadata[]) => { + const scheduledFor = jobs[0]?.startAfter ?? new Date(); + try { + await this.createWorkflowRun({ workflowId, input: {}, scheduledFor }); + } catch (error) { + this.logger.error( + `Schedule fire failed to start a run for workflow "${workflowId}"`, + error instanceof Error ? error : new Error(String(error)), + { workflowId }, + ); + throw error; + } + }, + ); + this.logger.log( + `Schedule registered for workflow "${workflowId}": ${resolvedSchedule.cron} (${resolvedSchedule.timezone})`, + { workflowId }, + ); + } + + private async unscheduleWorkflow(workflowId: string): Promise { + try { + await this.boss.unschedule(scheduleQueueNameFor(workflowId)); + } catch (error) { + this.logger.error( + `Failed to unschedule "${workflowId}"`, + error instanceof Error ? error : new Error(String(error)), + { workflowId }, + ); + } + } + async stop(): Promise { + await Promise.allSettled( + Array.from(this.workflows.values()) + .filter((wf) => wf.schedule != null) + .map((wf) => this.unscheduleWorkflow(wf.id)), + ); + await this.boss.stop(); if (this._ownsPool) { @@ -275,11 +347,20 @@ export class WorkflowEngine { definition.handler as (context: WorkflowContext) => Promise, ); + // Validate eagerly so authors get a clear error at registration time. + const resolvedSchedule = definition.schedule + ? resolveSchedule(definition.schedule, definition.timezone) + : undefined; + this.workflows.set(definition.id, { ...definition, steps, } as WorkflowInternalDefinition); + if (this._started && resolvedSchedule) { + await this.registerWorkflowSchedule(definition.id, resolvedSchedule); + } + this.logger.log(`Registered workflow "${definition.id}" with steps:`); for (const step of steps.values()) { const tags = []; @@ -293,11 +374,22 @@ export class WorkflowEngine { } async unregisterWorkflow(workflowId: string): Promise { + const existing = this.workflows.get(workflowId); + if (existing?.schedule != null && this._started) { + await this.unscheduleWorkflow(workflowId); + } this.workflows.delete(workflowId); return this; } async unregisterAllWorkflows(): Promise { + if (this._started) { + await Promise.allSettled( + Array.from(this.workflows.values()) + .filter((wf) => wf.schedule != null) + .map((wf) => this.unscheduleWorkflow(wf.id)), + ); + } this.workflows.clear(); return this; } @@ -404,6 +496,7 @@ export class WorkflowEngine { parentRunId, parentStepId, parentResourceId, + scheduledFor, enqueue = true, db, }: { @@ -415,6 +508,7 @@ export class WorkflowEngine { parentRunId?: string; parentStepId?: string; parentResourceId?: string; + scheduledFor?: Date; enqueue?: boolean; db?: Db; }): Promise<{ run: WorkflowRun; created: boolean }> { @@ -465,6 +559,7 @@ export class WorkflowEngine { parentRunId, parentStepId, parentResourceId, + scheduledFor, }, targetDb, ); @@ -753,6 +848,23 @@ export class WorkflowEngine { return run; } + /** + * Fetch the most recently created run for a workflow, optionally scoped to a + * `resourceId`. Useful for cron-style incremental syncs where the next run + * needs the previous run's completion timestamp as a cursor. + */ + async getWorkflowLastRun({ + workflowId, + resourceId, + }: { + workflowId: string; + resourceId?: string; + }): Promise { + validateWorkflowId(workflowId); + validateResourceId(resourceId); + return getWorkflowLastRun({ workflowId, resourceId }, this.db); + } + async updateRun( { runId, @@ -1133,6 +1245,7 @@ export class WorkflowEngine { }, logger: this.logger, step, + schedule: run.scheduledFor ? { timestamp: run.scheduledFor } : undefined, }; for (const plugin of plugins) { diff --git a/src/index.ts b/src/index.ts index f2e8d1e..eecb431 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,9 +8,11 @@ export type { Duration } from './duration'; export { WorkflowEngine, type WorkflowEngineOptions } from './engine'; export { WorkflowEngineError, WorkflowRunNotFoundError } from './error'; export { type OtelPluginOptions, otelPlugin } from './plugins/otel'; +export type { Schedule } from './schedule'; export type { InferInputParameters, InputParameters, + ScheduleContext, StartWorkflowOptions, StepBaseContext, WorkflowContext, diff --git a/src/migration-lock.integration.test.ts b/src/migration-lock.integration.test.ts index 30c9f7d..83c3b51 100644 --- a/src/migration-lock.integration.test.ts +++ b/src/migration-lock.integration.test.ts @@ -55,7 +55,7 @@ describe('Migration advisory lock (real PostgreSQL)', () => { // Verify the final schema state is correct const versionResult = await pool.query('SELECT version FROM workflow_schema_version LIMIT 1'); - expect(versionResult.rows[0].version).toBe(4); + expect(versionResult.rows[0].version).toBe(5); const tableExists = await pool.query(` SELECT EXISTS ( diff --git a/src/schedule.test.ts b/src/schedule.test.ts new file mode 100644 index 0000000..0da6054 --- /dev/null +++ b/src/schedule.test.ts @@ -0,0 +1,93 @@ +import { describe, expect, it } from 'vitest'; +import { WorkflowEngineError } from './error'; +import { resolveSchedule } from './schedule'; + +describe('resolveSchedule', () => { + describe('cron strings', () => { + it('passes through a standard 5-field cron expression', () => { + expect(resolveSchedule('*/5 * * * *')).toEqual({ + cron: '*/5 * * * *', + timezone: 'UTC', + }); + }); + + it('passes through a 6-field cron expression (with seconds)', () => { + expect(resolveSchedule('0 0 12 * * *')).toEqual({ + cron: '0 0 12 * * *', + timezone: 'UTC', + }); + }); + + it('uses provided timezone for cron expressions', () => { + expect(resolveSchedule('0 9 * * 1-5', 'America/New_York')).toEqual({ + cron: '0 9 * * 1-5', + timezone: 'America/New_York', + }); + }); + + it('throws on an invalid cron expression', () => { + expect(() => resolveSchedule('* * * *')).toThrow(); + expect(() => resolveSchedule('99 * * * *')).toThrow(WorkflowEngineError); + }); + }); + + describe('duration strings', () => { + it('translates a duration that divides 60 minutes cleanly', () => { + expect(resolveSchedule('5m')).toEqual({ cron: '*/5 * * * *', timezone: 'UTC' }); + expect(resolveSchedule('15m')).toEqual({ cron: '*/15 * * * *', timezone: 'UTC' }); + expect(resolveSchedule('30m')).toEqual({ cron: '*/30 * * * *', timezone: 'UTC' }); + }); + + it('translates a duration that divides 24 hours cleanly', () => { + expect(resolveSchedule('1h')).toEqual({ cron: '0 */1 * * *', timezone: 'UTC' }); + expect(resolveSchedule('2 hours')).toEqual({ cron: '0 */2 * * *', timezone: 'UTC' }); + expect(resolveSchedule('12h')).toEqual({ cron: '0 */12 * * *', timezone: 'UTC' }); + }); + + it('translates 1 day to midnight cron', () => { + expect(resolveSchedule('1d')).toEqual({ cron: '0 0 * * *', timezone: 'UTC' }); + expect(resolveSchedule('1 day')).toEqual({ cron: '0 0 * * *', timezone: 'UTC' }); + }); + + it('throws on durations under a minute', () => { + expect(() => resolveSchedule('30s')).toThrow(/at least 1 minute/); + }); + + it('throws on durations that do not divide cleanly', () => { + expect(() => resolveSchedule('23m')).toThrow(/doesn't map cleanly/); + expect(() => resolveSchedule('7h')).toThrow(/doesn't map cleanly/); + expect(() => resolveSchedule('2d')).toThrow(/doesn't map/); + }); + + it('throws on empty string', () => { + expect(() => resolveSchedule('')).toThrow(/empty/); + }); + }); + + describe('duration objects', () => { + it('translates { minutes: N } where N divides 60', () => { + expect(resolveSchedule({ minutes: 5 })).toEqual({ + cron: '*/5 * * * *', + timezone: 'UTC', + }); + }); + + it('translates { hours: N } where N divides 24', () => { + expect(resolveSchedule({ hours: 6 })).toEqual({ + cron: '0 */6 * * *', + timezone: 'UTC', + }); + }); + + it('translates { days: 1 } to midnight cron', () => { + expect(resolveSchedule({ days: 1 })).toEqual({ + cron: '0 0 * * *', + timezone: 'UTC', + }); + }); + + it('throws on { minutes: 23 } (does not divide 60)', () => { + expect(() => resolveSchedule({ minutes: 23 })).toThrow(/doesn't map cleanly/); + }); + }); +}); diff --git a/src/schedule.ts b/src/schedule.ts new file mode 100644 index 0000000..02bd56d --- /dev/null +++ b/src/schedule.ts @@ -0,0 +1,77 @@ +import { CronExpressionParser } from 'cron-parser'; +import { type Duration, MS_PER_DAY, MS_PER_HOUR, MS_PER_MINUTE, parseDuration } from './duration'; +import { WorkflowEngineError } from './error'; + +const CRON_TOKEN = /^[0-9*/,?\-LW#]+$/; + +export type Schedule = string | Exclude; + +type ResolvedSchedule = { + cron: string; + timezone: string; +}; + +function looksLikeCronString(value: string): boolean { + const tokens = value.trim().split(/\s+/); + if (tokens.length !== 5 && tokens.length !== 6) return false; + return tokens.every((t) => CRON_TOKEN.test(t)); +} + +function validateCronExpression(expression: string, timezone: string): void { + try { + CronExpressionParser.parse(expression, { tz: timezone }); + } catch (e) { + throw new WorkflowEngineError( + `Invalid cron expression "${expression}" (timezone: ${timezone}): ${e instanceof Error ? e.message : String(e)}`, + ); + } +} + +function durationMsToCron(ms: number, original: Duration): string { + if (ms < MS_PER_MINUTE) { + throw new WorkflowEngineError( + `Schedule interval must be at least 1 minute; got ${ms}ms from ${JSON.stringify(original)}`, + ); + } + + if (ms % MS_PER_DAY === 0) { + const days = ms / MS_PER_DAY; + if (days === 1) return '0 0 * * *'; + throw cronStepError(original, `${days} days`); + } + + if (ms % MS_PER_HOUR === 0) { + const hours = ms / MS_PER_HOUR; + if (24 % hours === 0) return `0 */${hours} * * *`; + throw cronStepError(original, `${hours} hours`); + } + + const minutes = ms / MS_PER_MINUTE; + if (Number.isInteger(minutes) && 60 % minutes === 0) return `*/${minutes} * * * *`; + throw cronStepError(original, `${minutes} minutes`); +} + +function cronStepError(original: Duration, label: string): WorkflowEngineError { + return new WorkflowEngineError( + `Schedule interval ${JSON.stringify(original)} (${label}) doesn't map cleanly to a recurring cron expression. Use a value that divides 60 minutes, 24 hours, or 1 day — or pass an explicit cron string.`, + ); +} + +/** + * Resolve a `schedule` option (cron string OR duration) to a cron expression + * plus timezone. Throws with a helpful message on bad input. + */ +export function resolveSchedule(schedule: Schedule, timezone?: string): ResolvedSchedule { + const tz = timezone ?? 'UTC'; + + if (typeof schedule === 'string' && looksLikeCronString(schedule)) { + validateCronExpression(schedule, tz); + return { cron: schedule, timezone: tz }; + } + + // Duration string, DurationObject, or string that didn't match cron-charset. + // `parseDuration` throws for empty strings and unparseable input; `durationMsToCron` + // only ever emits cron expressions we construct ourselves, so no further validation needed. + const ms = parseDuration(schedule); + return { cron: durationMsToCron(ms, schedule), timezone: tz }; +} diff --git a/src/types.ts b/src/types.ts index d64a747..6481e13 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,6 +1,7 @@ import type { StandardSchemaV1 } from '@standard-schema/spec'; import type { WorkflowRun } from './db/types'; import type { Duration } from './duration'; +import type { Schedule } from './schedule'; export enum WorkflowStatus { PENDING = 'pending', @@ -36,6 +37,19 @@ export type WorkflowOptions = { timeout?: number; retries?: number; inputSchema?: I; + /** + * Recurring schedule. Accepts a cron expression (`'0 9 * * 1-5'`), + * a duration string (`'5m'`, `'1 hour'`), or a `DurationObject`. + */ + schedule?: Schedule; + /** IANA timezone for cron expressions. Defaults to UTC. Ignored for duration-based schedules. */ + timezone?: string; +}; + +/** Metadata about a scheduled fire, exposed on `ctx.schedule` for runs triggered by a schedule. */ +export type ScheduleContext = { + /** Time the schedule fired this run. */ + timestamp: Date; }; export type StepBaseContext = { @@ -119,6 +133,8 @@ export type WorkflowContext< attempt: number; timeline: Record; logger: WorkflowLogger; + /** Set only for runs triggered by a recurring schedule. */ + schedule?: ScheduleContext; }; export type WorkflowDefinition = { @@ -128,6 +144,8 @@ export type WorkflowDefinition inputSchema?: TInput; timeout?: number; // milliseconds retries?: number; + schedule?: Schedule; + timezone?: string; plugins?: WorkflowPlugin[]; };