Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,11 @@ Owners can evaluate `CRABBOX.md` for enabled repos. Valid workflow config sets r
## Status

Deployed and actively used by OpenClaw. See [Current Boundaries](#current-boundaries) for the remaining deliberately unimplemented product behaviors.

## Recurring cards

Crabfleet supports recurring cards for operational jobs that should run on a cadence without a human repeatedly pressing Start. A card can include a schedule object such as:

{ "kind": "interval", "everyMs": 86400000 }

The Worker scheduled handler and the owner-only `/api/admin/scheduler/tick` endpoint both scan due cards, call the existing run scheduler, create a run attempt, and advance `nextRunAt`. This keeps recurrence at the card/run layer instead of using long-running loops.
12 changes: 12 additions & 0 deletions docs/spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,15 @@ Every push to `main` runs checks, tests, builds, migrations, deploys, and endpoi
- [GitHub Actions Sessions](/github-actions-sessions/)
- [Native macOS Client](/macos-native-client/)
- [Fleet v2 Implementation Record](/spec-v2/)

## Recurring cards

Recurring work is modeled as a card schedule, not as an infinite process loop.

MVP schedule shape:

{ "kind": "interval", "everyMs": 86400000 }

When `nextRunAt` is due, the scheduler queues a normal run attempt for the card and computes the next due timestamp. Active runs are not duplicated; a due card with an active run is skipped until the next tick.

This allows daily operational sweeps, maintenance checks, and recurring repair jobs to remain visible in the normal card/run history.
4 changes: 4 additions & 0 deletions migrations/0020_recurring_cards.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE cards ADD COLUMN schedule_json TEXT NOT NULL DEFAULT '';
ALTER TABLE cards ADD COLUMN next_run_at INTEGER;
ALTER TABLE cards ADD COLUMN last_scheduled_run_at INTEGER;
CREATE INDEX IF NOT EXISTS idx_cards_next_run_at ON cards(next_run_at);
137 changes: 136 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ import {
githubOAuthCanonicalSshLinkUrl,
githubOAuthRedirectUri,
} from "./oauth";
import {
cardScheduleSummary,
nextRecurringRunAt,
normalizeCardSchedule,
parseStoredCardSchedule,
type CardSchedule,
} from "./recurring-cards";
import {
APP_HTML,
GHOSTTY_BROWSER_EXTERNAL_JS,
Expand Down Expand Up @@ -400,6 +407,9 @@ type Card = {
logs: string[];
changes: CardChanges;
run: RunAttempt | null;
schedule: CardSchedule | null;
nextRunAt: number | null;
lastScheduledRunAt: number | null;
};

type DiffFileStatus = "added" | "deleted" | "modified" | "renamed";
Expand Down Expand Up @@ -787,6 +797,9 @@ type CardTable = {
changed_files: string;
diff_patch: string;
active_run_id: string | null;
schedule_json: string;
next_run_at: number | null;
last_scheduled_run_at: number | null;
};

type RunAttemptTable = {
Expand Down Expand Up @@ -2029,8 +2042,14 @@ export default {
env: RuntimeEnv,
context: ExecutionContext,
): Promise<void> {
const now = Date.now();
context.waitUntil(
reconcileInteractiveSessionLifecycleBatch(env, Date.now()).catch((error) => {
runSchedulerTick(env, now).catch((error) => {
console.error("scheduled recurring card tick failed", error);
}),
);
context.waitUntil(
reconcileInteractiveSessionLifecycleBatch(env, now).catch((error) => {
console.error("scheduled interactive session reconciliation failed", error);
}),
);
Expand Down Expand Up @@ -2571,6 +2590,11 @@ async function api(
return json({ runs: await readRunsForCard(env, cardId) });
}

if (request.method === "POST" && url.pathname === "/api/admin/scheduler/tick") {
requireRole(user, "owner");
return json(await runSchedulerTick(env, Date.now()));
}

if (request.method === "PUT" && url.pathname === "/api/admin/policy") {
requireRole(user, "owner");
return json(await updatePolicy(request, env, user));
Expand Down Expand Up @@ -13591,6 +13615,7 @@ async function createCard(request: Request, env: RuntimeEnv, user: User): Promis
source?: string;
runtime?: string;
policy?: string;
schedule?: unknown;
}>(request);
const prompt = clean(body.prompt, 4000);
const title = clean(body.title, 140) || titleFromPrompt(prompt);
Expand All @@ -13604,6 +13629,7 @@ async function createCard(request: Request, env: RuntimeEnv, user: User): Promis
const source = oneOf(body.source, ["Prompt", "Issue", "PR"], "Prompt");
const runtime = oneOf(body.runtime, runtimeOptions, "auto");
const policy = resolveCardPolicy(body.policy, workflowConfig);
const schedule = parseCreateCardSchedule(body.schedule);
const owner = user.login ?? user.email ?? user.subject;
const db = database(env);
for (let attempt = 0; attempt < 3; attempt += 1) {
Expand All @@ -13628,6 +13654,9 @@ async function createCard(request: Request, env: RuntimeEnv, user: User): Promis
changed_files: "[]",
diff_patch: "",
active_run_id: null,
schedule_json: schedule ? JSON.stringify(schedule) : "",
next_run_at: schedule ? nextRecurringRunAt(schedule, now, null) : null,
last_scheduled_run_at: null,
})
.execute();
await db
Expand All @@ -13645,6 +13674,100 @@ async function createCard(request: Request, env: RuntimeEnv, user: User): Promis
throw new Error("failed to allocate card id");
}

function parseCreateCardSchedule(input: unknown): CardSchedule | null {
try {
return normalizeCardSchedule(input);
} catch (error) {
throw badRequest(error instanceof Error ? error.message : "invalid schedule");
}
}

function safeStoredCardSchedule(value: unknown): CardSchedule | null {
try {
return parseStoredCardSchedule(value);
} catch {
return null;
}
}

type SchedulerTickResult = {
status: "ok";
now: number;
scanned: number;
queued: number;
skipped: number;
invalid: number;
};

async function runSchedulerTick(env: RuntimeEnv, now: number): Promise<SchedulerTickResult> {
await reconcileStalledRuns(env, now);
const system = systemUser();
const rows = (
await sql<{ id: string; schedule_json: string; next_run_at: number | null }>`
SELECT id, schedule_json, next_run_at
FROM cards
WHERE schedule_json != ''
AND next_run_at IS NOT NULL
AND next_run_at <= ${now}
ORDER BY next_run_at ASC
LIMIT 25
`.execute(database(env))
).rows;
let queued = 0;
let skipped = 0;
let invalid = 0;
for (const row of rows) {
const schedule = safeStoredCardSchedule(row.schedule_json);
if (!schedule) {
invalid += 1;
await appendEvent(
env,
row.id,
system,
"recurring schedule invalid; human review required",
now,
);
continue;
}
const card = await readCard(env, row.id);
if (!card) {
skipped += 1;
continue;
}
if (card.run && activeRunStatuses.includes(card.run.status)) {
skipped += 1;
await appendEvent(
env,
card.id,
system,
"recurring schedule skipped; run already active",
now,
);
continue;
}
const claimed = await claimRunning(env, system, card, now);
if (!claimed) {
skipped += 1;
continue;
}
const nextRunAt = nextRecurringRunAt(schedule, now, now);
await database(env)
.updateTable("cards")
.set({ next_run_at: nextRunAt, last_scheduled_run_at: now })
.where("id", "=", card.id)
.execute();
await appendEvent(
env,
card.id,
system,
`recurring schedule queued (${cardScheduleSummary(schedule)}); next ${new Date(nextRunAt).toISOString()}`,
now + 4,
);
queued += 1;
}
return { status: "ok", now, scanned: rows.length, queued, skipped, invalid };
}

async function claimRunning(
env: RuntimeEnv,
user: User,
Expand Down Expand Up @@ -14201,6 +14324,9 @@ async function readCards(env: RuntimeEnv): Promise<Card[]> {
"created_at",
"changed_files",
"active_run_id",
"schedule_json",
"next_run_at",
"last_scheduled_run_at",
])
.orderBy("updated_at", "desc")
.orderBy("created_at", "desc")
Expand Down Expand Up @@ -14240,6 +14366,9 @@ async function readCards(env: RuntimeEnv): Promise<Card[]> {
logs: logs.get(card.id) ?? [],
changes: cardChanges(card.changed_files, ""),
run: card.active_run_id ? (runs.get(card.active_run_id) ?? null) : null,
schedule: safeStoredCardSchedule(card.schedule_json),
nextRunAt: card.next_run_at,
lastScheduledRunAt: card.last_scheduled_run_at,
}));
}

Expand All @@ -14262,6 +14391,9 @@ async function readCard(env: RuntimeEnv, id: string): Promise<Card | null> {
"changed_files",
"diff_patch",
"active_run_id",
"schedule_json",
"next_run_at",
"last_scheduled_run_at",
])
.where("id", "=", id)
.executeTakeFirst();
Expand Down Expand Up @@ -14297,6 +14429,9 @@ async function readCard(env: RuntimeEnv, id: string): Promise<Card | null> {
),
changes: cardChanges(card.changed_files, card.diff_patch),
run: card.active_run_id ? (runs.get(card.active_run_id) ?? null) : null,
schedule: safeStoredCardSchedule(card.schedule_json),
nextRunAt: card.next_run_at,
lastScheduledRunAt: card.last_scheduled_run_at,
};
}

