Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions packages/utils/src/queue/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export class Job <JobOptions extends AbortOptions & ProgressOptions = AbortOptio
public status: JobStatus
public readonly timeline: JobTimeline
private readonly controller: AbortController
private dispatchingProgress: boolean

constructor (fn: (options: JobOptions) => Promise<JobReturnType>, options: any) {
this.id = randomId()
Expand All @@ -41,6 +42,8 @@ export class Job <JobOptions extends AbortOptions & ProgressOptions = AbortOptio
this.controller = new AbortController()
setMaxListeners(Infinity, this.controller.signal)

this.dispatchingProgress = false

this.onAbort = this.onAbort.bind(this)
}

Expand Down Expand Up @@ -80,9 +83,25 @@ export class Job <JobOptions extends AbortOptions & ProgressOptions = AbortOptio
...(this.options ?? {}),
signal: this.controller.signal,
onProgress: (evt: any): void => {
this.recipients.forEach(recipient => {
recipient.onProgress?.(evt)
})
// Guard against re-entrant progress dispatch when two jobs'
// recipients form a cycle (issue #3484). Without this, a single
// progress event would recurse synchronously until the call
// stack overflows. Non-cyclic dispatches behave identically;
// only a synchronous re-entry into the same job's dispatcher
// is short-circuited.
if (this.dispatchingProgress) {
return
}

this.dispatchingProgress = true

try {
this.recipients.forEach(recipient => {
recipient.onProgress?.(evt)
})
} finally {
this.dispatchingProgress = false
}
}
}), this.controller.signal)

Expand Down
70 changes: 70 additions & 0 deletions packages/utils/test/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -934,4 +934,74 @@ describe('queue', () => {

expect(events).to.have.lengthOf(2)
})

it('should not recurse infinitely when two jobs progress-feed each other (issue #3484)', async () => {
// Reproduces the recursion crash where two jobs in different queues
// have onProgress callbacks that ultimately call back into each
// other's dispatcher. Without the per-job re-entry guard in Job.run,
// this throws `RangeError: Maximum call stack size exceeded`.
interface ProgressJobOptions extends AbortOptions, ProgressOptions {

}

const queueA = new Queue<string, ProgressJobOptions>({ concurrency: 1 })
const queueB = new Queue<string, ProgressJobOptions>({ concurrency: 1 })

let aSynthOP: ((evt: any) => void) | undefined
let bSynthOP: ((evt: any) => void) | undefined

const aReady = pDefer<void>()
const bReady = pDefer<void>()
const aHold = pDefer<void>()
const bHold = pDefer<void>()

const eventsAtA: any[] = []
const eventsAtB: any[] = []

const pA = queueA.add(async (options) => {
aSynthOP = options.onProgress
aReady.resolve()
await aHold.promise
return 'a'
}, {
onProgress: (evt) => {
eventsAtA.push(evt)
bSynthOP?.(evt)
}
})

const pB = queueB.add(async (options) => {
bSynthOP = options.onProgress
bReady.resolve()
await bHold.promise
return 'b'
}, {
onProgress: (evt) => {
eventsAtB.push(evt)
aSynthOP?.(evt)
}
})

// Wait for both jobs to start running and capture their synthesised
// onProgress dispatchers.
await Promise.all([aReady.promise, bReady.promise])

// Kick a single progress event into the cycle. With the guard this
// returns cleanly; without it, V8 throws RangeError.
expect(() => {
aSynthOP?.(new CustomProgressEvent('kick'))
}).to.not.throw()

// Each recipient should observe the event exactly once: the first
// hop into A's recipient, then the forward into B's recipient. The
// attempt to come back into aSynthOP is short-circuited because A's
// dispatcher is already running.
expect(eventsAtA).to.have.lengthOf(1)
expect(eventsAtB).to.have.lengthOf(1)

aHold.resolve()
bHold.resolve()

await Promise.all([pA, pB])
})
})