diff --git a/docs/index.md b/docs/index.md index b99cdb1..4f52d7a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -118,3 +118,11 @@ Owners can evaluate `CRABBOX.md` for enabled repos. Valid workflow config sets r ## Status Deployed and actively used by OpenClaw. See [Current Boundaries](#current-boundaries) for the remaining deliberately unimplemented product behaviors. + +## Recurring cards + +Crabfleet supports recurring cards for operational jobs that should run on a cadence without a human repeatedly pressing Start. A card can include a schedule object such as: + + { "kind": "interval", "everyMs": 86400000 } + +The Worker scheduled handler and the owner-only `/api/admin/scheduler/tick` endpoint both scan due cards, call the existing run scheduler, create a run attempt, and advance `nextRunAt`. This keeps recurrence at the card/run layer instead of using long-running loops. diff --git a/docs/spec.md b/docs/spec.md index 4fb18bd..f691e8e 100644 --- a/docs/spec.md +++ b/docs/spec.md @@ -414,3 +414,15 @@ Every push to `main` runs checks, tests, builds, migrations, deploys, and endpoi - [GitHub Actions Sessions](/github-actions-sessions/) - [Native macOS Client](/macos-native-client/) - [Fleet v2 Implementation Record](/spec-v2/) + +## Recurring cards + +Recurring work is modeled as a card schedule, not as an infinite process loop. + +MVP schedule shape: + + { "kind": "interval", "everyMs": 86400000 } + +When `nextRunAt` is due, the scheduler queues a normal run attempt for the card and computes the next due timestamp. Active runs are not duplicated; a due card with an active run is skipped until the next tick. + +This allows daily operational sweeps, maintenance checks, and recurring repair jobs to remain visible in the normal card/run history. diff --git a/migrations/0020_recurring_cards.sql b/migrations/0020_recurring_cards.sql new file mode 100644 index 0000000..ca5f53e --- /dev/null +++ b/migrations/0020_recurring_cards.sql @@ -0,0 +1,4 @@ +ALTER TABLE cards ADD COLUMN schedule_json TEXT NOT NULL DEFAULT ''; +ALTER TABLE cards ADD COLUMN next_run_at INTEGER; +ALTER TABLE cards ADD COLUMN last_scheduled_run_at INTEGER; +CREATE INDEX IF NOT EXISTS idx_cards_next_run_at ON cards(next_run_at); diff --git a/src/index.ts b/src/index.ts index 0104a6a..09e00c4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -68,6 +68,13 @@ import { githubOAuthCanonicalSshLinkUrl, githubOAuthRedirectUri, } from "./oauth"; +import { + cardScheduleSummary, + nextRecurringRunAt, + normalizeCardSchedule, + parseStoredCardSchedule, + type CardSchedule, +} from "./recurring-cards"; import { APP_HTML, GHOSTTY_BROWSER_EXTERNAL_JS, @@ -400,6 +407,9 @@ type Card = { logs: string[]; changes: CardChanges; run: RunAttempt | null; + schedule: CardSchedule | null; + nextRunAt: number | null; + lastScheduledRunAt: number | null; }; type DiffFileStatus = "added" | "deleted" | "modified" | "renamed"; @@ -787,6 +797,9 @@ type CardTable = { changed_files: string; diff_patch: string; active_run_id: string | null; + schedule_json: string; + next_run_at: number | null; + last_scheduled_run_at: number | null; }; type RunAttemptTable = { @@ -2029,8 +2042,14 @@ export default { env: RuntimeEnv, context: ExecutionContext, ): Promise { + const now = Date.now(); context.waitUntil( - reconcileInteractiveSessionLifecycleBatch(env, Date.now()).catch((error) => { + runSchedulerTick(env, now).catch((error) => { + console.error("scheduled recurring card tick failed", error); + }), + ); + context.waitUntil( + reconcileInteractiveSessionLifecycleBatch(env, now).catch((error) => { console.error("scheduled interactive session reconciliation failed", error); }), ); @@ -2571,6 +2590,11 @@ async function api( return json({ runs: await readRunsForCard(env, cardId) }); } + if (request.method === "POST" && url.pathname === "/api/admin/scheduler/tick") { + requireRole(user, "owner"); + return json(await runSchedulerTick(env, Date.now())); + } + if (request.method === "PUT" && url.pathname === "/api/admin/policy") { requireRole(user, "owner"); return json(await updatePolicy(request, env, user)); @@ -13591,6 +13615,7 @@ async function createCard(request: Request, env: RuntimeEnv, user: User): Promis source?: string; runtime?: string; policy?: string; + schedule?: unknown; }>(request); const prompt = clean(body.prompt, 4000); const title = clean(body.title, 140) || titleFromPrompt(prompt); @@ -13604,6 +13629,7 @@ async function createCard(request: Request, env: RuntimeEnv, user: User): Promis const source = oneOf(body.source, ["Prompt", "Issue", "PR"], "Prompt"); const runtime = oneOf(body.runtime, runtimeOptions, "auto"); const policy = resolveCardPolicy(body.policy, workflowConfig); + const schedule = parseCreateCardSchedule(body.schedule); const owner = user.login ?? user.email ?? user.subject; const db = database(env); for (let attempt = 0; attempt < 3; attempt += 1) { @@ -13628,6 +13654,9 @@ async function createCard(request: Request, env: RuntimeEnv, user: User): Promis changed_files: "[]", diff_patch: "", active_run_id: null, + schedule_json: schedule ? JSON.stringify(schedule) : "", + next_run_at: schedule ? nextRecurringRunAt(schedule, now, null) : null, + last_scheduled_run_at: null, }) .execute(); await db @@ -13645,6 +13674,100 @@ async function createCard(request: Request, env: RuntimeEnv, user: User): Promis throw new Error("failed to allocate card id"); } +function parseCreateCardSchedule(input: unknown): CardSchedule | null { + try { + return normalizeCardSchedule(input); + } catch (error) { + throw badRequest(error instanceof Error ? error.message : "invalid schedule"); + } +} + +function safeStoredCardSchedule(value: unknown): CardSchedule | null { + try { + return parseStoredCardSchedule(value); + } catch { + return null; + } +} + +type SchedulerTickResult = { + status: "ok"; + now: number; + scanned: number; + queued: number; + skipped: number; + invalid: number; +}; + +async function runSchedulerTick(env: RuntimeEnv, now: number): Promise { + await reconcileStalledRuns(env, now); + const system = systemUser(); + const rows = ( + await sql<{ id: string; schedule_json: string; next_run_at: number | null }>` + SELECT id, schedule_json, next_run_at + FROM cards + WHERE schedule_json != '' + AND next_run_at IS NOT NULL + AND next_run_at <= ${now} + ORDER BY next_run_at ASC + LIMIT 25 + `.execute(database(env)) + ).rows; + let queued = 0; + let skipped = 0; + let invalid = 0; + for (const row of rows) { + const schedule = safeStoredCardSchedule(row.schedule_json); + if (!schedule) { + invalid += 1; + await appendEvent( + env, + row.id, + system, + "recurring schedule invalid; human review required", + now, + ); + continue; + } + const card = await readCard(env, row.id); + if (!card) { + skipped += 1; + continue; + } + if (card.run && activeRunStatuses.includes(card.run.status)) { + skipped += 1; + await appendEvent( + env, + card.id, + system, + "recurring schedule skipped; run already active", + now, + ); + continue; + } + const claimed = await claimRunning(env, system, card, now); + if (!claimed) { + skipped += 1; + continue; + } + const nextRunAt = nextRecurringRunAt(schedule, now, now); + await database(env) + .updateTable("cards") + .set({ next_run_at: nextRunAt, last_scheduled_run_at: now }) + .where("id", "=", card.id) + .execute(); + await appendEvent( + env, + card.id, + system, + `recurring schedule queued (${cardScheduleSummary(schedule)}); next ${new Date(nextRunAt).toISOString()}`, + now + 4, + ); + queued += 1; + } + return { status: "ok", now, scanned: rows.length, queued, skipped, invalid }; +} + async function claimRunning( env: RuntimeEnv, user: User, @@ -14201,6 +14324,9 @@ async function readCards(env: RuntimeEnv): Promise { "created_at", "changed_files", "active_run_id", + "schedule_json", + "next_run_at", + "last_scheduled_run_at", ]) .orderBy("updated_at", "desc") .orderBy("created_at", "desc") @@ -14240,6 +14366,9 @@ async function readCards(env: RuntimeEnv): Promise { logs: logs.get(card.id) ?? [], changes: cardChanges(card.changed_files, ""), run: card.active_run_id ? (runs.get(card.active_run_id) ?? null) : null, + schedule: safeStoredCardSchedule(card.schedule_json), + nextRunAt: card.next_run_at, + lastScheduledRunAt: card.last_scheduled_run_at, })); } @@ -14262,6 +14391,9 @@ async function readCard(env: RuntimeEnv, id: string): Promise { "changed_files", "diff_patch", "active_run_id", + "schedule_json", + "next_run_at", + "last_scheduled_run_at", ]) .where("id", "=", id) .executeTakeFirst(); @@ -14297,6 +14429,9 @@ async function readCard(env: RuntimeEnv, id: string): Promise { ), changes: cardChanges(card.changed_files, card.diff_patch), run: card.active_run_id ? (runs.get(card.active_run_id) ?? null) : null, + schedule: safeStoredCardSchedule(card.schedule_json), + nextRunAt: card.next_run_at, + lastScheduledRunAt: card.last_scheduled_run_at, }; } diff --git a/src/recurring-cards.ts b/src/recurring-cards.ts new file mode 100644 index 0000000..50a6be9 --- /dev/null +++ b/src/recurring-cards.ts @@ -0,0 +1,52 @@ +export type CardSchedule = { + kind: "interval"; + everyMs: number; + startAt?: number | null; +}; + +const minIntervalMs = 60_000; +const maxIntervalMs = 31 * 24 * 60 * 60 * 1000; + +export function normalizeCardSchedule(input: unknown): CardSchedule | null { + if (input === undefined || input === null || input === "") return null; + if (typeof input !== "object" || Array.isArray(input)) + throw new Error("schedule must be an object"); + const data = input as Record; + const kind = String(data.kind ?? ""); + if (kind !== "interval") throw new Error("schedule.kind must be interval"); + const everyMs = Number(data.everyMs ?? data.intervalMs); + if (!Number.isFinite(everyMs) || !Number.isInteger(everyMs)) + throw new Error("schedule.everyMs must be an integer"); + if (everyMs < minIntervalMs || everyMs > maxIntervalMs) + throw new Error("schedule.everyMs must be between 60000 and 2678400000"); + const startAtValue = data.startAt ?? data.start_at; + const startAt = + startAtValue === undefined || startAtValue === null || startAtValue === "" + ? null + : Number(startAtValue); + if (startAt !== null && (!Number.isFinite(startAt) || !Number.isInteger(startAt) || startAt < 0)) + throw new Error("schedule.startAt must be a unix epoch millisecond integer"); + return { kind, everyMs, ...(startAt === null ? {} : { startAt }) }; +} + +export function parseStoredCardSchedule(value: unknown): CardSchedule | null { + const text = String(value ?? "").trim(); + if (!text) return null; + return normalizeCardSchedule(JSON.parse(text)); +} + +export function nextRecurringRunAt( + schedule: CardSchedule, + now: number, + lastScheduledRunAt: number | null, +): number { + const first = schedule.startAt ?? null; + if (lastScheduledRunAt === null && first !== null && first > now) return first; + let next = (lastScheduledRunAt ?? first ?? now) + schedule.everyMs; + while (next <= now) next += schedule.everyMs; + return next; +} + +export function cardScheduleSummary(schedule: CardSchedule): string { + return `interval every ${schedule.everyMs}ms`; +} diff --git a/tests/recurring-cards.test.ts b/tests/recurring-cards.test.ts new file mode 100644 index 0000000..e38be83 --- /dev/null +++ b/tests/recurring-cards.test.ts @@ -0,0 +1,38 @@ +import assert from "node:assert/strict"; +import { test } from "node:test"; +import { + nextRecurringRunAt, + normalizeCardSchedule, + parseStoredCardSchedule, +} from "../src/recurring-cards.ts"; + +test("normalizes interval schedules", () => { + assert.deepEqual(normalizeCardSchedule({ kind: "interval", everyMs: 86_400_000 }), { + kind: "interval", + everyMs: 86_400_000, + }); + assert.deepEqual(parseStoredCardSchedule('{"kind":"interval","everyMs":60000}'), { + kind: "interval", + everyMs: 60_000, + }); +}); + +test("rejects unsupported or unsafe schedules", () => { + assert.equal(normalizeCardSchedule(null), null); + assert.throws(() => normalizeCardSchedule({ kind: "cron", expr: "0 6 * * *" }), /kind/); + assert.throws(() => normalizeCardSchedule({ kind: "interval", everyMs: 1000 }), /between/); + assert.throws(() => normalizeCardSchedule({ kind: "interval", everyMs: 1.5 }), /integer/); +}); + +test("computes the next due interval without immediate loops", () => { + const schedule = { kind: "interval" as const, everyMs: 1_000 }; + assert.equal(nextRecurringRunAt(schedule, 10_000, null), 11_000); + assert.equal(nextRecurringRunAt(schedule, 10_000, 9_500), 10_500); + assert.equal(nextRecurringRunAt(schedule, 10_000, 5_000), 11_000); +}); + +test("honors a future startAt for first run", () => { + const schedule = { kind: "interval" as const, everyMs: 1_000, startAt: 20_000 }; + assert.equal(nextRecurringRunAt(schedule, 10_000, null), 20_000); + assert.equal(nextRecurringRunAt(schedule, 20_000, null), 21_000); +});