Expand Down
52 changes: 52 additions & 0 deletions src/recurring-cards.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
export type CardSchedule = {
kind: "interval";
everyMs: number;
startAt?: number | null;
};

const minIntervalMs = 60_000;
const maxIntervalMs = 31 * 24 * 60 * 60 * 1000;

export function normalizeCardSchedule(input: unknown): CardSchedule | null {
if (input === undefined || input === null || input === "") return null;
if (typeof input !== "object" || Array.isArray(input))
throw new Error("schedule must be an object");
const data = input as Record<string, unknown>;
const kind = String(data.kind ?? "");
if (kind !== "interval") throw new Error("schedule.kind must be interval");
const everyMs = Number(data.everyMs ?? data.intervalMs);
if (!Number.isFinite(everyMs) || !Number.isInteger(everyMs))
throw new Error("schedule.everyMs must be an integer");
if (everyMs < minIntervalMs || everyMs > maxIntervalMs)
throw new Error("schedule.everyMs must be between 60000 and 2678400000");
const startAtValue = data.startAt ?? data.start_at;
const startAt =
startAtValue === undefined || startAtValue === null || startAtValue === ""
? null
: Number(startAtValue);
if (startAt !== null && (!Number.isFinite(startAt) || !Number.isInteger(startAt) || startAt < 0))
throw new Error("schedule.startAt must be a unix epoch millisecond integer");
return { kind, everyMs, ...(startAt === null ? {} : { startAt }) };
}

