Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions synced-cron/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
# Changelog


## 2.2.2 (2025-12-12)

- Add native stuck jobs detection and cleanup functionality:
- New `stuckJobsThreshold` option: Time in milliseconds to consider a job as stuck (default: 15 minutes).
- New `onStuckJobFound` callback option: Called for each stuck job found with `{ job, runningTimeMs }`.
- New `checkStuckJobsSchedule` option: Schedule function (using Later.js parser) to automatically check for stuck jobs.
- New `SyncedCron.checkStuckJobs(options)` method to manually check and remove stuck jobs.
- Automatic stuck jobs check is distributed-safe (only one server instance runs the check at a time).
- Add tests for the new functionality.

## 2.2.1 (2024-12-13)

- Add automatic cleanup of running jobs when the process encounters a fatal error (uncaught exceptions and unhandled rejections). The cleanup consists of marking the job as finished and adding a `terminatedBy` field to the job history collection to indicate how the job was terminated.
Expand Down
57 changes: 56 additions & 1 deletion synced-cron/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,65 @@ You can configure SyncedCron with the `config` method. Defaults are:
run `db.cronHistory.dropIndex({startedAt: 1})`) and re-run your
project. SyncedCron will recreate the index with the updated TTL.
*/
collectionTTL: 172800
collectionTTL: 172800,

// Time in milliseconds to consider a job as stuck (default: 15 minutes)
stuckJobsThreshold: 15 * 60 * 1000,

// Callback function called for each stuck job found
// Receives an object with: { job, runningTimeMs }
onStuckJobFound: null,

// Schedule function for automatic stuck jobs check (using Later.js parser)
// Set to null to disable automatic checking
checkStuckJobsSchedule: null
});
```

### Stuck Jobs Detection

SyncedCron can automatically detect and clean up jobs that appear to be stuck (running longer than expected without finishing). This is useful for recovering from crashed processes or hung jobs.

#### Automatic Check

To enable automatic stuck jobs checking, configure a schedule:

```js
SyncedCron.config({
// Jobs running longer than 15 minutes without finishing are considered stuck
stuckJobsThreshold: 15 * 60 * 1000,

// Check for stuck jobs every 5 minutes
checkStuckJobsSchedule: (parser) => parser.text('every 5 minutes'),

// Optional: handle each stuck job found
onStuckJobFound: ({ job, runningTimeMs }) => {
console.log(`Found stuck job: ${job.name}, running for ${runningTimeMs}ms`);
}
});

SyncedCron.start();
```

The automatic check is distributed-safe - only one server instance will run the check at a time.

#### Manual Check

You can also manually check for stuck jobs:

```js
const result = await SyncedCron.checkStuckJobs();
// result: { found: 2, removed: 2, stuckJobs: [...] }

