From 90be1372e4919fdb6fda12aae959bb1b949fa980 Mon Sep 17 00:00:00 2001 From: Sorra Date: Wed, 11 Mar 2026 04:11:29 -0700 Subject: [PATCH 1/5] WL-0MMLXTAVF0IAIATF: Add token-bucket throttler and wire core github-sync upserts through it --- src/github-sync.ts | 13 ++-- src/github-throttler.ts | 145 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+), 6 deletions(-) create mode 100644 src/github-throttler.ts diff --git a/src/github-sync.ts b/src/github-sync.ts index 7e6478a..43745fe 100644 --- a/src/github-sync.ts +++ b/src/github-sync.ts @@ -38,6 +38,7 @@ import { labelFieldsDiffer, getLatestLabelEventTimestamp, } from './github.js'; +import throttler from './github-throttler.js'; import { increment, snapshot, diff } from './github-metrics.js'; import { mergeWorkItems } from './sync.js'; @@ -298,7 +299,7 @@ export async function upsertIssuesFromWorkItems( } if (item.githubIssueNumber) { increment('api.issue.update'); - issue = await updateGithubIssueAsync(config, item.githubIssueNumber, payload); + issue = await throttler.schedule(() => updateGithubIssueAsync(config, item.githubIssueNumber!, payload)); if (item.status === 'deleted') { result.closed += 1; result.syncedItems.push({ @@ -318,11 +319,11 @@ export async function upsertIssuesFromWorkItems( } } else { increment('api.issue.create'); - issue = await createGithubIssueAsync(config, { + issue = await throttler.schedule(() => createGithubIssueAsync(config, { title: payload.title, body: payload.body, labels: payload.labels, - }); + })); result.created += 1; result.syncedItems.push({ action: 'created', @@ -345,7 +346,7 @@ export async function upsertIssuesFromWorkItems( if (shouldSyncCommentsNow && issueNumber) { const commentListStart = Date.now(); increment('api.comment.list'); - const existingComments = await listGithubIssueCommentsAsync(config, issueNumber); + const existingComments = await throttler.schedule(() => listGithubIssueCommentsAsync(config, issueNumber!)); timing.commentListMs 
+= Date.now() - commentListStart; const commentUpsertStart = Date.now(); const commentSummary = await upsertGithubIssueCommentsAsync(config, issueNumber, itemComments, existingComments); @@ -403,7 +404,7 @@ export async function upsertIssuesFromWorkItems( const bodyMatch = (existing.body || '').trim() === body.trim(); if (!bodyMatch) { increment('api.comment.update'); - const updatedComment = await updateGithubIssueCommentAsync(issueConfig, existing.id, body); + const updatedComment = await throttler.schedule(() => updateGithubIssueCommentAsync(issueConfig, existing.id!, body)); // Persist mapping back to local comment comment.githubCommentId = existing.id; comment.githubCommentUpdatedAt = updatedComment.updatedAt; @@ -418,7 +419,7 @@ export async function upsertIssuesFromWorkItems( // No GH comment mapping found — create a new comment increment('api.comment.create'); - const createdComment = await createGithubIssueCommentAsync(issueConfig, issueNumber, body); + const createdComment = await throttler.schedule(() => createGithubIssueCommentAsync(issueConfig, issueNumber, body)); // Persist mapping back to local comment so future runs can directly reference by ID comment.githubCommentId = createdComment.id; comment.githubCommentUpdatedAt = createdComment.updatedAt; diff --git a/src/github-throttler.ts b/src/github-throttler.ts new file mode 100644 index 0000000..0a8e5bf --- /dev/null +++ b/src/github-throttler.ts @@ -0,0 +1,145 @@ +/** + * Small token-bucket throttler with optional concurrency cap. + * - Rate: tokens per second + * - Burst: bucket capacity (initial tokens = burst) + * - Concurrency: max number of concurrent running tasks + * + * The implementation keeps a FIFO queue of pending tasks and attempts to + * dispatch them when both a token is available and concurrency allows. + * The clock is injectable to allow deterministic unit tests. 
+ */ + +export type Clock = { now(): number }; + +export type ThrottlerOptions = { + rate: number; // tokens per second + burst: number; // bucket capacity + concurrency: number; // max concurrent tasks (0 or Infinity = unlimited) + clock?: Clock; +}; + +type Task<T> = { + fn: () => Promise<T> | T; + resolve: (v: T) => void; + reject: (e: unknown) => void; +}; + +export class TokenBucketThrottler { + private rate: number; + private burst: number; + private concurrency: number; + private clock: Clock; + + private tokens: number; + private lastRefill: number; // ms + private active = 0; + private queue: Array<Task<unknown>> = []; + + constructor(opts: ThrottlerOptions) { + this.rate = opts.rate; + this.burst = Math.max(1, Math.floor(opts.burst)); + this.concurrency = opts.concurrency <= 0 ? Infinity : Math.floor(opts.concurrency); + this.clock = opts.clock || { now: () => Date.now() }; + + // start full + this.tokens = this.burst; + this.lastRefill = this.clock.now(); + } + + schedule<T>(fn: () => Promise<T> | T): Promise<T> { + return new Promise<T>((resolve, reject) => { + const task: Task<T> = { fn, resolve, reject } as Task<T>; + this.queue.push(task as Task<unknown>); + // try dispatch immediately + this.processQueue(); + }); + } + + private refillTokens(): void { + const now = this.clock.now(); + if (now <= this.lastRefill) return; + const elapsedMs = now - this.lastRefill; + const toAdd = (elapsedMs / 1000) * this.rate; + if (toAdd <= 0) return; + this.tokens = Math.min(this.burst, this.tokens + toAdd); + this.lastRefill = now; + } + + private scheduleProcess(delayMs: number): void { + // schedule a future attempt to process the queue + setTimeout(() => this.processQueue(), Math.max(0, Math.floor(delayMs))); + } + + private processQueue(): void { + // refill using clock + this.refillTokens(); + + // If no queued tasks, nothing to do + if (this.queue.length === 0) return; + + // If we have no tokens, compute next token arrival and schedule + if (this.tokens < 1) { + const missing = 1 - this.tokens; + const
msUntil = (missing / this.rate) * 1000; + this.scheduleProcess(msUntil); + return; + } + + // If concurrency limit reached, wait for running tasks to complete + if (this.active >= this.concurrency) return; + + // Pop next task and run it consuming one token + const task = this.queue.shift() as Task<unknown> | undefined; + if (!task) return; + + // consume one token + this.tokens -= 1; + // Ensure tokens not negative + if (this.tokens < 0) this.tokens = 0; + + this.active += 1; + + // Execute task + Promise.resolve() + .then(() => task.fn()) + .then((res) => { + this.active -= 1; + (task.resolve as (v: unknown) => void)(res); + // process more tasks (immediately) - may schedule next refill internally + this.processQueue(); + }) + .catch((err) => { + this.active -= 1; + task.reject(err); + this.processQueue(); + }); + + // After starting one, attempt to start more if possible + // Use setImmediate style to avoid deep recursion + if (typeof setImmediate !== 'undefined') setImmediate(() => this.processQueue()); + else this.scheduleProcess(0); + } +} + +/** + * Make a throttler instance from environment variables (or provided overrides) + */ +export function makeThrottlerFromEnv(overrides?: Partial<ThrottlerOptions>): TokenBucketThrottler { + const rate = Number(process.env.WL_GITHUB_RATE || '6'); + const burst = Number(process.env.WL_GITHUB_BURST || '12'); + const concurrency = Number(process.env.WL_GITHUB_CONCURRENCY || String(6)); + + const opts: ThrottlerOptions = { + rate: overrides?.rate ?? rate, + burst: overrides?.burst ?? burst, + concurrency: overrides?.concurrency ??
concurrency, + clock: overrides?.clock, + } as ThrottlerOptions; + + return new TokenBucketThrottler(opts); +} + +// Default shared instance +export const throttler = makeThrottlerFromEnv(); + +export default throttler; From 734ae9646b363a86e36f9e2591a39470f07ee2db Mon Sep 17 00:00:00 2001 From: Sorra Date: Wed, 11 Mar 2026 05:09:12 -0700 Subject: [PATCH 2/5] WL-0MLGBAPEO1QGMTGM: Start migrating GH helpers to central throttler (wrap async GH calls with throttler.schedule) --- src/github.ts | 55 ++++++++++++++++++----------------- test/throttler.test.ts | 66 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 27 deletions(-) create mode 100644 test/throttler.test.ts diff --git a/src/github.ts b/src/github.ts index b1188ba..6f1411c 100644 --- a/src/github.ts +++ b/src/github.ts @@ -1,4 +1,5 @@ import { execSync, spawnSync, spawn } from 'child_process'; +import throttler from './github-throttler.js'; import * as fs from 'fs'; import * as os from 'os'; import * as path from 'path'; @@ -668,9 +669,9 @@ export function getIssueHierarchy(config: GithubConfig, issueNumber: number): Is export async function getIssueNodeIdAsync(config: GithubConfig, issueNumber: number): Promise { const { owner, name } = parseRepoSlug(config.repo); const query = `query($owner: String!, $name: String!, $number: Int!) 
{ repository(owner: $owner, name: $name) { issue(number: $number) { id } } }`; - const output = await runGhJsonDetailedAsync( + const output = await throttler.schedule(() => runGhJsonDetailedAsync( `gh api graphql -f query=${quoteShellValue(query)} -f owner=${quoteShellValue(owner)} -f name=${quoteShellValue(name)} -F number=${issueNumber}` - ); + )); if (!output.ok) { throw new Error(output.error || 'Unable to query GitHub issue node ID'); } @@ -684,9 +685,9 @@ export async function getIssueNodeIdAsync(config: GithubConfig, issueNumber: num export async function getIssueHierarchyAsync(config: GithubConfig, issueNumber: number): Promise { const { owner, name } = parseRepoSlug(config.repo); const query = `query($owner: String!, $name: String!, $number: Int!) { repository(owner: $owner, name: $name) { issue(number: $number) { parent { number } subIssues(first: 100) { nodes { number } } } } }`; - const output = await runGhJsonDetailedAsync( + const output = await throttler.schedule(() => runGhJsonDetailedAsync( `gh api graphql -f query=${quoteShellValue(query)} -f owner=${quoteShellValue(owner)} -f name=${quoteShellValue(name)} -F number=${issueNumber}` - ); + )); if (!output.ok) { throw new Error(output.error || 'Unable to query issue hierarchy'); } @@ -762,9 +763,9 @@ export async function addSubIssueLinkAsync( const parentNodeId = await resolveNodeId(parentIssueNumber); const childNodeId = await resolveNodeId(childIssueNumber); const mutation = `mutation($parent: ID!, $child: ID!) 
{ addSubIssue(input: { issueId: $parent, subIssueId: $child }) { issue { id } subIssue { id } } }`; - const result = await runGhJsonDetailedAsync( + const result = await throttler.schedule(() => runGhJsonDetailedAsync( `gh api graphql -f query=${quoteShellValue(mutation)} -f parent=${quoteShellValue(parentNodeId)} -f child=${quoteShellValue(childNodeId)}` - ); + )); if (!result.ok) { throw new Error(result.error || `Failed to link #${childIssueNumber} as sub-issue of #${parentIssueNumber}`); } @@ -934,7 +935,7 @@ export async function listGithubIssueCommentsAsync(config: GithubConfig, issueNu const { owner, name } = parseRepoSlug(config.repo); const command = `gh api repos/${owner}/${name}/issues/${issueNumber}/comments --paginate`; try { - const data = await runGhJsonAsync(command); + const data = await throttler.schedule(() => runGhJsonAsync(command)); if (!data) return []; const raw = Array.isArray(data) ? data : []; return raw.map(comment => normalizeGithubIssueComment(comment)); @@ -953,7 +954,7 @@ export function createGithubIssueComment(config: GithubConfig, issueNumber: numb export async function createGithubIssueCommentAsync(config: GithubConfig, issueNumber: number, body: string): Promise { const { owner, name } = parseRepoSlug(config.repo); const command = `gh api -X POST repos/${owner}/${name}/issues/${issueNumber}/comments -F body=@-`; - const data = await runGhJsonAsync(command, body); + const data = await throttler.schedule(() => runGhJsonAsync(command, body)); return normalizeGithubIssueComment(data); } @@ -967,7 +968,7 @@ export function updateGithubIssueComment(config: GithubConfig, commentId: number export async function updateGithubIssueCommentAsync(config: GithubConfig, commentId: number, body: string): Promise { const { owner, name } = parseRepoSlug(config.repo); const command = `gh api -X PATCH repos/${owner}/${name}/issues/comments/${commentId} -F body=@-`; - const data = await runGhJsonAsync(command, body); + const data = await 
throttler.schedule(() => runGhJsonAsync(command, body)); return normalizeGithubIssueComment(data); } @@ -981,7 +982,7 @@ export function getGithubIssueComment(config: GithubConfig, commentId: number): export async function getGithubIssueCommentAsync(config: GithubConfig, commentId: number): Promise { const { owner, name } = parseRepoSlug(config.repo); const command = `gh api repos/${owner}/${name}/issues/comments/${commentId} --json id,body,updatedAt,user`; - const data = await runGhJsonAsync(command); + const data = await throttler.schedule(() => runGhJsonAsync(command)); return normalizeGithubIssueComment(data); } @@ -1183,7 +1184,7 @@ export async function ensureGithubLabelsAsync(config: GithubConfig, labels: stri let existing = existingLabelsCache.get(config.repo); if (existing === undefined && !existingLabelsCache.has(config.repo)) { try { - const existingRaw = await runGhJsonAsync(`gh api repos/${owner}/${name}/labels --paginate`); + const existingRaw = await throttler.schedule(() => runGhJsonAsync(`gh api repos/${owner}/${name}/labels --paginate`)); const parsedSet = new Set(); if (existingRaw) { for (const entry of existingRaw) { @@ -1205,12 +1206,12 @@ export async function ensureGithubLabelsAsync(config: GithubConfig, labels: stri const color = labelColor(label); const createCommand = `gh api -X POST repos/${owner}/${name}/labels -f name=${JSON.stringify(label)} -f color=${JSON.stringify(color)}`; try { - await runGhAsync(createCommand); + await throttler.schedule(() => runGhAsync(createCommand)); existing.add(label); continue; } catch { const fallbackCommand = `gh issue label create ${JSON.stringify(label)} --repo ${config.repo} --color ${color}`; - try { await runGhAsync(fallbackCommand); existing.add(label); } catch (_) { /* ignore */ } + try { await throttler.schedule(() => runGhAsync(fallbackCommand)); existing.add(label); } catch (_) { /* ignore */ } } } } catch { @@ -1256,21 +1257,21 @@ async function ensureGithubLabelsOnceAsync(config: 
GithubConfig, labels: string[ export async function createGithubIssueAsync(config: GithubConfig, payload: { title: string; body: string; labels: string[] }): Promise { const command = `gh issue create --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; - const output = await runGhAsync(command, payload.body); + const output = await throttler.schedule(() => runGhAsync(command, payload.body)); let issueNumber: number | null = null; const match = output.match(/\/(\d+)$/); if (match) issueNumber = parseInt(match[1], 10); if (issueNumber !== null && payload.labels.length > 0) { // Ensure labels once per process to reduce API calls await ensureGithubLabelsOnceAsync(config, payload.labels); - try { await runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(payload.labels.join(','))}`); } catch (_) {} + try { await throttler.schedule(() => runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(payload.labels.join(','))}`)); } catch (_) {} } if (issueNumber === null) { - const view = await runGhJsonAsync(`gh issue list --repo ${config.repo} --limit 1 --json number,id,title,body,state,labels,updatedAt`); + const view = await throttler.schedule(() => runGhJsonAsync(`gh issue list --repo ${config.repo} --limit 1 --json number,id,title,body,state,labels,updatedAt`)); if (Array.isArray(view) && view.length > 0) return normalizeGithubIssue(view[0]); throw new Error('Failed to create GitHub issue'); } - const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); + const parsed = await throttler.schedule(() => runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`)); return normalizeGithubIssue(parsed); } @@ -1293,14 +1294,14 @@ export async function updateGithubIssueAsync( // Only edit title/body if something changed if (titleChanged || 
bodyChanged) { const command = `gh issue edit ${issueNumber} --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; - ops.push(runGhAsync(command, payload.body).then(() => {}).catch(() => {})); + ops.push(throttler.schedule(() => runGhAsync(command, payload.body)).then(() => {}).catch(() => {})); } // State change: only close/reopen when different - if (payload.state === 'closed' && current.state !== 'closed') { - ops.push(runGhAsync(`gh issue close ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {})); + if (payload.state === 'closed' && current.state !== 'closed') { + ops.push(throttler.schedule(() => runGhAsync(`gh issue close ${issueNumber} --repo ${config.repo}`)).then(() => {}).catch(() => {})); } else if (payload.state === 'open' && current.state === 'closed') { - ops.push(runGhAsync(`gh issue reopen ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {})); + ops.push(throttler.schedule(() => runGhAsync(`gh issue reopen ${issueNumber} --repo ${config.repo}`)).then(() => {}).catch(() => {})); } // Labels: compute status labels to remove and labels to add @@ -1311,14 +1312,14 @@ export async function updateGithubIssueAsync( // prevents label accumulation when e.g. stage changes from idea -> done. 
const staleLabelsToRemove = current.labels.filter(label => isSingleValueCategoryLabel(label, config.labelPrefix) && !desiredSet.has(label)); if (staleLabelsToRemove.length > 0) { - ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --remove-label ${JSON.stringify(staleLabelsToRemove.join(','))}`).then(() => {}).catch(() => {})); + ops.push(throttler.schedule(() => runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --remove-label ${JSON.stringify(staleLabelsToRemove.join(','))}`)).then(() => {}).catch(() => {})); } // Compute labels that are not already present const labelsToAdd = payload.labels.filter(l => !current.labels.includes(l)); if (labelsToAdd.length > 0) { await ensureGithubLabelsOnceAsync(config, labelsToAdd); - ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(labelsToAdd.join(','))}`).then(() => {}).catch(() => {})); + ops.push(throttler.schedule(() => runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(labelsToAdd.join(','))}`)).then(() => {}).catch(() => {})); } } @@ -1328,12 +1329,12 @@ export async function updateGithubIssueAsync( // If no ops ran, return current object, else fetch fresh state if (ops.length === 0) return current; - const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); + const parsed = await throttler.schedule(() => runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`)); return normalizeGithubIssue(parsed); } export async function getGithubIssueAsync(config: GithubConfig, issueNumber: number): Promise { - const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); + const parsed = await throttler.schedule(() => runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} 
--json number,id,title,body,state,labels,updatedAt`)); return normalizeGithubIssue(parsed); } @@ -1341,7 +1342,7 @@ export async function listGithubIssuesAsync(config: GithubConfig, since?: string const sinceParam = since ? `&since=${encodeURIComponent(since)}` : ''; const apiPath = `repos/${config.repo}/issues?state=all&per_page=100${sinceParam}`; const apiCommand = `gh api ${quoteShellValue(apiPath)} --paginate`; - const output = await runGhAsync(apiCommand); + const output = await throttler.schedule(() => runGhAsync(apiCommand)); const parsed = JSON.parse(output) as any[]; const issuesOnly = parsed.filter(entry => { if (entry.pull_request) return false; @@ -1537,7 +1538,7 @@ export async function fetchLabelEventsAsync( try { const command = `gh api repos/${owner}/${name}/issues/${issueNumber}/events --paginate`; - const result = await runGhJsonDetailedAsync(command); + const result = await throttler.schedule(() => runGhJsonDetailedAsync(command)); if (!result.ok || !Array.isArray(result.data)) { // API failure — cache empty array to avoid retrying in same run diff --git a/test/throttler.test.ts b/test/throttler.test.ts new file mode 100644 index 0000000..5b8ca11 --- /dev/null +++ b/test/throttler.test.ts @@ -0,0 +1,66 @@ +import { describe, it, expect } from 'vitest'; +import { TokenBucketThrottler, makeThrottlerFromEnv } from '../src/github-throttler.js'; + +// Fake clock that we can advance manually +class FakeClock { + private t = 0; + now() { return this.t; } + advance(ms: number) { this.t += ms; } +} + +describe('TokenBucketThrottler - basic token semantics', () => { + it('starts with burst tokens and consumes one per scheduled task', async () => { + const clock = new FakeClock(); + const t = new TokenBucketThrottler({ rate: 1, burst: 2, concurrency: 10, clock }); + let ran = 0; + await Promise.all([ + t.schedule(async () => { ran += 1; return 1; }), + t.schedule(async () => { ran += 1; return 2; }), + ]); + expect(ran).toBe(2); + }); + + it('refills tokens 
over time according to rate', async () => { + const clock = new FakeClock(); + const t = new TokenBucketThrottler({ rate: 1, burst: 2, concurrency: 10, clock }); + // consume burst + await t.schedule(() => 1); + await t.schedule(() => 2); + // schedule a third task which will wait for a token + const p = t.schedule(() => 3); + // advance clock less than required -> still pending + clock.advance(500); + // allow event loop to process any timers + await new Promise(r => setTimeout(r, 0)); + // not resolved yet + let resolved = false; + p.then(() => { resolved = true; }); + await new Promise(r => setTimeout(r, 0)); + expect(resolved).toBe(false); + // advance one second to refill 1 token + clock.advance(500); + await new Promise(r => setTimeout(r, 0)); + await p; + expect(resolved).toBe(true); + }); +}); + +describe('TokenBucketThrottler - concurrency cap', () => { + it('enforces concurrency cap', async () => { + const clock = new FakeClock(); + const t = new TokenBucketThrottler({ rate: 10, burst: 10, concurrency: 1, clock }); + let running = 0; + const tasks = Array.from({ length: 3 }, () => t.schedule(async () => { + running += 1; + // hang until we advance clock (simulate async work) + await new Promise(r => setTimeout(r, 0)); + running -= 1; + return true; + })); + // allow tasks to start + await new Promise(r => setTimeout(r, 0)); + // only one should be running due to concurrency cap + expect(running).toBeLessThanOrEqual(1); + await Promise.all(tasks); + }); +}); From 3c65362473c0d2e2af9720709d4e86e2c5b083cb Mon Sep 17 00:00:00 2001 From: Sorra Date: Wed, 11 Mar 2026 07:02:29 -0700 Subject: [PATCH 3/5] WL-0MLGBAPEO1QGMTGM: Move scheduling to helper-level for GH helpers (reduce per-call throttling) --- src/github-sync.ts | 12 +-- src/github-throttler.ts | 7 +- src/github.ts | 170 +++++++++++++++++++++------------------- 3 files changed, 100 insertions(+), 89 deletions(-) diff --git a/src/github-sync.ts b/src/github-sync.ts index 43745fe..1d50f93 100644 --- 
a/src/github-sync.ts +++ b/src/github-sync.ts @@ -299,7 +299,7 @@ export async function upsertIssuesFromWorkItems( } if (item.githubIssueNumber) { increment('api.issue.update'); - issue = await throttler.schedule(() => updateGithubIssueAsync(config, item.githubIssueNumber!, payload)); + issue = await updateGithubIssueAsync(config, item.githubIssueNumber!, payload); if (item.status === 'deleted') { result.closed += 1; result.syncedItems.push({ @@ -319,11 +319,11 @@ export async function upsertIssuesFromWorkItems( } } else { increment('api.issue.create'); - issue = await throttler.schedule(() => createGithubIssueAsync(config, { + issue = await createGithubIssueAsync(config, { title: payload.title, body: payload.body, labels: payload.labels, - })); + }); result.created += 1; result.syncedItems.push({ action: 'created', @@ -346,7 +346,7 @@ export async function upsertIssuesFromWorkItems( if (shouldSyncCommentsNow && issueNumber) { const commentListStart = Date.now(); increment('api.comment.list'); - const existingComments = await throttler.schedule(() => listGithubIssueCommentsAsync(config, issueNumber!)); + const existingComments = await listGithubIssueCommentsAsync(config, issueNumber!); timing.commentListMs += Date.now() - commentListStart; const commentUpsertStart = Date.now(); const commentSummary = await upsertGithubIssueCommentsAsync(config, issueNumber, itemComments, existingComments); @@ -404,7 +404,7 @@ export async function upsertIssuesFromWorkItems( const bodyMatch = (existing.body || '').trim() === body.trim(); if (!bodyMatch) { increment('api.comment.update'); - const updatedComment = await throttler.schedule(() => updateGithubIssueCommentAsync(issueConfig, existing.id!, body)); + const updatedComment = await updateGithubIssueCommentAsync(issueConfig, existing.id!, body); // Persist mapping back to local comment comment.githubCommentId = existing.id; comment.githubCommentUpdatedAt = updatedComment.updatedAt; @@ -419,7 +419,7 @@ export async function 
upsertIssuesFromWorkItems( // No GH comment mapping found — create a new comment increment('api.comment.create'); - const createdComment = await throttler.schedule(() => createGithubIssueCommentAsync(issueConfig, issueNumber, body)); + const createdComment = await createGithubIssueCommentAsync(issueConfig, issueNumber, body); // Persist mapping back to local comment so future runs can directly reference by ID comment.githubCommentId = createdComment.id; comment.githubCommentUpdatedAt = createdComment.updatedAt; diff --git a/src/github-throttler.ts b/src/github-throttler.ts index 0a8e5bf..6c53d3c 100644 --- a/src/github-throttler.ts +++ b/src/github-throttler.ts @@ -127,7 +127,12 @@ export class TokenBucketThrottler { export function makeThrottlerFromEnv(overrides?: Partial): TokenBucketThrottler { const rate = Number(process.env.WL_GITHUB_RATE || '6'); const burst = Number(process.env.WL_GITHUB_BURST || '12'); - const concurrency = Number(process.env.WL_GITHUB_CONCURRENCY || String(6)); + // Only enforce a concurrency cap if the env var is explicitly set. Leaving + // it unset preserves existing per-callsite concurrency controls. When not + // set we use Infinity (unlimited) so the throttler only enforces rate. + const concurrency = process.env.WL_GITHUB_CONCURRENCY !== undefined + ? Number(process.env.WL_GITHUB_CONCURRENCY) + : Infinity; const opts: ThrottlerOptions = { rate: overrides?.rate ?? rate, diff --git a/src/github.ts b/src/github.ts index 6f1411c..f4f8c07 100644 --- a/src/github.ts +++ b/src/github.ts @@ -669,9 +669,9 @@ export function getIssueHierarchy(config: GithubConfig, issueNumber: number): Is export async function getIssueNodeIdAsync(config: GithubConfig, issueNumber: number): Promise { const { owner, name } = parseRepoSlug(config.repo); const query = `query($owner: String!, $name: String!, $number: Int!) 
{ repository(owner: $owner, name: $name) { issue(number: $number) { id } } }`; - const output = await throttler.schedule(() => runGhJsonDetailedAsync( + const output = await runGhJsonDetailedAsync( `gh api graphql -f query=${quoteShellValue(query)} -f owner=${quoteShellValue(owner)} -f name=${quoteShellValue(name)} -F number=${issueNumber}` - )); + ); if (!output.ok) { throw new Error(output.error || 'Unable to query GitHub issue node ID'); } @@ -685,9 +685,9 @@ export async function getIssueNodeIdAsync(config: GithubConfig, issueNumber: num export async function getIssueHierarchyAsync(config: GithubConfig, issueNumber: number): Promise { const { owner, name } = parseRepoSlug(config.repo); const query = `query($owner: String!, $name: String!, $number: Int!) { repository(owner: $owner, name: $name) { issue(number: $number) { parent { number } subIssues(first: 100) { nodes { number } } } } }`; - const output = await throttler.schedule(() => runGhJsonDetailedAsync( + const output = await runGhJsonDetailedAsync( `gh api graphql -f query=${quoteShellValue(query)} -f owner=${quoteShellValue(owner)} -f name=${quoteShellValue(name)} -F number=${issueNumber}` - )); + ); if (!output.ok) { throw new Error(output.error || 'Unable to query issue hierarchy'); } @@ -763,9 +763,9 @@ export async function addSubIssueLinkAsync( const parentNodeId = await resolveNodeId(parentIssueNumber); const childNodeId = await resolveNodeId(childIssueNumber); const mutation = `mutation($parent: ID!, $child: ID!) 
{ addSubIssue(input: { issueId: $parent, subIssueId: $child }) { issue { id } subIssue { id } } }`; - const result = await throttler.schedule(() => runGhJsonDetailedAsync( + const result = await runGhJsonDetailedAsync( `gh api graphql -f query=${quoteShellValue(mutation)} -f parent=${quoteShellValue(parentNodeId)} -f child=${quoteShellValue(childNodeId)}` - )); + ); if (!result.ok) { throw new Error(result.error || `Failed to link #${childIssueNumber} as sub-issue of #${parentIssueNumber}`); } @@ -935,7 +935,7 @@ export async function listGithubIssueCommentsAsync(config: GithubConfig, issueNu const { owner, name } = parseRepoSlug(config.repo); const command = `gh api repos/${owner}/${name}/issues/${issueNumber}/comments --paginate`; try { - const data = await throttler.schedule(() => runGhJsonAsync(command)); + const data = await runGhJsonAsync(command); if (!data) return []; const raw = Array.isArray(data) ? data : []; return raw.map(comment => normalizeGithubIssueComment(comment)); @@ -954,7 +954,7 @@ export function createGithubIssueComment(config: GithubConfig, issueNumber: numb export async function createGithubIssueCommentAsync(config: GithubConfig, issueNumber: number, body: string): Promise { const { owner, name } = parseRepoSlug(config.repo); const command = `gh api -X POST repos/${owner}/${name}/issues/${issueNumber}/comments -F body=@-`; - const data = await throttler.schedule(() => runGhJsonAsync(command, body)); + const data = await runGhJsonAsync(command, body); return normalizeGithubIssueComment(data); } @@ -968,7 +968,7 @@ export function updateGithubIssueComment(config: GithubConfig, commentId: number export async function updateGithubIssueCommentAsync(config: GithubConfig, commentId: number, body: string): Promise { const { owner, name } = parseRepoSlug(config.repo); const command = `gh api -X PATCH repos/${owner}/${name}/issues/comments/${commentId} -F body=@-`; - const data = await throttler.schedule(() => runGhJsonAsync(command, body)); + const 
data = await runGhJsonAsync(command, body); return normalizeGithubIssueComment(data); } @@ -982,7 +982,7 @@ export function getGithubIssueComment(config: GithubConfig, commentId: number): export async function getGithubIssueCommentAsync(config: GithubConfig, commentId: number): Promise { const { owner, name } = parseRepoSlug(config.repo); const command = `gh api repos/${owner}/${name}/issues/comments/${commentId} --json id,body,updatedAt,user`; - const data = await throttler.schedule(() => runGhJsonAsync(command)); + const data = await runGhJsonAsync(command); return normalizeGithubIssueComment(data); } @@ -1184,7 +1184,7 @@ export async function ensureGithubLabelsAsync(config: GithubConfig, labels: stri let existing = existingLabelsCache.get(config.repo); if (existing === undefined && !existingLabelsCache.has(config.repo)) { try { - const existingRaw = await throttler.schedule(() => runGhJsonAsync(`gh api repos/${owner}/${name}/labels --paginate`)); + const existingRaw = await runGhJsonAsync(`gh api repos/${owner}/${name}/labels --paginate`); const parsedSet = new Set(); if (existingRaw) { for (const entry of existingRaw) { @@ -1205,14 +1205,14 @@ export async function ensureGithubLabelsAsync(config: GithubConfig, labels: stri if (existing.has(label)) continue; const color = labelColor(label); const createCommand = `gh api -X POST repos/${owner}/${name}/labels -f name=${JSON.stringify(label)} -f color=${JSON.stringify(color)}`; - try { - await throttler.schedule(() => runGhAsync(createCommand)); - existing.add(label); - continue; - } catch { - const fallbackCommand = `gh issue label create ${JSON.stringify(label)} --repo ${config.repo} --color ${color}`; - try { await throttler.schedule(() => runGhAsync(fallbackCommand)); existing.add(label); } catch (_) { /* ignore */ } - } + try { + await runGhAsync(createCommand); + existing.add(label); + continue; + } catch { + const fallbackCommand = `gh issue label create ${JSON.stringify(label)} --repo ${config.repo} 
--color ${color}`; + try { await runGhAsync(fallbackCommand); existing.add(label); } catch (_) { /* ignore */ } + } } } catch { // ignore label creation failures @@ -1256,23 +1256,25 @@ async function ensureGithubLabelsOnceAsync(config: GithubConfig, labels: string[ } export async function createGithubIssueAsync(config: GithubConfig, payload: { title: string; body: string; labels: string[] }): Promise { - const command = `gh issue create --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; - const output = await throttler.schedule(() => runGhAsync(command, payload.body)); - let issueNumber: number | null = null; - const match = output.match(/\/(\d+)$/); - if (match) issueNumber = parseInt(match[1], 10); - if (issueNumber !== null && payload.labels.length > 0) { - // Ensure labels once per process to reduce API calls - await ensureGithubLabelsOnceAsync(config, payload.labels); - try { await throttler.schedule(() => runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(payload.labels.join(','))}`)); } catch (_) {} - } - if (issueNumber === null) { - const view = await throttler.schedule(() => runGhJsonAsync(`gh issue list --repo ${config.repo} --limit 1 --json number,id,title,body,state,labels,updatedAt`)); - if (Array.isArray(view) && view.length > 0) return normalizeGithubIssue(view[0]); - throw new Error('Failed to create GitHub issue'); - } - const parsed = await throttler.schedule(() => runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`)); - return normalizeGithubIssue(parsed); + return await throttler.schedule(async () => { + const command = `gh issue create --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; + const output = await runGhAsync(command, payload.body); + let issueNumber: number | null = null; + const match = output.match(/\/(\d+)$/); + if (match) issueNumber = parseInt(match[1], 10); + if 
(issueNumber !== null && payload.labels.length > 0) { + // Ensure labels once per process to reduce API calls + await ensureGithubLabelsOnceAsync(config, payload.labels); + try { await runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(payload.labels.join(','))}`); } catch (_) {} + } + if (issueNumber === null) { + const view = await runGhJsonAsync(`gh issue list --repo ${config.repo} --limit 1 --json number,id,title,body,state,labels,updatedAt`); + if (Array.isArray(view) && view.length > 0) return normalizeGithubIssue(view[0]); + throw new Error('Failed to create GitHub issue'); + } + const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); + return normalizeGithubIssue(parsed); + }); } export async function updateGithubIssueAsync( @@ -1280,61 +1282,65 @@ export async function updateGithubIssueAsync( issueNumber: number, payload: { title: string; body: string; labels: string[]; state: 'open' | 'closed' } ): Promise { - // Fetch current issue once and compute minimal set of operations - let current: GithubIssueRecord; - try { - current = await getGithubIssueAsync(config, issueNumber); - } catch { - current = getGithubIssue(config, issueNumber); - } + // Run the entire update flow as a single scheduled task to avoid + // serializing internal parallel operations via per-call scheduling. 
+ return await throttler.schedule(async () => { + // Fetch current issue once and compute minimal set of operations + let current: GithubIssueRecord; + try { + current = await getGithubIssueAsync(config, issueNumber); + } catch { + current = getGithubIssue(config, issueNumber); + } - const ops: Array> = []; - const titleChanged = (current.title || '') !== (payload.title || ''); - const bodyChanged = (current.body || '') !== (payload.body || ''); - // Only edit title/body if something changed - if (titleChanged || bodyChanged) { - const command = `gh issue edit ${issueNumber} --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; - ops.push(throttler.schedule(() => runGhAsync(command, payload.body)).then(() => {}).catch(() => {})); - } + const ops: Array> = []; + const titleChanged = (current.title || '') !== (payload.title || ''); + const bodyChanged = (current.body || '') !== (payload.body || ''); + // Only edit title/body if something changed + if (titleChanged || bodyChanged) { + const command = `gh issue edit ${issueNumber} --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; + ops.push(runGhAsync(command, payload.body).then(() => {}).catch(() => {})); + } - // State change: only close/reopen when different + // State change: only close/reopen when different if (payload.state === 'closed' && current.state !== 'closed') { - ops.push(throttler.schedule(() => runGhAsync(`gh issue close ${issueNumber} --repo ${config.repo}`)).then(() => {}).catch(() => {})); - } else if (payload.state === 'open' && current.state === 'closed') { - ops.push(throttler.schedule(() => runGhAsync(`gh issue reopen ${issueNumber} --repo ${config.repo}`)).then(() => {}).catch(() => {})); - } - - // Labels: compute status labels to remove and labels to add - if (payload.labels.length > 0) { - const desiredSet = new Set(payload.labels); - // Remove any single-valued category labels (stage, priority, status, type, - // risk, effort) that are on 
the issue but not in the desired set. This - // prevents label accumulation when e.g. stage changes from idea -> done. - const staleLabelsToRemove = current.labels.filter(label => isSingleValueCategoryLabel(label, config.labelPrefix) && !desiredSet.has(label)); - if (staleLabelsToRemove.length > 0) { - ops.push(throttler.schedule(() => runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --remove-label ${JSON.stringify(staleLabelsToRemove.join(','))}`)).then(() => {}).catch(() => {})); + ops.push(runGhAsync(`gh issue close ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {})); + } else if (payload.state === 'open' && current.state === 'closed') { + ops.push(runGhAsync(`gh issue reopen ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {})); } - // Compute labels that are not already present - const labelsToAdd = payload.labels.filter(l => !current.labels.includes(l)); - if (labelsToAdd.length > 0) { - await ensureGithubLabelsOnceAsync(config, labelsToAdd); - ops.push(throttler.schedule(() => runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(labelsToAdd.join(','))}`)).then(() => {}).catch(() => {})); + // Labels: compute status labels to remove and labels to add + if (payload.labels.length > 0) { + const desiredSet = new Set(payload.labels); + // Remove any single-valued category labels (stage, priority, status, type, + // risk, effort) that are on the issue but not in the desired set. This + // prevents label accumulation when e.g. stage changes from idea -> done. 
+ const staleLabelsToRemove = current.labels.filter(label => isSingleValueCategoryLabel(label, config.labelPrefix) && !desiredSet.has(label)); + if (staleLabelsToRemove.length > 0) { + ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --remove-label ${JSON.stringify(staleLabelsToRemove.join(','))}`).then(() => {}).catch(() => {})); + } + + // Compute labels that are not already present + const labelsToAdd = payload.labels.filter(l => !current.labels.includes(l)); + if (labelsToAdd.length > 0) { + await ensureGithubLabelsOnceAsync(config, labelsToAdd); + ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(labelsToAdd.join(','))}`).then(() => {}).catch(() => {})); + } } - } - // Execute operations — remove stale labels first, then add new ones, - // to avoid transient states where both old and new labels coexist. - if (ops.length > 0) await Promise.all(ops); + // Execute operations — remove stale labels first, then add new ones, + // to avoid transient states where both old and new labels coexist. 
+ if (ops.length > 0) await Promise.all(ops); - // If no ops ran, return current object, else fetch fresh state - if (ops.length === 0) return current; - const parsed = await throttler.schedule(() => runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`)); - return normalizeGithubIssue(parsed); + // If no ops ran, return current object, else fetch fresh state + if (ops.length === 0) return current; + const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); + return normalizeGithubIssue(parsed); + }); } export async function getGithubIssueAsync(config: GithubConfig, issueNumber: number): Promise { - const parsed = await throttler.schedule(() => runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`)); + const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); return normalizeGithubIssue(parsed); } @@ -1342,7 +1348,7 @@ export async function listGithubIssuesAsync(config: GithubConfig, since?: string const sinceParam = since ? 
`&since=${encodeURIComponent(since)}` : ''; const apiPath = `repos/${config.repo}/issues?state=all&per_page=100${sinceParam}`; const apiCommand = `gh api ${quoteShellValue(apiPath)} --paginate`; - const output = await throttler.schedule(() => runGhAsync(apiCommand)); + const output = await runGhAsync(apiCommand); const parsed = JSON.parse(output) as any[]; const issuesOnly = parsed.filter(entry => { if (entry.pull_request) return false; @@ -1538,7 +1544,7 @@ export async function fetchLabelEventsAsync( try { const command = `gh api repos/${owner}/${name}/issues/${issueNumber}/events --paginate`; - const result = await throttler.schedule(() => runGhJsonDetailedAsync(command)); + const result = await runGhJsonDetailedAsync(command); if (!result.ok || !Array.isArray(result.data)) { // API failure — cache empty array to avoid retrying in same run From 96ba25437b9b324dcf15a274bc95aa7bf31995d3 Mon Sep 17 00:00:00 2001 From: Sorra Date: Wed, 11 Mar 2026 09:21:41 -0700 Subject: [PATCH 4/5] WL-0MLGBAPEO1QGMTGM: Add debug tracing to TokenBucketThrottler (env WL_GITHUB_THROTTLER_DEBUG) --- src/github-throttler.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/github-throttler.ts b/src/github-throttler.ts index 6c53d3c..76826bb 100644 --- a/src/github-throttler.ts +++ b/src/github-throttler.ts @@ -34,6 +34,7 @@ export class TokenBucketThrottler { private lastRefill: number; // ms private active = 0; private queue: Array> = []; + private debug = false; constructor(opts: ThrottlerOptions) { this.rate = opts.rate; @@ -44,6 +45,7 @@ export class TokenBucketThrottler { // start full this.tokens = this.burst; this.lastRefill = this.clock.now(); + this.debug = Boolean(process.env.WL_GITHUB_THROTTLER_DEBUG); } schedule(fn: () => Promise | T): Promise { @@ -75,12 +77,16 @@ export class TokenBucketThrottler { this.refillTokens(); // If no queued tasks, nothing to do - if (this.queue.length === 0) return; + if (this.queue.length === 0) { + if 
(this.debug) console.debug(`[throttler] idle tokens=${this.tokens.toFixed(2)} active=${this.active} queue=0`); + return; + } // If we have no tokens, compute next token arrival and schedule if (this.tokens < 1) { const missing = 1 - this.tokens; const msUntil = (missing / this.rate) * 1000; + if (this.debug) console.debug(`[throttler] no tokens (tokens=${this.tokens.toFixed(2)}), scheduling next check in ${Math.ceil(msUntil)}ms queue=${this.queue.length} active=${this.active}`); this.scheduleProcess(msUntil); return; } @@ -98,6 +104,7 @@ export class TokenBucketThrottler { if (this.tokens < 0) this.tokens = 0; this.active += 1; + if (this.debug) console.debug(`[throttler] dispatch task (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`); // Execute task Promise.resolve() @@ -105,12 +112,14 @@ export class TokenBucketThrottler { .then((res) => { this.active -= 1; (task.resolve as (v: unknown) => void)(res); + if (this.debug) console.debug(`[throttler] task complete (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`); // process more tasks (immediately) - may schedule next refill internally this.processQueue(); }) .catch((err) => { this.active -= 1; task.reject(err); + if (this.debug) console.debug(`[throttler] task error (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length}) ${String(err?.message ?? 
err)}`); this.processQueue(); }); From e00b327d9daad3498d9bb2685263e2b42a38eb86 Mon Sep 17 00:00:00 2001 From: Sorra Date: Wed, 11 Mar 2026 09:38:14 -0700 Subject: [PATCH 5/5] WL-0MLGBAPEO1QGMTGM: Make throttler debug also enabled with --verbose and add timestamps --- src/github-throttler.ts | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/github-throttler.ts b/src/github-throttler.ts index 76826bb..f6d542f 100644 --- a/src/github-throttler.ts +++ b/src/github-throttler.ts @@ -45,7 +45,9 @@ export class TokenBucketThrottler { // start full this.tokens = this.burst; this.lastRefill = this.clock.now(); - this.debug = Boolean(process.env.WL_GITHUB_THROTTLER_DEBUG); + // Enable debug when explicit env var set or when the process is started + // with a `--verbose` flag (useful when running test runner with `--verbose`). + this.debug = Boolean(process.env.WL_GITHUB_THROTTLER_DEBUG) || (Array.isArray(process.argv) && process.argv.includes('--verbose')); } schedule(fn: () => Promise | T): Promise { @@ -78,7 +80,7 @@ export class TokenBucketThrottler { // If no queued tasks, nothing to do if (this.queue.length === 0) { - if (this.debug) console.debug(`[throttler] idle tokens=${this.tokens.toFixed(2)} active=${this.active} queue=0`); + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] idle tokens=${this.tokens.toFixed(2)} active=${this.active} queue=0`); return; } @@ -86,7 +88,7 @@ export class TokenBucketThrottler { if (this.tokens < 1) { const missing = 1 - this.tokens; const msUntil = (missing / this.rate) * 1000; - if (this.debug) console.debug(`[throttler] no tokens (tokens=${this.tokens.toFixed(2)}), scheduling next check in ${Math.ceil(msUntil)}ms queue=${this.queue.length} active=${this.active}`); + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] no tokens (tokens=${this.tokens.toFixed(2)}), scheduling next check in ${Math.ceil(msUntil)}ms queue=${this.queue.length} 
active=${this.active}`); this.scheduleProcess(msUntil); return; } @@ -104,7 +106,7 @@ export class TokenBucketThrottler { if (this.tokens < 0) this.tokens = 0; this.active += 1; - if (this.debug) console.debug(`[throttler] dispatch task (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`); + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] dispatch task (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`); // Execute task Promise.resolve() @@ -112,14 +114,14 @@ export class TokenBucketThrottler { .then((res) => { this.active -= 1; (task.resolve as (v: unknown) => void)(res); - if (this.debug) console.debug(`[throttler] task complete (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`); + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] task complete (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`); // process more tasks (immediately) - may schedule next refill internally this.processQueue(); }) .catch((err) => { this.active -= 1; task.reject(err); - if (this.debug) console.debug(`[throttler] task error (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length}) ${String(err?.message ?? err)}`); + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] task error (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length}) ${String(err?.message ?? err)}`); this.processQueue(); });