feat: add cron workflow scheduling with overlap protection and schedule context#7
feat: add cron workflow scheduling with overlap protection and schedule context#7marcelom97 wants to merge 3 commits into
Conversation
|
@marcelom97 thanks for submitting this. Can you please elaborate on the incremental sync? Other than that, I need to think about the DX a bit more. I will start by adding inline comments. |
| max_retries: number; | ||
| job_id: string | null; | ||
| trigger_source: 'api' | 'cron'; | ||
| schedule_context: string | { timestamp: string; lastTimestamp?: string; timezone: string } | null; |
There was a problem hiding this comment.
How about using a cron: string column and a timezone, both being optional to avoid a nesting object?
| return mapRowToWorkflowRun(run); | ||
| } | ||
|
|
||
| export async function getLastCronCompletedAt( |
There was a problem hiding this comment.
Instead of a specific getLastCronCompletedAt method, I'd suggest introducing a getWorkflowLastRun that works for all workflows. This feels more generic and aligns with the naming of the rest of the methods.
| }; | ||
| } | ||
|
|
||
| private async setupCronSchedule(wf: InternalWorkflowDefinition): Promise<void> { |
There was a problem hiding this comment.
| private async setupCronSchedule(wf: InternalWorkflowDefinition): Promise<void> { | |
| private async scheduleCronWorkflow(wf: InternalWorkflowDefinition): Promise<void> { |
|
|
||
| let run = await this.getRun({ runId, resourceId }); | ||
|
|
||
| const schedule: ScheduleContext | undefined = run.scheduleContext |
There was a problem hiding this comment.
Please refer to my previous comment about modeling scheduleContext.
| const schedule: ScheduleContext | undefined = run.scheduleContext | ||
| ? { | ||
| timestamp: run.scheduleContext.timestamp, | ||
| lastTimestamp: run.scheduleContext.lastTimestamp ?? undefined, |
There was a problem hiding this comment.
Following on the modeling, lastTimestamp comes from the workflow_runs table. We can replace it with the timestamp of the latest record.
| limit?: number; | ||
| statuses?: WorkflowStatus[]; | ||
| workflowId?: string; | ||
| triggerSource?: 'api' | 'cron'; |
There was a problem hiding this comment.
Do we need triggerSource? If not let's remove it.
| | `WORKFLOW_RUN_WORKERS` | Number of worker processes | `3` | | ||
| | `WORKFLOW_RUN_EXPIRE_IN_SECONDS` | Job expiration time in seconds | `300` | | ||
|
|
||
| ## Cron Workflows |
There was a problem hiding this comment.
Let's create the following DX:
-
The cron expression can be a valid cron string or a human friendly cron string such as https://github.com/rainder/human-to-cron. Note, this library is quite old. Let's see if there is a modern one.
-
Cron can be either a string or an object of the expression and the timezone. If the timezone is not specified, we assume UTC or the current timezone of the running container.
There was a problem hiding this comment.
I looked into this — human-to-cron has critical bugs (e.g. "every hour" produces * */1 * * * which fires every minute, and unrecognized input silently becomes * * * * *). I couldn't find a modern,
well-maintained alternative either.
I'd suggest we defer this and stick with standard cron expressions for now. We can revisit if a reliable library comes along, or build a small parser ourselves for a limited set of human-friendly strings (e.g.
"every 5 minutes", "daily at 9am").
|
@marcelom97 Any updates on this? |
|
Hey @SokratisVidros, thanks for the review and for your suggestions. I will take care of this over the weekend. |
|
@SokratisVidros incremental sync is about only processing data that changed since the last cron run, instead of re-processing everything each time. The schedule context gives cron workflows a lastTimestamp (derived from the previous completed run's completedAt), so the workflow can use it as a cursor: const sync = workflow('sync-data', async ({ step, schedule }) => {
const since = schedule?.lastTimestamp ?? new Date(0);
const data = await step.run('fetch', async () => fetchSince(since));
await step.run('write', async () => writeToDB(data));
}, { cron: '*/15 * * * *' });Without this, each cron run would need to either re-process all data or manually track its own high-water mark somewhere. lastTimestamp makes that built-in. If you think it's not useful we can remove it, but I find it quite useful to have it. |
|
@marcelom97 Thanks for the updates. I will review them by the end of this week and get back to you. |
f158d41 to
913e073
Compare
|
@SokratisVidros any news for this PR? |
Summary
cronoption (expression + optional timezone)schedulecontext on cron runs withtimestamp,lastTimestamp,timezone— enables incremental sync without manual state trackinggetRuns()supportstriggerSourcefilter ('api'|'cron')schedule()Usage
Test plan