// Or with custom options:
const result = await SyncedCron.checkStuckJobs({
stuckJobsThreshold: 30 * 60 * 1000, // 30 minutes
onStuckJobFound: ({ job, runningTimeMs }) => {
// Handle each stuck job
}
});
```

### Logging

SyncedCron uses Meteor's `logging` package by default. If you want to use your own logger (for sending to other consumers or similar) you can do so by configuring the `logger` option.
Expand Down
2 changes: 1 addition & 1 deletion synced-cron/package.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package.describe({
summary:
'Allows you to define and run scheduled jobs across multiple servers.',
version: '2.2.1',
version: '2.2.2',
name: 'quave:synced-cron',
git: 'https://github.com/quavedev/meteor-synced-cron.git',
});
Expand Down
107 changes: 107 additions & 0 deletions synced-cron/synced-cron-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ SyncedCron = {
_entries: {},
running: false,
processId: Random.id(),
_stuckJobsCheckName: '__SyncedCron_checkStuckJobs__',
options: {
//Log job run details to console
log: true,
Expand All @@ -20,6 +21,19 @@ SyncedCron = {
//NOTE: Unset to remove expiry but ensure you remove the index from
//mongo by hand
collectionTTL: 172800,

// Time in milliseconds to consider a job as stuck (default: 15 minutes)
// Jobs running longer than this without finishing will be considered stuck
stuckJobsThreshold: 15 * 60 * 1000,

// Callback function called for each stuck job found
// Receives an object with: { job, runningTimeMs }
onStuckJobFound: null,

// Schedule function for automatic stuck jobs check (using Later.js parser)
// Example: (parser) => parser.text('every 5 minutes')
// Set to null to disable automatic checking
checkStuckJobsSchedule: null,
},
config: function (opts) {
this.options = Object.assign({}, this.options, opts);
Expand Down Expand Up @@ -233,9 +247,40 @@ SyncedCron.start = function () {
scheduleEntry(entry);
});
self.running = true;

// Start automatic stuck jobs check if configured
self._startStuckJobsCheck();
});
};

// Start the automatic stuck jobs check as a cron job
SyncedCron._startStuckJobsCheck = function () {
const schedule = this.options.checkStuckJobsSchedule;

// Remove any existing stuck jobs check
this._stopStuckJobsCheck();

if (typeof schedule === 'function') {
const self = this;

this.add({
name: this._stuckJobsCheckName,
schedule: schedule,
persist: false,
job: async function () {
await self.checkStuckJobs();
},
});
}
};

// Stop the automatic stuck jobs check
SyncedCron._stopStuckJobsCheck = function () {
if (this._entries[this._stuckJobsCheckName]) {
this.remove(this._stuckJobsCheckName);
}
};

// Return the next scheduled date of the first matching entry or undefined
SyncedCron.nextScheduledAtDate = function (jobName) {
const entry = this._entries[jobName];
Expand All @@ -262,6 +307,7 @@ SyncedCron.pause = function () {
Object.values(this._entries).forEach(function pauseEntry(entry) {
entry._timer.clear();
});
this._stopStuckJobsCheck();
this.running = false;
}
};
Expand All @@ -271,9 +317,69 @@ SyncedCron.stop = function () {
Object.values(this._entries).forEach(function stopEntry(entry) {
SyncedCron.remove(entry.name);
});
this._stopStuckJobsCheck();
this.running = false;
};

// Check for and handle stuck jobs
// Returns an object with { found: number, removed: number, stuckJobs: Array }
SyncedCron.checkStuckJobs = async function (options = {}) {
const threshold = options.stuckJobsThreshold ?? this.options.stuckJobsThreshold;
const onStuckJobFound = options.onStuckJobFound ?? this.options.onStuckJobFound;

const thresholdDate = new Date(Date.now() - threshold);
const selector = {
startedAt: { $lt: thresholdDate },
finishedAt: { $exists: false },
};

const stuckJobs = await this._collection.find(selector).fetchAsync();

if (!stuckJobs.length) {
log.info('No stuck jobs found');
return { found: 0, removed: 0, stuckJobs: [] };
}

log.warn(`Found ${stuckJobs.length} stuck job(s)`);

const now = Date.now();

// Process each stuck job
for (const job of stuckJobs) {
const runningTimeMs = now - job.startedAt.getTime();
const runningTimeMinutes = Math.floor(runningTimeMs / 60000);

log.warn(
`Stuck job found - Name: ${job.name}, ` +
`Started at: ${job.startedAt.toISOString()}, ` +
`Running time: ${runningTimeMinutes} minutes, ` +
`Job ID: ${job._id}`
);

// Call the onStuckJobFound callback if provided
if (typeof onStuckJobFound === 'function') {
try {
await onStuckJobFound({ job, runningTimeMs })
} catch (e) {
log.error(`Error in onStuckJobFound callback for job "${job.name}": ${e && e.stack ? e.stack : e}`);
}
}
}

// Remove stuck jobs from the collection
const result = await this._collection.removeAsync({
_id: { $in: stuckJobs.map((job) => job._id) },
});

log.info(`Finished stuck jobs check. Total removed: ${result} job(s)`);

return {
found: stuckJobs.length,
removed: result,
stuckJobs,
};
};

// The meat of our logic. Checks if the specified has already run. If not,
// records that it's running the job, runs it, and records the output
SyncedCron._entryWrapper = function (entry) {
Expand Down Expand Up @@ -383,6 +489,7 @@ SyncedCron._entryWrapper = function (entry) {

// for tests
SyncedCron._reset = async function () {
this._stopStuckJobsCheck();
this._entries = {};
await this._collection.removeAsync({});
this.running = false;
Expand Down
Loading