From d7b98d0b6b9cc9fe378b6eb3439fce3def36a202 Mon Sep 17 00:00:00 2001 From: Jurij Skornik Date: Thu, 30 Apr 2026 19:24:57 +0200 Subject: [PATCH] fix(openclaw): quiet duplicate typed hook timeouts --- .../adapter-openclaw/src/DkgNodePlugin.ts | 88 +++++++++++++++-- packages/adapter-openclaw/src/HookSurface.ts | 53 +++++++--- packages/adapter-openclaw/test/plugin.test.ts | 97 +++++++++++++++++++ 3 files changed, 215 insertions(+), 23 deletions(-) diff --git a/packages/adapter-openclaw/src/DkgNodePlugin.ts b/packages/adapter-openclaw/src/DkgNodePlugin.ts index 79ec4828..3765f8a7 100644 --- a/packages/adapter-openclaw/src/DkgNodePlugin.ts +++ b/packages/adapter-openclaw/src/DkgNodePlugin.ts @@ -169,6 +169,8 @@ export class DkgNodePlugin { * reachability rules. */ private hookSurfaceInstalledAt: WeakMap = new WeakMap(); + private typedHookFireSeq = 0; + private typedHookFireGeneration: Map = new Map(); /** * Grace window before a never-fired surface becomes evictable. Long * enough that a slow gateway dispatch path doesn't trigger spurious @@ -830,16 +832,36 @@ export class DkgNodePlugin { // `installedVia: 'none'`. The `installedVia: 'none'` precondition // guarantees we're not double-binding a live handler. if (typedNeedsRetry('before_prompt_build')) { - this.hookSurface.install('typed', 'before_prompt_build', (ev, ctx) => this.handleBeforePromptBuild(ev, ctx)); + this.hookSurface.install( + 'typed', + 'before_prompt_build', + this.observedTypedHandler('before_prompt_build', (ev, ctx) => this.handleBeforePromptBuild(ev, ctx)), + this.observedTypedOptions('before_prompt_build'), + ); } if (typedNeedsRetry('agent_end')) { - this.hookSurface.install('typed', 'agent_end', (ev, ctx) => this.chatTurnWriter!.onAgentEnd(ev, ctx)); + this.hookSurface.install( + 'typed', + 'agent_end', + this.observedTypedHandler('agent_end', (ev, ctx) => this.chatTurnWriter!.onAgentEnd(ev, ctx)), + this.observedTypedOptions('agent_end'), + ); } if (typedNeedsRetry('before_compaction')) { - this.hookSurface.install('typed', 'before_compaction', (ev, ctx) => this.chatTurnWriter!.onBeforeCompaction(ev, ctx), { rareFireExpected: true }); + this.hookSurface.install( + 'typed', + 'before_compaction', + this.observedTypedHandler('before_compaction', (ev, ctx) => this.chatTurnWriter!.onBeforeCompaction(ev, ctx)), + this.observedTypedOptions('before_compaction', { rareFireExpected: true }), + ); } if (typedNeedsRetry('before_reset')) { - this.hookSurface.install('typed', 'before_reset', (ev, ctx) => this.chatTurnWriter!.onBeforeReset(ev, ctx), { rareFireExpected: true }); + this.hookSurface.install( + 'typed', + 'before_reset', + this.observedTypedHandler('before_reset', (ev, ctx) => this.chatTurnWriter!.onBeforeReset(ev, ctx)), + this.observedTypedOptions('before_reset', { rareFireExpected: true }), + ); } } // T7 — Legacy `session_end` retry. Same logic: only retry if the @@ -905,15 +927,35 @@ export class DkgNodePlugin { if (!runtimeHooks) return; // W3 — auto-recall every turn via before_prompt_build typed hook - this.hookSurface.install('typed', 'before_prompt_build', (ev, ctx) => this.handleBeforePromptBuild(ev, ctx)); + this.hookSurface.install( + 'typed', + 'before_prompt_build', + this.observedTypedHandler('before_prompt_build', (ev, ctx) => this.handleBeforePromptBuild(ev, ctx)), + this.observedTypedOptions('before_prompt_build'), + ); // W4a — LLM-driven turn capture via typed hooks. `before_compaction` // and `before_reset` are rare on healthy gateways; tag them so the // HookSurface commit-by-timeout warn downgrades to debug (otherwise // they false-positive within 30s of startup every time). - this.hookSurface.install('typed', 'agent_end', (ev, ctx) => this.chatTurnWriter!.onAgentEnd(ev, ctx)); - this.hookSurface.install('typed', 'before_compaction', (ev, ctx) => this.chatTurnWriter!.onBeforeCompaction(ev, ctx), { rareFireExpected: true }); - this.hookSurface.install('typed', 'before_reset', (ev, ctx) => this.chatTurnWriter!.onBeforeReset(ev, ctx), { rareFireExpected: true }); + this.hookSurface.install( + 'typed', + 'agent_end', + this.observedTypedHandler('agent_end', (ev, ctx) => this.chatTurnWriter!.onAgentEnd(ev, ctx)), + this.observedTypedOptions('agent_end'), + ); + this.hookSurface.install( + 'typed', + 'before_compaction', + this.observedTypedHandler('before_compaction', (ev, ctx) => this.chatTurnWriter!.onBeforeCompaction(ev, ctx)), + this.observedTypedOptions('before_compaction', { rareFireExpected: true }), + ); + this.hookSurface.install( + 'typed', + 'before_reset', + this.observedTypedHandler('before_reset', (ev, ctx) => this.chatTurnWriter!.onBeforeReset(ev, ctx)), + this.observedTypedOptions('before_reset', { rareFireExpected: true }), + ); // W4b — non-LLM channel capture via internal-hook map (PR #216 mechanism). // Internal hooks fire across both `full` and `setup-runtime` modes, so @@ -1029,6 +1071,36 @@ export class DkgNodePlugin { return false; } + private recordTypedHookFire(event: string): void { + this.typedHookFireSeq += 1; + this.typedHookFireGeneration.set(event, this.typedHookFireSeq); + } + + private observedTypedHookSinceInstall(event: string): () => boolean { + const generationAtInstall = this.typedHookFireGeneration.get(event) ?? 0; + return () => (this.typedHookFireGeneration.get(event) ?? 0) > generationAtInstall; + } + + private observedTypedOptions( + event: string, + opts: { rareFireExpected?: boolean } = {}, + ): { rareFireExpected?: boolean; observedFireSinceInstall: () => boolean } { + return { + ...opts, + observedFireSinceInstall: this.observedTypedHookSinceInstall(event), + }; + } + + private observedTypedHandler( + event: string, + handler: (...args: any[]) => unknown, + ): (...args: any[]) => unknown { + return (...args: any[]) => { + this.recordTypedHookFire(event); + return handler(...args); + }; + } + /** * T34 — Bounded-retention eviction for `allHookSurfaces`. Multi-phase * init can hand the plugin a fresh `api` on every inbound turn, so diff --git a/packages/adapter-openclaw/src/HookSurface.ts b/packages/adapter-openclaw/src/HookSurface.ts index 94072c28..cd1f1e18 100644 --- a/packages/adapter-openclaw/src/HookSurface.ts +++ b/packages/adapter-openclaw/src/HookSurface.ts @@ -41,8 +41,9 @@ * * I4 — deterministic commit timing. After first observed fire OR a 30s * grace period (whichever first), each event's `commitState` flips to - * `committed-by-fire` or `committed-by-timeout`. Callers can surface a - * warn if a typed-hook event never fires within the grace period. + * `committed-by-fire`, `committed-by-peer-fire`, or + * `committed-by-timeout`. Callers can surface a warn if a typed-hook + * event never fires within the grace period. * * C5 — double-registration guard. The same `(kind, event, handler)` triple * is a no-op on repeat install; we return the existing unsubscribe. @@ -68,7 +69,7 @@ export const INTERNAL_HOOK_SYMBOL = Symbol.for('openclaw.internalHookHandlers'); export type InstalledVia = 'on' | 'registerHook' | 'globalThis' | 'none'; /** Commit state per I4 — frozen after first fire or 30s grace. */ -export type CommitState = 'pending' | 'committed-by-fire' | 'committed-by-timeout'; +export type CommitState = 'pending' | 'committed-by-fire' | 'committed-by-peer-fire' | 'committed-by-timeout'; export interface DispatchStats { installedVia: InstalledVia; @@ -77,6 +78,18 @@ export interface DispatchStats { installError?: string; } +export interface HookInstallOptions { + rareFireExpected?: boolean; + /** + * Multi-surface typed-hook installs can race: OpenClaw may dispatch a + * typed event through one retained API registry while sibling registries + * stay idle. When a sibling has observed the same event since this install + * began, this install is proven live enough for the process and should not + * emit a duplicate timeout warning. + */ + observedFireSinceInstall?: () => boolean; +} + /** Minimum logger shape used by HookSurface. */ export interface HookSurfaceLogger { info?: (...args: unknown[]) => void; @@ -157,7 +170,7 @@ export class HookSurface { kind: HookKind, event: string, handler: HookHandler, - opts: { rareFireExpected?: boolean } = {}, + opts: HookInstallOptions = {}, ): Unsubscribe | null { const key = `${kind}:${event}`; if (opts.rareFireExpected) this.rareFireKeys.add(key); @@ -211,17 +224,25 @@ export class HookSurface { const timer = setTimeout(() => { const s = this.stats.get(key); if (s && s.commitState === 'pending') { - this.stats.set(key, { ...s, commitState: 'committed-by-timeout' }); - const msg = - `[hook-surface] commit-by-timeout: ${key} never fired within ${this.commitGraceMs}ms. ` + - `installedVia=${s.installedVia}, fireCount=0.`; - // Rare-fire hooks (e.g. before_compaction, before_reset) don't - // fire in routine traffic; surface at debug so real install - // failures on frequent hooks aren't drowned out by startup noise. - if (this.rareFireKeys.has(key)) { - this.logger.debug?.(msg); + if (kind === 'typed' && opts.observedFireSinceInstall?.()) { + this.stats.set(key, { ...s, commitState: 'committed-by-peer-fire' }); + this.logger.debug?.( + `[hook-surface] commit-by-peer-fire: ${key} observed on another retained surface; ` + + `suppressing duplicate timeout warn.`, + ); } else { - this.logger.warn?.(msg); + this.stats.set(key, { ...s, commitState: 'committed-by-timeout' }); + const msg = + `[hook-surface] commit-by-timeout: ${key} never fired within ${this.commitGraceMs}ms. ` + + `installedVia=${s.installedVia}, fireCount=0.`; + // Rare-fire hooks (e.g. before_compaction, before_reset) don't + // fire in routine traffic; surface at debug so real install + // failures on frequent hooks aren't drowned out by startup noise. + if (this.rareFireKeys.has(key)) { + this.logger.debug?.(msg); + } else { + this.logger.warn?.(msg); + } } } this.commitTimers.delete(key); @@ -443,7 +464,9 @@ export class HookSurface { if (!prev) return; const fireCount = prev.fireCount + 1; const nextState: CommitState = - prev.commitState === 'pending' ? 'committed-by-fire' : prev.commitState; + prev.commitState === 'pending' || prev.commitState === 'committed-by-peer-fire' + ? 'committed-by-fire' + : prev.commitState; this.stats.set(key, { ...prev, fireCount, commitState: nextState }); if (fireCount === 1) { diff --git a/packages/adapter-openclaw/test/plugin.test.ts b/packages/adapter-openclaw/test/plugin.test.ts index 02127bf6..ee0dc0a5 100644 --- a/packages/adapter-openclaw/test/plugin.test.ts +++ b/packages/adapter-openclaw/test/plugin.test.ts @@ -2686,6 +2686,8 @@ describe('DkgNodePlugin', () => { const events = onSpy.mock.calls.map((c: any) => c[0]); expect(events).toContain('before_prompt_build'); expect(events).toContain('agent_end'); + expect(events).toContain('before_compaction'); + expect(events).toContain('before_reset'); }); it('T31 — multi-phase init re-bind: typed hooks installed on EVERY api so emit-against-old-api still fires', async () => { @@ -2747,6 +2749,97 @@ describe('DkgNodePlugin', () => { expect((plugin as any).allHookSurfaces.size).toBe(2); }); + it('T338 - typed fires on one multi-phase surface suppress sibling timeout warnings', async () => { + vi.useFakeTimers(); + const logger = { info: vi.fn(), warn: vi.fn(), debug: vi.fn() }; + const plugin = new DkgNodePlugin({ + daemonUrl: 'http://localhost:9200', + channel: { enabled: false }, + memory: { enabled: false }, + } as any); + + const makeApi = () => { + const handlers = new Map unknown>>(); + const api = { + config: {}, + registrationMode: 'full', + registerTool: () => {}, + registerHook: vi.fn(), + registerMemoryCapability: vi.fn(), + on: vi.fn((event: string, handler: (...args: any[]) => unknown) => { + const existing = handlers.get(event) ?? []; + existing.push(handler); + handlers.set(event, existing); + }), + logger, + } as unknown as OpenClawPluginApi; + return { api, handlers }; + }; + + const api1 = makeApi(); + const api2 = makeApi(); + const api3 = makeApi(); + + try { + plugin.register(api1.api); + plugin.register(api2.api); + plugin.register(api3.api); + + const writer = (plugin as any).chatTurnWriter; + writer.onAgentEnd = vi.fn().mockResolvedValue(undefined); + + await api2.handlers.get('before_prompt_build')![0]( + { messages: [{ role: 'user', content: 'hello dkg' }] }, + { sessionKey: 's1' }, + ); + await api2.handlers.get('agent_end')![0]( + { messages: [{ role: 'user', content: 'hello dkg' }, { role: 'assistant', content: 'hi' }] }, + { sessionKey: 's1' }, + ); + + await vi.advanceTimersByTimeAsync(30_000); + + const warnMessages = logger.warn.mock.calls.map((args) => String(args[0])); + expect(warnMessages.filter((msg) => msg.includes('typed:before_prompt_build'))).toHaveLength(0); + expect(warnMessages.filter((msg) => msg.includes('typed:agent_end'))).toHaveLength(0); + expect(writer.onAgentEnd).toHaveBeenCalledTimes(1); + + const peerCommittedSurfaces = Array.from((plugin as any).allHookSurfaces).filter((surface: any) => { + const stats = surface.getDispatchStats(); + return stats['typed:agent_end']?.commitState === 'committed-by-peer-fire'; + }); + expect(peerCommittedSurfaces.length).toBeGreaterThan(0); + } finally { + await plugin.stop(); + vi.useRealTimers(); + } + }); + + it('T338 - full-mode typed install failures still warn loudly', () => { + const logger = { info: vi.fn(), warn: vi.fn(), debug: vi.fn() }; + const plugin = new DkgNodePlugin({ + daemonUrl: 'http://localhost:9200', + channel: { enabled: false }, + memory: { enabled: false }, + } as any); + const api: any = { + config: {}, + registrationMode: 'full', + registerTool: () => {}, + registerHook: vi.fn(), + logger, + }; + + plugin.register(api); + + const warnMessages = logger.warn.mock.calls.map((args) => String(args[0])); + expect(warnMessages.some((msg) => msg.includes('install FAILED: typed hook before_prompt_build'))).toBe(true); + expect(warnMessages.some((msg) => msg.includes('install FAILED: typed hook agent_end'))).toBe(true); + const stats = (plugin as any).hookSurface.getDispatchStats(); + expect(stats['typed:before_prompt_build']?.installedVia).toBe('none'); + expect(stats['typed:agent_end']?.installedVia).toBe('none'); + }); + it('T7 — session_end goes through HookSurface so stop() → register() does NOT accumulate handlers', async () => { // Regression for T7: pre-fix, `session_end` was registered via // direct `api.registerHook(...)` on every install. After @@ -3413,9 +3506,13 @@ describe('DkgNodePlugin', () => { expect(debugMessages.some((msg) => msg.includes("legacy:session_end"))).toBe(true); expect(debugMessages.some((msg) => msg.includes("internal:message:received"))).toBe(true); expect(debugMessages.some((msg) => msg.includes("internal:message:sent"))).toBe(true); + expect(debugMessages.some((msg) => msg.includes("typed:before_compaction"))).toBe(true); + expect(debugMessages.some((msg) => msg.includes("typed:before_reset"))).toBe(true); expect(warnMessages.some((msg) => msg.includes("legacy:session_end"))).toBe(false); expect(warnMessages.some((msg) => msg.includes("internal:message:received"))).toBe(false); expect(warnMessages.some((msg) => msg.includes("internal:message:sent"))).toBe(false); + expect(warnMessages.some((msg) => msg.includes("typed:before_compaction"))).toBe(false); + expect(warnMessages.some((msg) => msg.includes("typed:before_reset"))).toBe(false); expect(warnMessages.some((msg) => msg.includes("typed:agent_end"))).toBe(true); } finally { await plugin.stop();