From 28ca43931a77aa64ca5c8c0734d1f3b3154c2f20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Victor=20Negreiros?= Date: Fri, 12 Dec 2025 09:18:44 -0300 Subject: [PATCH] synced-cron - add check for stuck jobs --- synced-cron/CHANGELOG.md | 10 + synced-cron/README.md | 57 ++++- synced-cron/package.js | 2 +- synced-cron/synced-cron-server.js | 107 +++++++++ synced-cron/synced-cron-tests.js | 383 ++++++++++++++++++++++++++++++ 5 files changed, 557 insertions(+), 2 deletions(-) diff --git a/synced-cron/CHANGELOG.md b/synced-cron/CHANGELOG.md index 8764ed5..77c6f8c 100644 --- a/synced-cron/CHANGELOG.md +++ b/synced-cron/CHANGELOG.md @@ -1,6 +1,16 @@ # Changelog +## 2.2.2 (2025-12-12) + +- Add native stuck jobs detection and cleanup functionality: + - New `stuckJobsThreshold` option: Time in milliseconds to consider a job as stuck (default: 15 minutes). + - New `onStuckJobFound` callback option: Called for each stuck job found with `{ job, runningTimeMs }`. + - New `checkStuckJobsSchedule` option: Schedule function (using Later.js parser) to automatically check for stuck jobs. + - New `SyncedCron.checkStuckJobs(options)` method to manually check and remove stuck jobs. + - Automatic stuck jobs check is distributed-safe (only one server instance runs the check at a time). + - Add tests for the new functionality. + ## 2.2.1 (2024-12-13) - Add automatic cleanup of running jobs when the process encounters a fatal error (uncaught exceptions and unhandled rejections). The cleanup consists of marking the job as finished and adding a `terminatedBy` field to the job history collection to indicate how the job was terminated. diff --git a/synced-cron/README.md b/synced-cron/README.md index d76abc5..5becf98 100644 --- a/synced-cron/README.md +++ b/synced-cron/README.md @@ -106,10 +106,65 @@ You can configure SyncedCron with the `config` method. Defaults are: run `db.cronHistory.dropIndex({startedAt: 1})`) and re-run your project. 
SyncedCron will recreate the index with the updated TTL. */ - collectionTTL: 172800 + collectionTTL: 172800, + + // Time in milliseconds to consider a job as stuck (default: 15 minutes) + stuckJobsThreshold: 15 * 60 * 1000, + + // Callback function called for each stuck job found + // Receives an object with: { job, runningTimeMs } + onStuckJobFound: null, + + // Schedule function for automatic stuck jobs check (using Later.js parser) + // Set to null to disable automatic checking + checkStuckJobsSchedule: null }); ``` +### Stuck Jobs Detection + +SyncedCron can automatically detect and clean up jobs that appear to be stuck (started more than `stuckJobsThreshold` milliseconds ago and still not finished). Cleaning up removes the stuck job's entry from the job history collection; it does not interrupt code that is still running. This is useful for recovering from crashed processes or hung jobs. + +#### Automatic Check + +To enable automatic stuck jobs checking, configure a schedule: + +```js +SyncedCron.config({ + // Jobs running longer than 15 minutes without finishing are considered stuck + stuckJobsThreshold: 15 * 60 * 1000, + + // Check for stuck jobs every 5 minutes + checkStuckJobsSchedule: (parser) => parser.text('every 5 minutes'), + + // Optional: handle each stuck job found + onStuckJobFound: ({ job, runningTimeMs }) => { + console.log(`Found stuck job: ${job.name}, running for ${runningTimeMs}ms`); + } +}); + +SyncedCron.start(); +``` + +The automatic check is distributed-safe - only one server instance will run the check at a time. + +#### Manual Check + +You can also manually check for stuck jobs: + +```js +const result = await SyncedCron.checkStuckJobs(); +// result: { found: 2, removed: 2, stuckJobs: [...] } + +// Or with custom options: +const customResult = await SyncedCron.checkStuckJobs({ + stuckJobsThreshold: 30 * 60 * 1000, // 30 minutes + onStuckJobFound: ({ job, runningTimeMs }) => { + // Handle each stuck job + } +}); +``` + ### Logging SyncedCron uses Meteor's `logging` package by default. 
If you want to use your own logger (for sending to other consumers or similar) you can do so by configuring the `logger` option. diff --git a/synced-cron/package.js b/synced-cron/package.js index bc2c2b5..edaa532 100644 --- a/synced-cron/package.js +++ b/synced-cron/package.js @@ -1,7 +1,7 @@ Package.describe({ summary: 'Allows you to define and run scheduled jobs across multiple servers.', - version: '2.2.1', + version: '2.2.2', name: 'quave:synced-cron', git: 'https://github.com/quavedev/meteor-synced-cron.git', }); diff --git a/synced-cron/synced-cron-server.js b/synced-cron/synced-cron-server.js index 828fb1c..d64e68c 100644 --- a/synced-cron/synced-cron-server.js +++ b/synced-cron/synced-cron-server.js @@ -3,6 +3,7 @@ SyncedCron = { _entries: {}, running: false, processId: Random.id(), + _stuckJobsCheckName: '__SyncedCron_checkStuckJobs__', options: { //Log job run details to console log: true, @@ -20,6 +21,19 @@ SyncedCron = { //NOTE: Unset to remove expiry but ensure you remove the index from //mongo by hand collectionTTL: 172800, + + // Time in milliseconds to consider a job as stuck (default: 15 minutes) + // Jobs running longer than this without finishing will be considered stuck + stuckJobsThreshold: 15 * 60 * 1000, + + // Callback function called for each stuck job found + // Receives an object with: { job, runningTimeMs } + onStuckJobFound: null, + + // Schedule function for automatic stuck jobs check (using Later.js parser) + // Example: (parser) => parser.text('every 5 minutes') + // Set to null to disable automatic checking + checkStuckJobsSchedule: null, }, config: function (opts) { this.options = Object.assign({}, this.options, opts); @@ -233,9 +247,40 @@ SyncedCron.start = function () { scheduleEntry(entry); }); self.running = true; + + // Start automatic stuck jobs check if configured + self._startStuckJobsCheck(); }); }; +// Start the automatic stuck jobs check as a cron job +SyncedCron._startStuckJobsCheck = function () { + const schedule = 
this.options.checkStuckJobsSchedule; + + // Remove any existing stuck jobs check + this._stopStuckJobsCheck(); + + if (typeof schedule === 'function') { + const self = this; + + this.add({ + name: this._stuckJobsCheckName, + schedule: schedule, + persist: false, + job: async function () { + await self.checkStuckJobs(); + }, + }); + } +}; + +// Stop the automatic stuck jobs check +SyncedCron._stopStuckJobsCheck = function () { + if (this._entries[this._stuckJobsCheckName]) { + this.remove(this._stuckJobsCheckName); + } +}; + // Return the next scheduled date of the first matching entry or undefined SyncedCron.nextScheduledAtDate = function (jobName) { const entry = this._entries[jobName]; @@ -262,6 +307,7 @@ SyncedCron.pause = function () { Object.values(this._entries).forEach(function pauseEntry(entry) { entry._timer.clear(); }); + this._stopStuckJobsCheck(); this.running = false; } }; @@ -271,9 +317,69 @@ SyncedCron.stop = function () { Object.values(this._entries).forEach(function stopEntry(entry) { SyncedCron.remove(entry.name); }); + this._stopStuckJobsCheck(); this.running = false; }; +// Check for and handle stuck jobs +// Returns an object with { found: number, removed: number, stuckJobs: Array } +SyncedCron.checkStuckJobs = async function (options = {}) { + const threshold = options.stuckJobsThreshold ?? this.options.stuckJobsThreshold; + const onStuckJobFound = options.onStuckJobFound ?? 
this.options.onStuckJobFound; + + const thresholdDate = new Date(Date.now() - threshold); + const selector = { + startedAt: { $lt: thresholdDate }, + finishedAt: { $exists: false }, + }; + + const stuckJobs = await this._collection.find(selector).fetchAsync(); + + if (!stuckJobs.length) { + log.info('No stuck jobs found'); + return { found: 0, removed: 0, stuckJobs: [] }; + } + + log.warn(`Found ${stuckJobs.length} stuck job(s)`); + + const now = Date.now(); + + // Process each stuck job + for (const job of stuckJobs) { + const runningTimeMs = now - job.startedAt.getTime(); + const runningTimeMinutes = Math.floor(runningTimeMs / 60000); + + log.warn( + `Stuck job found - Name: ${job.name}, ` + + `Started at: ${job.startedAt.toISOString()}, ` + + `Running time: ${runningTimeMinutes} minutes, ` + + `Job ID: ${job._id}` + ); + + // Call the onStuckJobFound callback if provided + if (typeof onStuckJobFound === 'function') { + try { + await onStuckJobFound({ job, runningTimeMs }) + } catch (e) { + log.error(`Error in onStuckJobFound callback for job "${job.name}": ${e && e.stack ? e.stack : e}`); + } + } + } + + // Remove stuck jobs from the collection + const result = await this._collection.removeAsync({ + _id: { $in: stuckJobs.map((job) => job._id) }, + }); + + log.info(`Finished stuck jobs check. Total removed: ${result} job(s)`); + + return { + found: stuckJobs.length, + removed: result, + stuckJobs, + }; +}; + // The meat of our logic. Checks if the specified has already run. 
If not, // records that it's running the job, runs it, and records the output SyncedCron._entryWrapper = function (entry) { @@ -383,6 +489,7 @@ SyncedCron._entryWrapper = function (entry) { // for tests SyncedCron._reset = async function () { + this._stopStuckJobsCheck(); this._entries = {}; await this._collection.removeAsync({}); this.running = false; diff --git a/synced-cron/synced-cron-tests.js b/synced-cron/synced-cron-tests.js index a156c58..947687f 100644 --- a/synced-cron/synced-cron-tests.js +++ b/synced-cron/synced-cron-tests.js @@ -770,3 +770,386 @@ Tinytest.addAsync( restoreProcessExit(); } ); + +// Stuck jobs tests +Tinytest.addAsync( + 'checkStuckJobs: finds and removes stuck jobs', + async (test) => { + await SyncedCron._reset(); + + // Insert a job that started 20 minutes ago without finishedAt + const twentyMinutesAgo = new Date(Date.now() - 20 * 60 * 1000); + const stuckJob = { + name: 'Stuck Job', + intendedAt: twentyMinutesAgo, + startedAt: twentyMinutesAgo, + processId: 'some-process-id', + }; + + await SyncedCron._collection.insertAsync(stuckJob); + + // Check there is 1 job + test.equal(await SyncedCron._collection.find().countAsync(), 1); + + // Run checkStuckJobs with 15 minute threshold (default) + const result = await SyncedCron.checkStuckJobs(); + + test.equal(result.found, 1, 'Should find 1 stuck job'); + test.equal(result.removed, 1, 'Should remove 1 stuck job'); + test.equal(result.stuckJobs.length, 1, 'Should return 1 stuck job'); + test.equal(result.stuckJobs[0].name, 'Stuck Job', 'Stuck job name should match'); + + // Check the job was removed + test.equal(await SyncedCron._collection.find().countAsync(), 0); + } +); + +Tinytest.addAsync( + 'checkStuckJobs: does not remove jobs within threshold', + async (test) => { + await SyncedCron._reset(); + + // Insert a job that started 5 minutes ago without finishedAt + const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); + const recentJob = { + name: 'Recent Job', + intendedAt: 
fiveMinutesAgo, + startedAt: fiveMinutesAgo, + processId: 'some-process-id', + }; + + await SyncedCron._collection.insertAsync(recentJob); + + // Check there is 1 job + test.equal(await SyncedCron._collection.find().countAsync(), 1); + + // Run checkStuckJobs with 15 minute threshold (default) + const result = await SyncedCron.checkStuckJobs(); + + test.equal(result.found, 0, 'Should find 0 stuck jobs'); + test.equal(result.removed, 0, 'Should remove 0 stuck jobs'); + + // Check the job was NOT removed + test.equal(await SyncedCron._collection.find().countAsync(), 1); + } +); + +Tinytest.addAsync( + 'checkStuckJobs: respects custom threshold option', + async (test) => { + await SyncedCron._reset(); + + // Insert a job that started 5 minutes ago without finishedAt + const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); + const recentJob = { + name: 'Recent Job', + intendedAt: fiveMinutesAgo, + startedAt: fiveMinutesAgo, + processId: 'some-process-id', + }; + + await SyncedCron._collection.insertAsync(recentJob); + + // Run checkStuckJobs with 3 minute threshold + const result = await SyncedCron.checkStuckJobs({ + stuckJobsThreshold: 3 * 60 * 1000, + }); + + test.equal(result.found, 1, 'Should find 1 stuck job with custom threshold'); + test.equal(result.removed, 1, 'Should remove 1 stuck job'); + + // Check the job was removed + test.equal(await SyncedCron._collection.find().countAsync(), 0); + } +); + +Tinytest.addAsync( + 'checkStuckJobs: calls onStuckJobFound callback for each stuck job', + async (test) => { + await SyncedCron._reset(); + + // Insert two stuck jobs + const twentyMinutesAgo = new Date(Date.now() - 20 * 60 * 1000); + const thirtyMinutesAgo = new Date(Date.now() - 30 * 60 * 1000); + + await SyncedCron._collection.insertAsync({ + name: 'Stuck Job 1', + intendedAt: twentyMinutesAgo, + startedAt: twentyMinutesAgo, + processId: 'process-1', + }); + + await SyncedCron._collection.insertAsync({ + name: 'Stuck Job 2', + intendedAt: thirtyMinutesAgo, 
+ startedAt: thirtyMinutesAgo, + processId: 'process-2', + }); + + const callbackCalls = []; + + const result = await SyncedCron.checkStuckJobs({ + onStuckJobFound: ({ job, runningTimeMs }) => { + callbackCalls.push({ jobName: job.name, runningTimeMs }); + }, + }); + + test.equal(result.found, 2, 'Should find 2 stuck jobs'); + test.equal(callbackCalls.length, 2, 'onStuckJobFound should be called twice'); + + // Check callback received correct data + const jobNames = callbackCalls.map(c => c.jobName).sort(); + test.equal(jobNames[0], 'Stuck Job 1'); + test.equal(jobNames[1], 'Stuck Job 2'); + + // Check running times are roughly correct (within 1 minute tolerance) + callbackCalls.forEach(call => { + test.isTrue(call.runningTimeMs > 15 * 60 * 1000, 'Running time should be greater than 15 minutes'); + }); + } +); + +Tinytest.addAsync( + 'checkStuckJobs: uses onStuckJobFound from global options', + async (test) => { + await SyncedCron._reset(); + + const twentyMinutesAgo = new Date(Date.now() - 20 * 60 * 1000); + await SyncedCron._collection.insertAsync({ + name: 'Stuck Job Global', + intendedAt: twentyMinutesAgo, + startedAt: twentyMinutesAgo, + processId: 'process-1', + }); + + let globalCallbackCalled = false; + const originalOnStuckJobFound = SyncedCron.options.onStuckJobFound; + + SyncedCron.options.onStuckJobFound = ({ job }) => { + globalCallbackCalled = true; + test.equal(job.name, 'Stuck Job Global'); + }; + + await SyncedCron.checkStuckJobs(); + + test.isTrue(globalCallbackCalled, 'Global onStuckJobFound should be called'); + + // Restore original option + SyncedCron.options.onStuckJobFound = originalOnStuckJobFound; + } +); + +Tinytest.addAsync( + 'checkStuckJobs: does not remove finished jobs', + async (test) => { + await SyncedCron._reset(); + + // Insert a job that finished + const twentyMinutesAgo = new Date(Date.now() - 20 * 60 * 1000); + const finishedJob = { + name: 'Finished Job', + intendedAt: twentyMinutesAgo, + startedAt: twentyMinutesAgo, + 
finishedAt: new Date(Date.now() - 19 * 60 * 1000), + processId: 'some-process-id', + result: 'completed', + }; + + await SyncedCron._collection.insertAsync(finishedJob); + + const result = await SyncedCron.checkStuckJobs(); + + test.equal(result.found, 0, 'Should not find finished jobs as stuck'); + test.equal(await SyncedCron._collection.find().countAsync(), 1, 'Finished job should remain'); + } +); + +Tinytest.addAsync( + 'checkStuckJobs: handles async onStuckJobFound callback', + async (test) => { + await SyncedCron._reset(); + + const twentyMinutesAgo = new Date(Date.now() - 20 * 60 * 1000); + await SyncedCron._collection.insertAsync({ + name: 'Stuck Job Async', + intendedAt: twentyMinutesAgo, + startedAt: twentyMinutesAgo, + processId: 'process-1', + }); + + let asyncCallbackCompleted = false; + + const result = await SyncedCron.checkStuckJobs({ + onStuckJobFound: async ({ job }) => { + await new Promise(resolve => setTimeout(resolve, 50)); + asyncCallbackCompleted = true; + }, + }); + + test.isTrue(asyncCallbackCompleted, 'Async callback should complete'); + test.equal(result.found, 1, 'Should find 1 stuck job'); + } +); + +Tinytest.addAsync( + 'checkStuckJobs: continues processing if callback throws', + async (test) => { + await SyncedCron._reset(); + + // Insert two stuck jobs + const twentyMinutesAgo = new Date(Date.now() - 20 * 60 * 1000); + await SyncedCron._collection.insertAsync({ + name: 'Stuck Job Error 1', + intendedAt: twentyMinutesAgo, + startedAt: twentyMinutesAgo, + processId: 'process-1', + }); + + await SyncedCron._collection.insertAsync({ + name: 'Stuck Job Error 2', + intendedAt: new Date(Date.now() - 25 * 60 * 1000), + startedAt: new Date(Date.now() - 25 * 60 * 1000), + processId: 'process-2', + }); + + let callCount = 0; + + const result = await SyncedCron.checkStuckJobs({ + onStuckJobFound: ({ job }) => { + callCount++; + if (callCount === 1) { + throw new Error('Callback error'); + } + }, + }); + + test.equal(callCount, 2, 'Callback 
should be called for both jobs'); + test.equal(result.found, 2, 'Should find 2 stuck jobs'); + test.equal(result.removed, 2, 'Should remove both jobs despite callback error'); + } +); + +// Automatic stuck jobs check tests +Tinytest.addAsync( + 'checkStuckJobsSchedule: registers job when configured', + async (test) => { + await SyncedCron._reset(); + + const originalSchedule = SyncedCron.options.checkStuckJobsSchedule; + + // Set a schedule + SyncedCron.options.checkStuckJobsSchedule = (parser) => parser.text('every 1 second'); + + SyncedCron._startStuckJobsCheck(); + + // Check that the job was registered + const entry = SyncedCron._entries[SyncedCron._stuckJobsCheckName]; + test.isNotUndefined(entry, 'Stuck jobs check entry should be registered'); + test.equal(entry.persist, false, 'Stuck jobs check should not persist'); + + // Cleanup + SyncedCron._stopStuckJobsCheck(); + SyncedCron.options.checkStuckJobsSchedule = originalSchedule; + } +); + +Tinytest.addAsync( + 'checkStuckJobsSchedule: does not register when not configured', + async (test) => { + await SyncedCron._reset(); + + const originalSchedule = SyncedCron.options.checkStuckJobsSchedule; + SyncedCron.options.checkStuckJobsSchedule = null; + + SyncedCron._startStuckJobsCheck(); + + // Check that no job was registered + const entry = SyncedCron._entries[SyncedCron._stuckJobsCheckName]; + test.isUndefined(entry, 'Stuck jobs check entry should not be registered when disabled'); + + // Cleanup + SyncedCron.options.checkStuckJobsSchedule = originalSchedule; + } +); + +Tinytest.addAsync( + 'checkStuckJobsSchedule: removes job on stop', + async (test) => { + await SyncedCron._reset(); + + const originalSchedule = SyncedCron.options.checkStuckJobsSchedule; + SyncedCron.options.checkStuckJobsSchedule = (parser) => parser.text('every 1 second'); + + SyncedCron._startStuckJobsCheck(); + test.isNotUndefined(SyncedCron._entries[SyncedCron._stuckJobsCheckName], 'Job should be registered'); + + 
SyncedCron._stopStuckJobsCheck(); + test.isUndefined(SyncedCron._entries[SyncedCron._stuckJobsCheckName], 'Job should be removed on stop'); + + // Cleanup + SyncedCron.options.checkStuckJobsSchedule = originalSchedule; + } +); + +Tinytest.addAsync( + 'checkStuckJobsSchedule: job executes and removes stuck jobs', + async (test) => { + await SyncedCron._reset(); + + const originalSchedule = SyncedCron.options.checkStuckJobsSchedule; + const originalThreshold = SyncedCron.options.stuckJobsThreshold; + + SyncedCron.options.checkStuckJobsSchedule = (parser) => parser.text('every 1 second'); + SyncedCron.options.stuckJobsThreshold = 100; // 100ms + + // Insert a stuck job + const twoSecondsAgo = new Date(Date.now() - 2000); + await SyncedCron._collection.insertAsync({ + name: 'Auto Check Stuck Job', + intendedAt: twoSecondsAgo, + startedAt: twoSecondsAgo, + processId: 'process-1', + }); + + // Manually run the job entry wrapper to simulate execution + SyncedCron._startStuckJobsCheck(); + const entry = SyncedCron._entries[SyncedCron._stuckJobsCheckName]; + + // Execute the job directly + await entry.job(); + + // The stuck job should have been removed + const count = await SyncedCron._collection.find().countAsync(); + test.equal(count, 0, 'Stuck job should be removed by check job'); + + // Cleanup + SyncedCron._stopStuckJobsCheck(); + SyncedCron.options.checkStuckJobsSchedule = originalSchedule; + SyncedCron.options.stuckJobsThreshold = originalThreshold; + } +); + +Tinytest.addAsync( + 'checkStuckJobsSchedule: replaces previous job when restarted', + async (test) => { + await SyncedCron._reset(); + + const originalSchedule = SyncedCron.options.checkStuckJobsSchedule; + SyncedCron.options.checkStuckJobsSchedule = (parser) => parser.text('every 1 second'); + + SyncedCron._startStuckJobsCheck(); + const firstEntry = SyncedCron._entries[SyncedCron._stuckJobsCheckName]; + test.isNotUndefined(firstEntry, 'First entry should be registered'); + + // Change schedule and restart + 
SyncedCron.options.checkStuckJobsSchedule = (parser) => parser.text('every 5 seconds'); + SyncedCron._startStuckJobsCheck(); + + const secondEntry = SyncedCron._entries[SyncedCron._stuckJobsCheckName]; + test.isNotUndefined(secondEntry, 'Second entry should be registered'); + + // Cleanup + SyncedCron._stopStuckJobsCheck(); + SyncedCron.options.checkStuckJobsSchedule = originalSchedule; + } +);