diff --git a/src/github-sync.ts b/src/github-sync.ts index 7e6478a..1d50f93 100644 --- a/src/github-sync.ts +++ b/src/github-sync.ts @@ -38,6 +38,7 @@ import { labelFieldsDiffer, getLatestLabelEventTimestamp, } from './github.js'; +import throttler from './github-throttler.js'; import { increment, snapshot, diff } from './github-metrics.js'; import { mergeWorkItems } from './sync.js'; @@ -298,7 +299,7 @@ export async function upsertIssuesFromWorkItems( } if (item.githubIssueNumber) { increment('api.issue.update'); - issue = await updateGithubIssueAsync(config, item.githubIssueNumber, payload); + issue = await updateGithubIssueAsync(config, item.githubIssueNumber!, payload); if (item.status === 'deleted') { result.closed += 1; result.syncedItems.push({ @@ -345,7 +346,7 @@ export async function upsertIssuesFromWorkItems( if (shouldSyncCommentsNow && issueNumber) { const commentListStart = Date.now(); increment('api.comment.list'); - const existingComments = await listGithubIssueCommentsAsync(config, issueNumber); + const existingComments = await listGithubIssueCommentsAsync(config, issueNumber!); timing.commentListMs += Date.now() - commentListStart; const commentUpsertStart = Date.now(); const commentSummary = await upsertGithubIssueCommentsAsync(config, issueNumber, itemComments, existingComments); @@ -403,7 +404,7 @@ export async function upsertIssuesFromWorkItems( const bodyMatch = (existing.body || '').trim() === body.trim(); if (!bodyMatch) { increment('api.comment.update'); - const updatedComment = await updateGithubIssueCommentAsync(issueConfig, existing.id, body); + const updatedComment = await updateGithubIssueCommentAsync(issueConfig, existing.id!, body); // Persist mapping back to local comment comment.githubCommentId = existing.id; comment.githubCommentUpdatedAt = updatedComment.updatedAt; @@ -418,7 +419,7 @@ export async function upsertIssuesFromWorkItems( // No GH comment mapping found — create a new comment increment('api.comment.create'); - const 
createdComment = await createGithubIssueCommentAsync(issueConfig, issueNumber, body); + const createdComment = await createGithubIssueCommentAsync(issueConfig, issueNumber, body); // Persist mapping back to local comment so future runs can directly reference by ID comment.githubCommentId = createdComment.id; comment.githubCommentUpdatedAt = createdComment.updatedAt; diff --git a/src/github-throttler.ts b/src/github-throttler.ts new file mode 100644 index 0000000..f6d542f --- /dev/null +++ b/src/github-throttler.ts @@ -0,0 +1,161 @@ +/** + * Small token-bucket throttler with optional concurrency cap. + * - Rate: tokens per second + * - Burst: bucket capacity (initial tokens = burst) + * - Concurrency: max number of concurrent running tasks + * + * The implementation keeps a FIFO queue of pending tasks and attempts to + * dispatch them when both a token is available and concurrency allows. + * The clock is injectable to allow deterministic unit tests. + */ + +export type Clock = { now(): number }; + +export type ThrottlerOptions = { + rate: number; // tokens per second + burst: number; // bucket capacity + concurrency: number; // max concurrent tasks (0 or Infinity = unlimited) + clock?: Clock; +}; + +type Task<T> = { + fn: () => Promise<T> | T; + resolve: (v: T) => void; + reject: (e: unknown) => void; +}; + +export class TokenBucketThrottler { + private rate: number; + private burst: number; + private concurrency: number; + private clock: Clock; + + private tokens: number; + private lastRefill: number; // ms + private active = 0; + private queue: Array<Task<unknown>> = []; + private debug = false; + + constructor(opts: ThrottlerOptions) { + this.rate = opts.rate; + this.burst = Math.max(1, Math.floor(opts.burst)); + this.concurrency = opts.concurrency <= 0 ? 
Infinity : Math.floor(opts.concurrency); + this.clock = opts.clock || { now: () => Date.now() }; + + // start full + this.tokens = this.burst; + this.lastRefill = this.clock.now(); + // Enable debug when explicit env var set or when the process is started + // with a `--verbose` flag (useful when running test runner with `--verbose`). + this.debug = Boolean(process.env.WL_GITHUB_THROTTLER_DEBUG) || (Array.isArray(process.argv) && process.argv.includes('--verbose')); + } + + schedule<T>(fn: () => Promise<T> | T): Promise<T> { + return new Promise<T>((resolve, reject) => { + const task: Task<T> = { fn, resolve, reject } as Task<T>; + this.queue.push(task as Task<unknown>); + // try dispatch immediately + this.processQueue(); + }); + } + + private refillTokens(): void { + const now = this.clock.now(); + if (now <= this.lastRefill) return; + const elapsedMs = now - this.lastRefill; + const toAdd = (elapsedMs / 1000) * this.rate; + if (toAdd <= 0) return; + this.tokens = Math.min(this.burst, this.tokens + toAdd); + this.lastRefill = now; + } + + private scheduleProcess(delayMs: number): void { + // schedule a future attempt to process the queue + setTimeout(() => this.processQueue(), Math.max(0, Math.floor(delayMs))); + } + + private processQueue(): void { + // refill using clock + this.refillTokens(); + + // If no queued tasks, nothing to do + if (this.queue.length === 0) { + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] idle tokens=${this.tokens.toFixed(2)} active=${this.active} queue=0`); + return; + } + + // If we have no tokens, compute next token arrival and schedule + if (this.tokens < 1) { + const missing = 1 - this.tokens; + const msUntil = (missing / this.rate) * 1000; + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] no tokens (tokens=${this.tokens.toFixed(2)}), scheduling next check in ${Math.ceil(msUntil)}ms queue=${this.queue.length} active=${this.active}`); + this.scheduleProcess(msUntil); + return; + } + + // If concurrency limit 
reached, wait for running tasks to complete + if (this.active >= this.concurrency) return; + + // Pop next task and run it consuming one token + const task = this.queue.shift() as Task<unknown> | undefined; + if (!task) return; + + // consume one token + this.tokens -= 1; + // Ensure tokens not negative + if (this.tokens < 0) this.tokens = 0; + + this.active += 1; + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] dispatch task (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`); + + // Execute task + Promise.resolve() + .then(() => task.fn()) + .then((res) => { + this.active -= 1; + (task.resolve as (v: unknown) => void)(res); + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] task complete (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`); + // process more tasks (immediately) - may schedule next refill internally + this.processQueue(); + }) + .catch((err) => { + this.active -= 1; + task.reject(err); + if (this.debug) console.debug(`${new Date().toISOString()} [throttler] task error (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length}) ${String(err?.message ?? err)}`); + this.processQueue(); + }); + + // After starting one, attempt to start more if possible + // Use setImmediate style to avoid deep recursion + if (typeof setImmediate !== 'undefined') setImmediate(() => this.processQueue()); + else this.scheduleProcess(0); + } +} + +/** + * Make a throttler instance from environment variables (or provided overrides) + */ +export function makeThrottlerFromEnv(overrides?: Partial<ThrottlerOptions>): TokenBucketThrottler { + const rate = Number(process.env.WL_GITHUB_RATE || '6'); + const burst = Number(process.env.WL_GITHUB_BURST || '12'); + // Only enforce a concurrency cap if the env var is explicitly set. Leaving + // it unset preserves existing per-callsite concurrency controls. 
When not + // set we use Infinity (unlimited) so the throttler only enforces rate. + const concurrency = process.env.WL_GITHUB_CONCURRENCY !== undefined + ? Number(process.env.WL_GITHUB_CONCURRENCY) + : Infinity; + + const opts: ThrottlerOptions = { + rate: overrides?.rate ?? rate, + burst: overrides?.burst ?? burst, + concurrency: overrides?.concurrency ?? concurrency, + clock: overrides?.clock, + } as ThrottlerOptions; + + return new TokenBucketThrottler(opts); +} + +// Default shared instance +export const throttler = makeThrottlerFromEnv(); + +export default throttler; diff --git a/src/github.ts b/src/github.ts index b1188ba..f4f8c07 100644 --- a/src/github.ts +++ b/src/github.ts @@ -1,4 +1,5 @@ import { execSync, spawnSync, spawn } from 'child_process'; +import throttler from './github-throttler.js'; import * as fs from 'fs'; import * as os from 'os'; import * as path from 'path'; @@ -1204,14 +1205,14 @@ export async function ensureGithubLabelsAsync(config: GithubConfig, labels: stri if (existing.has(label)) continue; const color = labelColor(label); const createCommand = `gh api -X POST repos/${owner}/${name}/labels -f name=${JSON.stringify(label)} -f color=${JSON.stringify(color)}`; - try { - await runGhAsync(createCommand); - existing.add(label); - continue; - } catch { - const fallbackCommand = `gh issue label create ${JSON.stringify(label)} --repo ${config.repo} --color ${color}`; - try { await runGhAsync(fallbackCommand); existing.add(label); } catch (_) { /* ignore */ } - } + try { + await runGhAsync(createCommand); + existing.add(label); + continue; + } catch { + const fallbackCommand = `gh issue label create ${JSON.stringify(label)} --repo ${config.repo} --color ${color}`; + try { await runGhAsync(fallbackCommand); existing.add(label); } catch (_) { /* ignore */ } + } } } catch { // ignore label creation failures @@ -1255,23 +1256,25 @@ async function ensureGithubLabelsOnceAsync(config: GithubConfig, labels: string[ } export async function 
createGithubIssueAsync(config: GithubConfig, payload: { title: string; body: string; labels: string[] }): Promise { - const command = `gh issue create --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; - const output = await runGhAsync(command, payload.body); - let issueNumber: number | null = null; - const match = output.match(/\/(\d+)$/); - if (match) issueNumber = parseInt(match[1], 10); - if (issueNumber !== null && payload.labels.length > 0) { - // Ensure labels once per process to reduce API calls - await ensureGithubLabelsOnceAsync(config, payload.labels); - try { await runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(payload.labels.join(','))}`); } catch (_) {} - } - if (issueNumber === null) { - const view = await runGhJsonAsync(`gh issue list --repo ${config.repo} --limit 1 --json number,id,title,body,state,labels,updatedAt`); - if (Array.isArray(view) && view.length > 0) return normalizeGithubIssue(view[0]); - throw new Error('Failed to create GitHub issue'); - } - const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); - return normalizeGithubIssue(parsed); + return await throttler.schedule(async () => { + const command = `gh issue create --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; + const output = await runGhAsync(command, payload.body); + let issueNumber: number | null = null; + const match = output.match(/\/(\d+)$/); + if (match) issueNumber = parseInt(match[1], 10); + if (issueNumber !== null && payload.labels.length > 0) { + // Ensure labels once per process to reduce API calls + await ensureGithubLabelsOnceAsync(config, payload.labels); + try { await runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(payload.labels.join(','))}`); } catch (_) {} + } + if (issueNumber === null) { + const view = await runGhJsonAsync(`gh 
issue list --repo ${config.repo} --limit 1 --json number,id,title,body,state,labels,updatedAt`); + if (Array.isArray(view) && view.length > 0) return normalizeGithubIssue(view[0]); + throw new Error('Failed to create GitHub issue'); + } + const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); + return normalizeGithubIssue(parsed); + }); } export async function updateGithubIssueAsync( @@ -1279,57 +1282,61 @@ export async function updateGithubIssueAsync( issueNumber: number, payload: { title: string; body: string; labels: string[]; state: 'open' | 'closed' } ): Promise { - // Fetch current issue once and compute minimal set of operations - let current: GithubIssueRecord; - try { - current = await getGithubIssueAsync(config, issueNumber); - } catch { - current = getGithubIssue(config, issueNumber); - } - - const ops: Array> = []; - const titleChanged = (current.title || '') !== (payload.title || ''); - const bodyChanged = (current.body || '') !== (payload.body || ''); - // Only edit title/body if something changed - if (titleChanged || bodyChanged) { - const command = `gh issue edit ${issueNumber} --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; - ops.push(runGhAsync(command, payload.body).then(() => {}).catch(() => {})); - } + // Run the entire update flow as a single scheduled task to avoid + // serializing internal parallel operations via per-call scheduling. 
+ return await throttler.schedule(async () => { + // Fetch current issue once and compute minimal set of operations + let current: GithubIssueRecord; + try { + current = await getGithubIssueAsync(config, issueNumber); + } catch { + current = getGithubIssue(config, issueNumber); + } - // State change: only close/reopen when different - if (payload.state === 'closed' && current.state !== 'closed') { - ops.push(runGhAsync(`gh issue close ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {})); - } else if (payload.state === 'open' && current.state === 'closed') { - ops.push(runGhAsync(`gh issue reopen ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {})); - } + const ops: Array> = []; + const titleChanged = (current.title || '') !== (payload.title || ''); + const bodyChanged = (current.body || '') !== (payload.body || ''); + // Only edit title/body if something changed + if (titleChanged || bodyChanged) { + const command = `gh issue edit ${issueNumber} --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`; + ops.push(runGhAsync(command, payload.body).then(() => {}).catch(() => {})); + } - // Labels: compute status labels to remove and labels to add - if (payload.labels.length > 0) { - const desiredSet = new Set(payload.labels); - // Remove any single-valued category labels (stage, priority, status, type, - // risk, effort) that are on the issue but not in the desired set. This - // prevents label accumulation when e.g. stage changes from idea -> done. 
- const staleLabelsToRemove = current.labels.filter(label => isSingleValueCategoryLabel(label, config.labelPrefix) && !desiredSet.has(label)); - if (staleLabelsToRemove.length > 0) { - ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --remove-label ${JSON.stringify(staleLabelsToRemove.join(','))}`).then(() => {}).catch(() => {})); + // State change: only close/reopen when different + if (payload.state === 'closed' && current.state !== 'closed') { + ops.push(runGhAsync(`gh issue close ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {})); + } else if (payload.state === 'open' && current.state === 'closed') { + ops.push(runGhAsync(`gh issue reopen ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {})); } - // Compute labels that are not already present - const labelsToAdd = payload.labels.filter(l => !current.labels.includes(l)); - if (labelsToAdd.length > 0) { - await ensureGithubLabelsOnceAsync(config, labelsToAdd); - ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(labelsToAdd.join(','))}`).then(() => {}).catch(() => {})); + // Labels: compute status labels to remove and labels to add + if (payload.labels.length > 0) { + const desiredSet = new Set(payload.labels); + // Remove any single-valued category labels (stage, priority, status, type, + // risk, effort) that are on the issue but not in the desired set. This + // prevents label accumulation when e.g. stage changes from idea -> done. 
+ const staleLabelsToRemove = current.labels.filter(label => isSingleValueCategoryLabel(label, config.labelPrefix) && !desiredSet.has(label)); + if (staleLabelsToRemove.length > 0) { + ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --remove-label ${JSON.stringify(staleLabelsToRemove.join(','))}`).then(() => {}).catch(() => {})); + } + + // Compute labels that are not already present + const labelsToAdd = payload.labels.filter(l => !current.labels.includes(l)); + if (labelsToAdd.length > 0) { + await ensureGithubLabelsOnceAsync(config, labelsToAdd); + ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(labelsToAdd.join(','))}`).then(() => {}).catch(() => {})); + } } - } - // Execute operations — remove stale labels first, then add new ones, - // to avoid transient states where both old and new labels coexist. - if (ops.length > 0) await Promise.all(ops); + // Wait for every operation started above. Each one began running when it + // was pushed (stale-label removal is started before label addition), but + // they run concurrently, so completion order is not guaranteed. 
+ if (ops.length > 0) await Promise.all(ops); - // If no ops ran, return current object, else fetch fresh state - if (ops.length === 0) return current; - const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); - return normalizeGithubIssue(parsed); + // If no ops ran, return current object, else fetch fresh state + if (ops.length === 0) return current; + const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`); + return normalizeGithubIssue(parsed); + }); } export async function getGithubIssueAsync(config: GithubConfig, issueNumber: number): Promise { diff --git a/test/throttler.test.ts b/test/throttler.test.ts new file mode 100644 index 0000000..5b8ca11 --- /dev/null +++ b/test/throttler.test.ts @@ -0,0 +1,66 @@ +import { describe, it, expect } from 'vitest'; +import { TokenBucketThrottler, makeThrottlerFromEnv } from '../src/github-throttler.js'; + +// Fake clock that we can advance manually +class FakeClock { + private t = 0; + now() { return this.t; } + advance(ms: number) { this.t += ms; } +} + +describe('TokenBucketThrottler - basic token semantics', () => { + it('starts with burst tokens and consumes one per scheduled task', async () => { + const clock = new FakeClock(); + const t = new TokenBucketThrottler({ rate: 1, burst: 2, concurrency: 10, clock }); + let ran = 0; + await Promise.all([ + t.schedule(async () => { ran += 1; return 1; }), + t.schedule(async () => { ran += 1; return 2; }), + ]); + expect(ran).toBe(2); + }); + + it('refills tokens over time according to rate', async () => { + const clock = new FakeClock(); + const t = new TokenBucketThrottler({ rate: 1, burst: 2, concurrency: 10, clock }); + // consume burst + await t.schedule(() => 1); + await t.schedule(() => 2); + // schedule a third task which will wait for a token + const p = t.schedule(() => 3); + // advance clock less 
than required -> still pending + clock.advance(500); + // allow event loop to process any timers + await new Promise(r => setTimeout(r, 0)); + // not resolved yet + let resolved = false; + p.then(() => { resolved = true; }); + await new Promise(r => setTimeout(r, 0)); + expect(resolved).toBe(false); + // advance the remaining 500ms (1s total) to refill 1 token + clock.advance(500); + await new Promise(r => setTimeout(r, 0)); + await p; + expect(resolved).toBe(true); + }); +}); + +describe('TokenBucketThrottler - concurrency cap', () => { + it('enforces concurrency cap', async () => { + const clock = new FakeClock(); + const t = new TokenBucketThrottler({ rate: 10, burst: 10, concurrency: 1, clock }); + let running = 0; + const tasks = Array.from({ length: 3 }, () => t.schedule(async () => { + running += 1; + // yield to the event loop once (simulate async work) + await new Promise(r => setTimeout(r, 0)); + running -= 1; + return true; + })); + // allow tasks to start + await new Promise(r => setTimeout(r, 0)); + // only one should be running due to concurrency cap + expect(running).toBeLessThanOrEqual(1); + await Promise.all(tasks); + }); +});