export function parseStoredCardSchedule(value: unknown): CardSchedule | null {
const text = String(value ?? "").trim();
if (!text) return null;
return normalizeCardSchedule(JSON.parse(text));
}

export function nextRecurringRunAt(
schedule: CardSchedule,
now: number,
lastScheduledRunAt: number | null,
): number {
const first = schedule.startAt ?? null;
if (lastScheduledRunAt === null && first !== null && first > now) return first;
let next = (lastScheduledRunAt ?? first ?? now) + schedule.everyMs;
while (next <= now) next += schedule.everyMs;
return next;
}

export function cardScheduleSummary(schedule: CardSchedule): string {
return `interval every ${schedule.everyMs}ms`;
}
38 changes: 38 additions & 0 deletions tests/recurring-cards.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import assert from "node:assert/strict";
import { test } from "node:test";
import {
nextRecurringRunAt,
normalizeCardSchedule,
parseStoredCardSchedule,
} from "../src/recurring-cards.ts";

test("normalizes interval schedules", () => {
assert.deepEqual(normalizeCardSchedule({ kind: "interval", everyMs: 86_400_000 }), {
kind: "interval",
everyMs: 86_400_000,
});
assert.deepEqual(parseStoredCardSchedule('{"kind":"interval","everyMs":60000}'), {
kind: "interval",
everyMs: 60_000,
});
});

test("rejects unsupported or unsafe schedules", () => {
assert.equal(normalizeCardSchedule(null), null);
assert.throws(() => normalizeCardSchedule({ kind: "cron", expr: "0 6 * * *" }), /kind/);
assert.throws(() => normalizeCardSchedule({ kind: "interval", everyMs: 1000 }), /between/);
assert.throws(() => normalizeCardSchedule({ kind: "interval", everyMs: 1.5 }), /integer/);
});

test("computes the next due interval without immediate loops", () => {
const schedule = { kind: "interval" as const, everyMs: 1_000 };
assert.equal(nextRecurringRunAt(schedule, 10_000, null), 11_000);
assert.equal(nextRecurringRunAt(schedule, 10_000, 9_500), 10_500);
assert.equal(nextRecurringRunAt(schedule, 10_000, 5_000), 11_000);
});

test("honors a future startAt for first run", () => {
const schedule = { kind: "interval" as const, everyMs: 1_000, startAt: 20_000 };
assert.equal(nextRecurringRunAt(schedule, 10_000, null), 20_000);
assert.equal(nextRecurringRunAt(schedule, 20_000, null), 21_000);
});