Merged
108 changes: 100 additions & 8 deletions server/services/scheduler.test.ts
@@ -664,28 +664,56 @@ describe("daily metrics cleanup", () => {
consoleSpy.mockRestore();
});

it("logs error when cleanup query fails", async () => {
it("logs error when cleanup fails with non-transient DB error", async () => {
await startScheduler();
mockDbExecute.mockRejectedValueOnce(new Error("DB timeout"));
mockDbExecute.mockRejectedValueOnce(new Error('relation "monitor_metrics" does not exist'));
await runCron("0 3 * * *");

expect(ErrorLogger.error).toHaveBeenCalledWith(
"scheduler",
"monitor_metrics cleanup failed",
expect.any(Error),
expect.objectContaining({
errorMessage: "DB timeout",
errorMessage: 'relation "monitor_metrics" does not exist',
retentionDays: 90,
table: "monitor_metrics",
})
);
});

it("logs warning when cleanup fails with transient DB error", async () => {
await startScheduler();
mockDbExecute
.mockRejectedValueOnce(new Error("Connection terminated"))
.mockRejectedValueOnce(new Error("Connection terminated"));
const cronPromise = runCron("0 3 * * *");
await vi.advanceTimersByTimeAsync(2000);
await cronPromise;

expect(ErrorLogger.warning).toHaveBeenCalledWith(
"scheduler",
"monitor_metrics cleanup failed (transient, will retry)",
expect.objectContaining({
errorMessage: "Connection terminated",
retentionDays: 90,
table: "monitor_metrics",
})
);
// Verify the monitor_metrics cleanup itself didn't log an error (other cleanup tasks may)
expect(ErrorLogger.error).not.toHaveBeenCalledWith(
"scheduler",
"monitor_metrics cleanup failed",
expect.anything(),
expect.anything()
);
});
Comment on lines +684 to +709

⚠️ Potential issue | 🟡 Minor

Tighten transient cleanup assertions (retry + no error log).

This case currently validates only the warning payload. It still passes if retry is removed or if ErrorLogger.error is also emitted.

Suggested assertion tightening
     expect(ErrorLogger.warning).toHaveBeenCalledWith(
       "scheduler",
       "monitor_metrics cleanup failed (transient, will retry)",
       expect.objectContaining({
         errorMessage: "Connection terminated",
         retentionDays: 90,
         table: "monitor_metrics",
       })
     );
+    expect(mockDbExecute).toHaveBeenCalledTimes(2);
+    expect(ErrorLogger.error).not.toHaveBeenCalled();

As per coding guidelines, **/*.test.ts must have specific assertions for error paths.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/services/scheduler.test.ts` around lines 684-702: the test currently
only checks ErrorLogger.warning payload; tighten it to also assert the retry
happened and no fatal error was logged: after running startScheduler()/runCron
and advancing timers, assert that mockDbExecute was called twice
(expect(mockDbExecute).toHaveBeenCalledTimes(2)) to verify the retry, and assert
ErrorLogger.error was not called
(expect(ErrorLogger.error).not.toHaveBeenCalled()). Keep the existing
ErrorLogger.warning assertion and the mocked rejections on mockDbExecute to
preserve the transient-failure setup.


it("handles non-Error thrown in cleanup (uses String coercion)", async () => {
await startScheduler();
mockDbExecute.mockRejectedValueOnce("disk full");
await runCron("0 3 * * *");

// Non-Error values are not transient, so logged as error
expect(ErrorLogger.error).toHaveBeenCalledWith(
"scheduler",
"monitor_metrics cleanup failed",
@@ -820,6 +848,42 @@ describe("notification queue and digest cron (*/1 * * * *)", () => {
})
);
});

it("logs warning (not error) when processQueuedNotifications fails with transient DB error", async () => {
// Not wrapped in withDbRetry (to prevent duplicate deliveries), but
// logSchedulerError still classifies transient errors as warnings.
mockProcessQueuedNotifications
.mockRejectedValueOnce(new Error("Connection terminated"));

await startScheduler();
await runCron("*/1 * * * *");

expect(ErrorLogger.warning).toHaveBeenCalledWith(
"scheduler",
expect.stringContaining("Queued notification processing failed (transient, will retry)"),
expect.objectContaining({
errorMessage: "Connection terminated",
})
);
expect(ErrorLogger.error).not.toHaveBeenCalled();
});

it("logs warning (not error) when processDigestCron fails with transient DB error", async () => {
mockProcessDigestCron
.mockRejectedValueOnce(new Error("Connection terminated"));

await startScheduler();
await runCron("*/1 * * * *");

expect(ErrorLogger.warning).toHaveBeenCalledWith(
"scheduler",
expect.stringContaining("Digest processing failed (transient, will retry)"),
expect.objectContaining({
errorMessage: "Connection terminated",
})
);
expect(ErrorLogger.error).not.toHaveBeenCalled();
});
});

describe("stopScheduler", () => {
@@ -931,7 +995,7 @@ describe("withDbRetry and re-entrancy guards", () => {
);
});

it("logs error when retry also fails on transient error", async () => {
it("logs warning when retry also fails on transient error", async () => {
mockGetAllActiveMonitors
.mockRejectedValueOnce(new Error("Connection terminated"))
.mockRejectedValueOnce(new Error("Connection terminated again"));
@@ -942,11 +1006,11 @@
await cronPromise;

expect(mockGetAllActiveMonitors).toHaveBeenCalledTimes(2);
expect(ErrorLogger.error).toHaveBeenCalledWith(
// Transient DB errors are downgraded to warnings via logSchedulerError helper
expect(ErrorLogger.warning).toHaveBeenCalledWith(
"scheduler",
"Scheduler iteration failed",
expect.any(Error),
expect.objectContaining({ phase: "fetching active monitors" })
expect.stringContaining("Scheduler iteration failed (transient, will retry)"),
expect.objectContaining({ activeChecks: 0 })
);
});

@@ -1045,6 +1109,34 @@ describe("withDbRetry and re-entrancy guards", () => {
resolveRetries([]);
await firstRun;
});

it("logs warning (not error) when webhook processing fails with transient DB error", async () => {
// Both withDbRetry attempts fail with transient error
mockStorage.getPendingWebhookRetries
.mockRejectedValueOnce(new Error("Connection terminated"))
.mockRejectedValueOnce(new Error("Connection terminated"));

await startScheduler();
const callbacks = cronCallbacks["*/1 * * * *"];
await callbacks[0](); // notification cron
const webhookPromise = callbacks[1]();
await vi.advanceTimersByTimeAsync(2000);
await webhookPromise;

expect(ErrorLogger.warning).toHaveBeenCalledWith(
"scheduler",
expect.stringContaining("Webhook retry processing failed (transient, will retry)"),
expect.objectContaining({
errorMessage: "Connection terminated",
})
);
expect(ErrorLogger.error).not.toHaveBeenCalledWith(
"scheduler",
expect.stringContaining("Webhook"),
expect.anything(),
expect.anything()
);
});
});

describe("webhook retry cumulative backoff", () => {
95 changes: 42 additions & 53 deletions server/services/scheduler.ts
@@ -7,6 +7,7 @@ import { ErrorLogger } from "./logger";
import { notificationTablesExist } from "./notificationReady";
import { browserlessCircuitBreaker } from "./browserlessCircuitBreaker";
import { ensureMonitorConditionsTable } from "./ensureTables";
import { isTransientDbError } from "../utils/dbErrors";
import { db } from "../db";
import { sql } from "drizzle-orm";

@@ -21,25 +22,6 @@ let schedulerStarted = false;
const cronTasks: ReturnType<typeof cron.schedule>[] = [];
const pendingTimeouts = new Set<ReturnType<typeof setTimeout>>();

/**
* Transient DB errors that are safe to retry (connection drops, pool exhaustion).
* Checks both PostgreSQL error codes (stable across driver versions) and message
* substrings (fallback for connection-level errors that lack a code).
*/
function isTransientDbError(err: unknown): boolean {
if (!(err instanceof Error)) return false;
// PostgreSQL error codes: 08xxx = connection exceptions, 57P01 = admin shutdown
const code = (err as any).code;
if (typeof code === "string" && (/^08/.test(code) || code === "57P01")) return true;
const msg = err.message.toLowerCase();
return msg.includes("connection terminated")
|| msg.includes("connection timeout")
|| msg.includes("connection refused")
|| msg.includes("econnreset")
|| msg.includes("econnrefused")
|| msg.includes("cannot acquire")
|| msg.includes("timeout expired");
}

/** Retry a DB operation once after a 1 s delay on transient connection errors. */
async function withDbRetry<T>(fn: () => Promise<T>): Promise<T> {
@@ -56,6 +38,30 @@ async function withDbRetry<T>(fn: () => Promise<T>): Promise<T> {
}
}

/** Log a caught error as warning (transient) or error (non-transient) based on isTransientDbError. */
async function logSchedulerError(
message: string,
error: unknown,
context?: Record<string, any>,
): Promise<void> {
try {
if (isTransientDbError(error)) {
await ErrorLogger.warning("scheduler", `${message} (transient, will retry)`, {
errorMessage: error instanceof Error ? error.message : String(error),
...context,
});
} else {
await ErrorLogger.error("scheduler", message, error instanceof Error ? error : null, {
errorMessage: error instanceof Error ? error.message : String(error),
...context,
});
}
} catch {
// If logging itself fails (e.g., logging DB also down), don't mask the original error
console.error(`[Scheduler] Failed to log error: ${message}`, error instanceof Error ? error.message : error);
}
}

/** Schedule a callback with automatic cleanup from pendingTimeouts when it fires. */
function trackTimeout(callback: () => void, delayMs: number): ReturnType<typeof setTimeout> {
const handle = setTimeout(() => {
@@ -227,11 +233,7 @@ export async function startScheduler() {
}
}
} catch (error) {
await ErrorLogger.error("scheduler", "Scheduler iteration failed", error instanceof Error ? error : null, {
errorMessage: error instanceof Error ? error.message : String(error),
activeChecks,
phase: "fetching active monitors",
});
await logSchedulerError("Scheduler iteration failed", error, { activeChecks, phase: "fetching active monitors" });
} finally {
mainCronRunning = false;
}
@@ -249,18 +251,17 @@ export async function startScheduler() {
notificationCronRunning = true;
try {
try {
// Not wrapped in withDbRetry: these functions deliver notifications
// before marking entries as delivered. Retrying the entire function
// could cause duplicate email/webhook/Slack deliveries.
await processQueuedNotifications();
} catch (error) {
await ErrorLogger.error("scheduler", "Queued notification processing failed", error instanceof Error ? error : null, {
errorMessage: error instanceof Error ? error.message : String(error),
});
await logSchedulerError("Queued notification processing failed", error);
}
try {
await processDigestCron();
} catch (error) {
await ErrorLogger.error("scheduler", "Digest processing failed", error instanceof Error ? error : null, {
errorMessage: error instanceof Error ? error.message : String(error),
});
await logSchedulerError("Digest processing failed", error);
}
} finally {
notificationCronRunning = false;
@@ -361,62 +362,50 @@ export async function startScheduler() {
}
}
} catch (error) {
await ErrorLogger.error("scheduler", "Webhook retry processing failed", error instanceof Error ? error : null, {
errorMessage: error instanceof Error ? error.message : String(error),
});
await logSchedulerError("Webhook retry processing failed", error);
} finally {
webhookCronRunning = false;
}
}));
}

// Daily cleanup: prune monitor_metrics older than 90 days to prevent unbounded growth
// All cleanup operations are best-effort background tasks — transient DB failures
// are logged as warnings since the next daily run will catch up.
cronTasks.push(cron.schedule("0 3 * * *", async () => {
try {
const result = await db.execute(
const result = await withDbRetry(() => db.execute(
sql`DELETE FROM monitor_metrics WHERE checked_at < NOW() - INTERVAL '90 days'`
);
));
const deleted = (result as any).rowCount ?? 0;
if (deleted > 0) {
console.log(`[Cleanup] Pruned ${deleted} monitor_metrics rows older than 90 days`);
}
} catch (error) {
await ErrorLogger.error("scheduler", "monitor_metrics cleanup failed", error instanceof Error ? error : null, {
errorMessage: error instanceof Error ? error.message : String(error),
retentionDays: 90,
table: "monitor_metrics",
});
await logSchedulerError("monitor_metrics cleanup failed", error, { retentionDays: 90, table: "monitor_metrics" });
}

// Delivery log cleanup: prune entries older than 30 days
try {
const olderThan = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const entriesDeleted = await storage.cleanupOldDeliveryLogs(olderThan);
const entriesDeleted = await withDbRetry(() => storage.cleanupOldDeliveryLogs(olderThan));
if (entriesDeleted > 0) {
console.log(`[Cleanup] Pruned ${entriesDeleted} delivery_log rows older than 30 days`);
}
} catch (error) {
await ErrorLogger.error("scheduler", "delivery_log cleanup failed", error instanceof Error ? error : null, {
errorMessage: error instanceof Error ? error.message : String(error),
retentionDays: 30,
table: "delivery_log",
});
await logSchedulerError("delivery_log cleanup failed", error, { retentionDays: 30, table: "delivery_log" });
}

// Notification queue cleanup: prune permanently failed entries older than 7 days
try {
const deleted = await storage.cleanupPermanentlyFailedQueueEntries(
const deleted = await withDbRetry(() => storage.cleanupPermanentlyFailedQueueEntries(
new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
);
));
if (deleted > 0) {
console.log(`[Cleanup] Pruned ${deleted} permanently failed notification_queue rows older than 7 days`);
}
} catch (error) {
await ErrorLogger.error("scheduler", "notification_queue cleanup failed", error instanceof Error ? error : null, {
errorMessage: error instanceof Error ? error.message : String(error),
retentionDays: 7,
table: "notification_queue",
});
await logSchedulerError("notification_queue cleanup failed", error, { retentionDays: 7, table: "notification_queue" });
}
}));

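Editor's note: the diff imports `isTransientDbError` from `../utils/dbErrors` but the new `server/utils/dbErrors.ts` file is not shown, and `withDbRetry`'s body is collapsed. The sketch below is reconstructed from the inline version the diff removes from `scheduler.ts`; the `withDbRetry` body is inferred from its docstring ("retry a DB operation once after a 1 s delay on transient connection errors") and may not match the real implementation.

```typescript
/**
 * Sketch of the extracted server/utils/dbErrors.ts helper, reconstructed from
 * the inline version this diff removes from scheduler.ts.
 */
export function isTransientDbError(err: unknown): boolean {
  if (!(err instanceof Error)) return false;
  // PostgreSQL error codes: 08xxx = connection exceptions, 57P01 = admin shutdown
  const code = (err as { code?: unknown }).code;
  if (typeof code === "string" && (/^08/.test(code) || code === "57P01")) return true;
  const msg = err.message.toLowerCase();
  return msg.includes("connection terminated")
    || msg.includes("connection timeout")
    || msg.includes("connection refused")
    || msg.includes("econnreset")
    || msg.includes("econnrefused")
    || msg.includes("cannot acquire")
    || msg.includes("timeout expired");
}

/**
 * Hypothetical sketch of withDbRetry, inferred from its docstring:
 * retry a DB operation once after a 1 s delay on transient connection errors.
 */
export async function withDbRetry<T>(fn: () => Promise<T>): Promise<T> {
  try {
    return await fn();
  } catch (err) {
    if (!isTransientDbError(err)) throw err;
    await new Promise((resolve) => setTimeout(resolve, 1000));
    return fn(); // a second failure propagates to the caller
  }
}
```

This is also why the transient-failure tests above advance fake timers by 2000 ms before awaiting the cron promise: the retry only fires after the 1 s delay.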