Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/fix-async-queuer-pending-tick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@tanstack/pacer': patch
---

fix(async-queuer): keep pendingTick true during wait period

When `addItem()` is called on a running queue during the wait period, it checks `isRunning && !pendingTick` to decide whether to trigger `#tick()`. Previously, `pendingTick` was set to `false` synchronously at the end of `#tick()`, even when async work was still pending. This caused `addItem()` to trigger immediate processing that bypassed the configured `wait` delay.

This fix tracks whether async work was scheduled and only clears `pendingTick` when no async work is pending.
30 changes: 15 additions & 15 deletions docs/reference/classes/AsyncQueuer.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ Defined in: [async-queuer.ts:316](https://github.com/TanStack/pacer/blob/main/pa
abort(): void;
```

Defined in: [async-queuer.ts:836](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L836)
Defined in: [async-queuer.ts:840](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L840)

Aborts all ongoing executions with the internal abort controllers.
Does NOT clear out the items.
Expand All @@ -188,7 +188,7 @@ addItem(
runOnItemsChange): boolean;
```

Defined in: [async-queuer.ts:474](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L474)
Defined in: [async-queuer.ts:478](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L478)

Adds an item to the queue. If the queue is full, the item is rejected and onReject is called.
Items can be inserted based on priority or at the front/back depending on configuration.
Expand Down Expand Up @@ -226,7 +226,7 @@ queuer.addItem('task2', 'front');
clear(): void;
```

Defined in: [async-queuer.ts:801](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L801)
Defined in: [async-queuer.ts:805](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L805)

Removes all pending items from the queue.
Does NOT affect active tasks.
Expand All @@ -243,7 +243,7 @@ Does NOT affect active tasks.
execute(position?): Promise<any>;
```

Defined in: [async-queuer.ts:609](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L609)
Defined in: [async-queuer.ts:613](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L613)

Removes and returns the next item from the queue and executes the task function with it.

Expand Down Expand Up @@ -273,7 +273,7 @@ queuer.execute('back');
flush(numberOfItems, position?): Promise<void>;
```

Defined in: [async-queuer.ts:657](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L657)
Defined in: [async-queuer.ts:661](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L661)

Processes a specified number of items to execute immediately with no wait time
If no numberOfItems is provided, all items will be processed
Expand All @@ -300,7 +300,7 @@ If no numberOfItems is provided, all items will be processed
flushAsBatch(batchFunction): Promise<void>;
```

Defined in: [async-queuer.ts:671](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L671)
Defined in: [async-queuer.ts:675](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L675)

Processes all items in the queue as a batch using the provided function as an argument
The queue is cleared after processing
Expand All @@ -323,7 +323,7 @@ The queue is cleared after processing
getAbortSignal(executeCount?): AbortSignal | null;
```

Defined in: [async-queuer.ts:826](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L826)
Defined in: [async-queuer.ts:830](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L830)

Returns the AbortSignal for a specific execution.
If no executeCount is provided, returns the signal for the most recent execution.
Expand Down Expand Up @@ -364,7 +364,7 @@ const queuer = new AsyncQueuer(
getNextItem(position): TValue | undefined;
```

Defined in: [async-queuer.ts:557](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L557)
Defined in: [async-queuer.ts:561](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L561)

Removes and returns the next item from the queue without executing the task function.
Use for manual queue management. Normally, use execute() to process items.
Expand Down Expand Up @@ -396,7 +396,7 @@ queuer.getNextItem('back');
peekActiveItems(): TValue[];
```

Defined in: [async-queuer.ts:763](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L763)
Defined in: [async-queuer.ts:767](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L767)

Returns the items currently being processed (active tasks).

Expand All @@ -412,7 +412,7 @@ Returns the items currently being processed (active tasks).
peekAllItems(): TValue[];
```

Defined in: [async-queuer.ts:756](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L756)
Defined in: [async-queuer.ts:760](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L760)

Returns a copy of all items in the queue, including active and pending items.

Expand All @@ -428,7 +428,7 @@ Returns a copy of all items in the queue, including active and pending items.
peekNextItem(position): TValue | undefined;
```

Defined in: [async-queuer.ts:746](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L746)
Defined in: [async-queuer.ts:750](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L750)

Returns the next item in the queue without removing it.

Expand Down Expand Up @@ -457,7 +457,7 @@ queuer.peekNextItem('back'); // back
peekPendingItems(): TValue[];
```

Defined in: [async-queuer.ts:770](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L770)
Defined in: [async-queuer.ts:774](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L774)

Returns the items waiting to be processed (pending tasks).

Expand All @@ -473,7 +473,7 @@ Returns the items waiting to be processed (pending tasks).
reset(): void;
```

Defined in: [async-queuer.ts:847](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L847)
Defined in: [async-queuer.ts:851](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L851)

Resets the queuer state to its default values

Expand Down Expand Up @@ -511,7 +511,7 @@ Updates the queuer options. New options are merged with existing options.
start(): void;
```

Defined in: [async-queuer.ts:777](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L777)
Defined in: [async-queuer.ts:781](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L781)

Starts processing items in the queue. If already running, does nothing.

Expand All @@ -527,7 +527,7 @@ Starts processing items in the queue. If already running, does nothing.
stop(): void;
```

Defined in: [async-queuer.ts:787](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L787)
Defined in: [async-queuer.ts:791](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L791)

Stops processing items in the queue. Does not clear the queue.

Expand Down
2 changes: 1 addition & 1 deletion docs/reference/functions/asyncQueue.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ title: asyncQueue
function asyncQueue<TValue>(fn, initialOptions): (item, position, runOnItemsChange) => boolean;
```

Defined in: [async-queuer.ts:919](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L919)
Defined in: [async-queuer.ts:923](https://github.com/TanStack/pacer/blob/main/packages/pacer/src/async-queuer.ts#L923)

Creates a new AsyncQueuer instance and returns a bound addItem function for adding tasks.
The queuer is started automatically and ready to process items.
Expand Down
6 changes: 5 additions & 1 deletion packages/pacer/src/async-queuer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ export class AsyncQueuer<TValue> {
this.#checkExpiredItems()

// Process items concurrently up to the concurrency limit
let scheduledAsyncWork = false
const activeItems = this.store.state.activeItems
while (
activeItems.length < this.#getConcurrency() &&
Expand All @@ -444,6 +445,7 @@ export class AsyncQueuer<TValue> {
this.#setState({
activeItems,
})
scheduledAsyncWork = true
;(async () => {
await this.execute()

Expand All @@ -458,7 +460,9 @@ export class AsyncQueuer<TValue> {
})()
}

this.#setState({ pendingTick: false })
if (!scheduledAsyncWork) {
this.#setState({ pendingTick: false })
}
}

/**
Expand Down
34 changes: 34 additions & 0 deletions packages/pacer/tests/async-queuer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,40 @@ describe('AsyncQueuer', () => {
expect(results).toHaveLength(3)
expect(results[2]).toBe('third')
})

it('should respect wait period when addItem is called during processing', async () => {
const results: Array<string> = []
const asyncQueuer = new AsyncQueuer<string>(
async (item) => {
results.push(item)
return item
},
{
wait: 100,
concurrency: 1,
started: false,
},
)

asyncQueuer.addItem('first')
asyncQueuer.start()

// 'first' processes immediately
await vi.advanceTimersByTimeAsync(0)
expect(results).toEqual(['first'])

// During the 100ms wait period, add a new item
await vi.advanceTimersByTimeAsync(50)
asyncQueuer.addItem('second')

// 'second' should NOT have processed yet — still in the wait period
await vi.advanceTimersByTimeAsync(0)
expect(results).toEqual(['first'])

// After the remaining wait time, 'second' should process
await vi.advanceTimersByTimeAsync(50)
expect(results).toEqual(['first', 'second'])
})
})

describe('error handling', () => {
Expand Down
Loading