Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/github-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
labelFieldsDiffer,
getLatestLabelEventTimestamp,
} from './github.js';
import throttler from './github-throttler.js';
import { increment, snapshot, diff } from './github-metrics.js';
import { mergeWorkItems } from './sync.js';

Expand Down Expand Up @@ -298,7 +299,7 @@ export async function upsertIssuesFromWorkItems(
}
if (item.githubIssueNumber) {
increment('api.issue.update');
issue = await updateGithubIssueAsync(config, item.githubIssueNumber, payload);
issue = await updateGithubIssueAsync(config, item.githubIssueNumber!, payload);
if (item.status === 'deleted') {
result.closed += 1;
result.syncedItems.push({
Expand Down Expand Up @@ -345,7 +346,7 @@ export async function upsertIssuesFromWorkItems(
if (shouldSyncCommentsNow && issueNumber) {
const commentListStart = Date.now();
increment('api.comment.list');
const existingComments = await listGithubIssueCommentsAsync(config, issueNumber);
const existingComments = await listGithubIssueCommentsAsync(config, issueNumber!);
timing.commentListMs += Date.now() - commentListStart;
const commentUpsertStart = Date.now();
const commentSummary = await upsertGithubIssueCommentsAsync(config, issueNumber, itemComments, existingComments);
Expand Down Expand Up @@ -403,7 +404,7 @@ export async function upsertIssuesFromWorkItems(
const bodyMatch = (existing.body || '').trim() === body.trim();
if (!bodyMatch) {
increment('api.comment.update');
const updatedComment = await updateGithubIssueCommentAsync(issueConfig, existing.id, body);
const updatedComment = await updateGithubIssueCommentAsync(issueConfig, existing.id!, body);
// Persist mapping back to local comment
comment.githubCommentId = existing.id;
comment.githubCommentUpdatedAt = updatedComment.updatedAt;
Expand All @@ -418,7 +419,7 @@ export async function upsertIssuesFromWorkItems(

// No GH comment mapping found — create a new comment
increment('api.comment.create');
const createdComment = await createGithubIssueCommentAsync(issueConfig, issueNumber, body);
const createdComment = await createGithubIssueCommentAsync(issueConfig, issueNumber, body);
// Persist mapping back to local comment so future runs can directly reference by ID
comment.githubCommentId = createdComment.id;
comment.githubCommentUpdatedAt = createdComment.updatedAt;
Expand Down
161 changes: 161 additions & 0 deletions src/github-throttler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
* Small token-bucket throttler with optional concurrency cap.
* - Rate: tokens per second
* - Burst: bucket capacity (initial tokens = burst)
* - Concurrency: max number of concurrent running tasks
*
* The implementation keeps a FIFO queue of pending tasks and attempts to
* dispatch them when both a token is available and concurrency allows.
* The clock is injectable so unit tests can control how many tokens are
* refilled; wake-ups for queued tasks still use the real `setTimeout`.
*/

/** Time source, in milliseconds, used for token refill arithmetic. */
export type Clock = { now(): number };

/** Construction options for {@link TokenBucketThrottler}. */
export type ThrottlerOptions = {
  /**
   * Tokens added per second. A value that is not a finite positive number
   * (0, negative, NaN, Infinity) disables rate limiting, mirroring the
   * "0 = unlimited" convention used by `concurrency`.
   */
  rate: number;
  /** Bucket capacity; the bucket starts full. Floored, minimum 1. */
  burst: number;
  /** Max concurrent tasks (0, negative, NaN or Infinity = unlimited). */
  concurrency: number;
  /** Optional time source; defaults to `Date.now()`. */
  clock?: Clock;
};

/** A queued unit of work together with the settle callbacks of its promise. */
type Task<T> = {
  fn: () => Promise<T> | T;
  resolve: (v: T) => void;
  reject: (e: unknown) => void;
};

/**
 * Token-bucket throttler with an optional concurrency cap.
 *
 * Tasks are queued FIFO. A task is started when a whole token is available
 * (unless rate limiting is disabled) and fewer than `concurrency` tasks are
 * running. Each started task consumes one token.
 */
export class TokenBucketThrottler {
  private readonly rate: number;
  private readonly burst: number;
  private readonly concurrency: number;
  private readonly clock: Clock;
  /** False when `rate` is not a finite positive number: tokens are then ignored. */
  private readonly rateLimited: boolean;

  private tokens: number;
  private lastRefill: number; // ms
  private active = 0;
  private queue: Array<Task<unknown>> = [];
  /** Pending wake-up timer, if any. At most one is armed at a time. */
  private wakeTimer: ReturnType<typeof setTimeout> | null = null;
  private debug = false;

  /**
   * @param opts Rate, burst, concurrency and optional clock. Out-of-range
   *   values are normalised rather than rejected (see {@link ThrottlerOptions}).
   */
  constructor(opts: ThrottlerOptions) {
    this.rateLimited = Number.isFinite(opts.rate) && opts.rate > 0;
    this.rate = this.rateLimited ? opts.rate : Infinity;

    // Math.max(1, NaN) is NaN, so NaN must be handled explicitly.
    const burst = Math.floor(opts.burst);
    this.burst = Number.isNaN(burst) ? 1 : Math.max(1, burst);

    // A fractional cap such as 0.5 must not floor to 0: `active >= 0` is
    // always true and would block every task forever.
    const cap = opts.concurrency;
    this.concurrency = !Number.isFinite(cap) || cap <= 0 ? Infinity : Math.max(1, Math.floor(cap));

    this.clock = opts.clock || { now: () => Date.now() };

    // start full
    this.tokens = this.burst;
    this.lastRefill = this.clock.now();
    // Enable debug when explicit env var set or when the process is started
    // with a `--verbose` flag (useful when running test runner with `--verbose`).
    this.debug = Boolean(process.env.WL_GITHUB_THROTTLER_DEBUG) || (Array.isArray(process.argv) && process.argv.includes('--verbose'));
  }

  /**
   * Queue `fn` and return a promise that settles with its outcome.
   *
   * @param fn Work to run; may be synchronous or return a promise.
   * @returns A promise resolved with `fn`'s result, or rejected with whatever
   *   `fn` throws or rejects with.
   */
  schedule<T>(fn: () => Promise<T> | T): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const task: Task<T> = { fn, resolve, reject } as Task<T>;
      this.queue.push(task as Task<unknown>);
      // try dispatch immediately
      this.processQueue();
    });
  }

  /** Add the tokens earned since the last refill, capped at `burst`. */
  private refillTokens(): void {
    if (!this.rateLimited) return;
    const now = this.clock.now();
    if (now <= this.lastRefill) return;
    const elapsedMs = now - this.lastRefill;
    const toAdd = (elapsedMs / 1000) * this.rate;
    if (toAdd <= 0) return;
    this.tokens = Math.min(this.burst, this.tokens + toAdd);
    this.lastRefill = now;
  }

  /**
   * Arm a single wake-up timer. Further requests while one is pending are
   * ignored: every wake-up waits for the same "next token" event, so extra
   * timers only add redundant polling.
   */
  private scheduleProcess(delayMs: number): void {
    if (this.wakeTimer !== null) return;
    // Round up: waking early finds no token and forces another poll.
    const delay = Math.max(0, Math.ceil(delayMs));
    this.wakeTimer = setTimeout(() => {
      this.wakeTimer = null;
      this.processQueue();
    }, delay);
  }

  /** Start as many queued tasks as tokens and the concurrency cap allow. */
  private processQueue(): void {
    // refill using clock
    this.refillTokens();

    // If no queued tasks, nothing to do
    if (this.queue.length === 0) {
      if (this.debug) console.debug(`${new Date().toISOString()} [throttler] idle tokens=${this.tokens.toFixed(2)} active=${this.active} queue=0`);
      return;
    }

    while (this.queue.length > 0) {
      // If we have no tokens, compute next token arrival and schedule
      if (this.rateLimited && this.tokens < 1) {
        const missing = 1 - this.tokens;
        const msUntil = (missing / this.rate) * 1000;
        if (this.debug) console.debug(`${new Date().toISOString()} [throttler] no tokens (tokens=${this.tokens.toFixed(2)}), scheduling next check in ${Math.ceil(msUntil)}ms queue=${this.queue.length} active=${this.active}`);
        this.scheduleProcess(msUntil);
        return;
      }

      // If concurrency limit reached, wait for running tasks to complete;
      // their completion handlers call processQueue() again.
      if (this.active >= this.concurrency) return;

      const task = this.queue.shift();
      if (!task) return;

      // consume one token
      if (this.rateLimited) this.tokens -= 1;

      this.active += 1;
      if (this.debug) console.debug(`${new Date().toISOString()} [throttler] dispatch task (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`);

      this.run(task);
    }
  }

  /**
   * Execute one dispatched task on a later microtask and settle its promise.
   *
   * The two-argument `then` guarantees exactly one of the handlers runs, so
   * `active` is decremented once per task even if a handler itself throws.
   */
  private run(task: Task<unknown>): void {
    void Promise.resolve()
      .then(() => task.fn())
      .then(
        (res) => {
          this.active -= 1;
          task.resolve(res);
          if (this.debug) console.debug(`${new Date().toISOString()} [throttler] task complete (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length})`);
          // process more tasks (immediately) - may schedule next refill internally
          this.processQueue();
        },
        (err: unknown) => {
          this.active -= 1;
          task.reject(err);
          if (this.debug) console.debug(`${new Date().toISOString()} [throttler] task error (active=${this.active} tokens=${this.tokens.toFixed(2)} queue=${this.queue.length}) ${String((err as { message?: unknown } | null | undefined)?.message ?? err)}`);
          this.processQueue();
        },
      );
  }
}

/**
* Make a throttler instance from environment variables (or provided overrides)
*/
/**
 * Build a throttler from environment variables, with optional overrides.
 *
 * Reads `WL_GITHUB_RATE` (default 6), `WL_GITHUB_BURST` (default 12) and
 * `WL_GITHUB_CONCURRENCY` (default unlimited). A variable that is unset,
 * blank, or not parseable as a number falls back to its default instead of
 * passing `NaN` into the throttler.
 *
 * @param overrides Values that take precedence over the environment.
 * @returns A new throttler instance.
 */
export function makeThrottlerFromEnv(overrides?: Partial<ThrottlerOptions>): TokenBucketThrottler {
  const envNumber = (name: string, fallback: number): number => {
    const raw = process.env[name];
    if (raw === undefined || raw.trim() === '') return fallback;
    const parsed = Number(raw);
    return Number.isNaN(parsed) ? fallback : parsed;
  };

  // Only enforce a concurrency cap if the env var is explicitly set. Leaving
  // it unset preserves existing per-callsite concurrency controls. When not
  // set we use Infinity (unlimited) so the throttler only enforces rate.
  const opts: ThrottlerOptions = {
    rate: overrides?.rate ?? envNumber('WL_GITHUB_RATE', 6),
    burst: overrides?.burst ?? envNumber('WL_GITHUB_BURST', 12),
    concurrency: overrides?.concurrency ?? envNumber('WL_GITHUB_CONCURRENCY', Infinity),
    clock: overrides?.clock,
  };

  return new TokenBucketThrottler(opts);
}

// Default shared instance, created once when this module is first loaded and
// configured from the WL_GITHUB_* environment variables (no overrides).
// Exported both by name and as the module's default export.
export const throttler = makeThrottlerFromEnv();

export default throttler;
145 changes: 76 additions & 69 deletions src/github.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { execSync, spawnSync, spawn } from 'child_process';
import throttler from './github-throttler.js';
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
Expand Down Expand Up @@ -1204,14 +1205,14 @@ export async function ensureGithubLabelsAsync(config: GithubConfig, labels: stri
if (existing.has(label)) continue;
const color = labelColor(label);
const createCommand = `gh api -X POST repos/${owner}/${name}/labels -f name=${JSON.stringify(label)} -f color=${JSON.stringify(color)}`;
try {
await runGhAsync(createCommand);
existing.add(label);
continue;
} catch {
const fallbackCommand = `gh issue label create ${JSON.stringify(label)} --repo ${config.repo} --color ${color}`;
try { await runGhAsync(fallbackCommand); existing.add(label); } catch (_) { /* ignore */ }
}
try {
await runGhAsync(createCommand);
existing.add(label);
continue;
} catch {
const fallbackCommand = `gh issue label create ${JSON.stringify(label)} --repo ${config.repo} --color ${color}`;
try { await runGhAsync(fallbackCommand); existing.add(label); } catch (_) { /* ignore */ }
}
}
} catch {
// ignore label creation failures
Expand Down Expand Up @@ -1255,81 +1256,87 @@ async function ensureGithubLabelsOnceAsync(config: GithubConfig, labels: string[
}

export async function createGithubIssueAsync(config: GithubConfig, payload: { title: string; body: string; labels: string[] }): Promise<GithubIssueRecord> {
const command = `gh issue create --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`;
const output = await runGhAsync(command, payload.body);
let issueNumber: number | null = null;
const match = output.match(/\/(\d+)$/);
if (match) issueNumber = parseInt(match[1], 10);
if (issueNumber !== null && payload.labels.length > 0) {
// Ensure labels once per process to reduce API calls
await ensureGithubLabelsOnceAsync(config, payload.labels);
try { await runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(payload.labels.join(','))}`); } catch (_) {}
}
if (issueNumber === null) {
const view = await runGhJsonAsync(`gh issue list --repo ${config.repo} --limit 1 --json number,id,title,body,state,labels,updatedAt`);
if (Array.isArray(view) && view.length > 0) return normalizeGithubIssue(view[0]);
throw new Error('Failed to create GitHub issue');
}
const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`);
return normalizeGithubIssue(parsed);
return await throttler.schedule(async () => {
const command = `gh issue create --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`;
const output = await runGhAsync(command, payload.body);
let issueNumber: number | null = null;
const match = output.match(/\/(\d+)$/);
if (match) issueNumber = parseInt(match[1], 10);
if (issueNumber !== null && payload.labels.length > 0) {
// Ensure labels once per process to reduce API calls
await ensureGithubLabelsOnceAsync(config, payload.labels);
try { await runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(payload.labels.join(','))}`); } catch (_) {}
}
if (issueNumber === null) {
const view = await runGhJsonAsync(`gh issue list --repo ${config.repo} --limit 1 --json number,id,title,body,state,labels,updatedAt`);
if (Array.isArray(view) && view.length > 0) return normalizeGithubIssue(view[0]);
throw new Error('Failed to create GitHub issue');
}
const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`);
return normalizeGithubIssue(parsed);
});
}

export async function updateGithubIssueAsync(
config: GithubConfig,
issueNumber: number,
payload: { title: string; body: string; labels: string[]; state: 'open' | 'closed' }
): Promise<GithubIssueRecord> {
// Fetch current issue once and compute minimal set of operations
let current: GithubIssueRecord;
try {
current = await getGithubIssueAsync(config, issueNumber);
} catch {
current = getGithubIssue(config, issueNumber);
}

const ops: Array<Promise<void>> = [];
const titleChanged = (current.title || '') !== (payload.title || '');
const bodyChanged = (current.body || '') !== (payload.body || '');
// Only edit title/body if something changed
if (titleChanged || bodyChanged) {
const command = `gh issue edit ${issueNumber} --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`;
ops.push(runGhAsync(command, payload.body).then(() => {}).catch(() => {}));
}
// Run the entire update flow as a single scheduled task to avoid
// serializing internal parallel operations via per-call scheduling.
return await throttler.schedule(async () => {
// Fetch current issue once and compute minimal set of operations
let current: GithubIssueRecord;
try {
current = await getGithubIssueAsync(config, issueNumber);
} catch {
current = getGithubIssue(config, issueNumber);
}

// State change: only close/reopen when different
if (payload.state === 'closed' && current.state !== 'closed') {
ops.push(runGhAsync(`gh issue close ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {}));
} else if (payload.state === 'open' && current.state === 'closed') {
ops.push(runGhAsync(`gh issue reopen ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {}));
}
const ops: Array<Promise<void>> = [];
const titleChanged = (current.title || '') !== (payload.title || '');
const bodyChanged = (current.body || '') !== (payload.body || '');
// Only edit title/body if something changed
if (titleChanged || bodyChanged) {
const command = `gh issue edit ${issueNumber} --repo ${config.repo} --title ${JSON.stringify(payload.title)} --body-file -`;
ops.push(runGhAsync(command, payload.body).then(() => {}).catch(() => {}));
}

// Labels: compute status labels to remove and labels to add
if (payload.labels.length > 0) {
const desiredSet = new Set(payload.labels);
// Remove any single-valued category labels (stage, priority, status, type,
// risk, effort) that are on the issue but not in the desired set. This
// prevents label accumulation when e.g. stage changes from idea -> done.
const staleLabelsToRemove = current.labels.filter(label => isSingleValueCategoryLabel(label, config.labelPrefix) && !desiredSet.has(label));
if (staleLabelsToRemove.length > 0) {
ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --remove-label ${JSON.stringify(staleLabelsToRemove.join(','))}`).then(() => {}).catch(() => {}));
// State change: only close/reopen when different
if (payload.state === 'closed' && current.state !== 'closed') {
ops.push(runGhAsync(`gh issue close ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {}));
} else if (payload.state === 'open' && current.state === 'closed') {
ops.push(runGhAsync(`gh issue reopen ${issueNumber} --repo ${config.repo}`).then(() => {}).catch(() => {}));
}

// Compute labels that are not already present
const labelsToAdd = payload.labels.filter(l => !current.labels.includes(l));
if (labelsToAdd.length > 0) {
await ensureGithubLabelsOnceAsync(config, labelsToAdd);
ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(labelsToAdd.join(','))}`).then(() => {}).catch(() => {}));
// Labels: compute status labels to remove and labels to add
if (payload.labels.length > 0) {
const desiredSet = new Set(payload.labels);
// Remove any single-valued category labels (stage, priority, status, type,
// risk, effort) that are on the issue but not in the desired set. This
// prevents label accumulation when e.g. stage changes from idea -> done.
const staleLabelsToRemove = current.labels.filter(label => isSingleValueCategoryLabel(label, config.labelPrefix) && !desiredSet.has(label));
if (staleLabelsToRemove.length > 0) {
ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --remove-label ${JSON.stringify(staleLabelsToRemove.join(','))}`).then(() => {}).catch(() => {}));
}

// Compute labels that are not already present
const labelsToAdd = payload.labels.filter(l => !current.labels.includes(l));
if (labelsToAdd.length > 0) {
await ensureGithubLabelsOnceAsync(config, labelsToAdd);
ops.push(runGhAsync(`gh issue edit ${issueNumber} --repo ${config.repo} --add-label ${JSON.stringify(labelsToAdd.join(','))}`).then(() => {}).catch(() => {}));
}
}
}

// Execute operations — remove stale labels first, then add new ones,
// to avoid transient states where both old and new labels coexist.
if (ops.length > 0) await Promise.all(ops);
// Execute operations — remove stale labels first, then add new ones,
// to avoid transient states where both old and new labels coexist.
if (ops.length > 0) await Promise.all(ops);

// If no ops ran, return current object, else fetch fresh state
if (ops.length === 0) return current;
const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`);
return normalizeGithubIssue(parsed);
// If no ops ran, return current object, else fetch fresh state
if (ops.length === 0) return current;
const parsed = await runGhJsonAsync(`gh issue view ${issueNumber} --repo ${config.repo} --json number,id,title,body,state,labels,updatedAt`);
return normalizeGithubIssue(parsed);
});
}

export async function getGithubIssueAsync(config: GithubConfig, issueNumber: number): Promise<GithubIssueRecord> {
Expand Down
Loading
Loading