Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 80 additions & 8 deletions packages/adapter-openclaw/src/DkgNodePlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ export class DkgNodePlugin {
* reachability rules.
*/
private hookSurfaceInstalledAt: WeakMap<HookSurface, number> = new WeakMap();
private typedHookFireSeq = 0;
private typedHookFireGeneration: Map<string, number> = new Map();
/**
* Grace window before a never-fired surface becomes evictable. Long
* enough that a slow gateway dispatch path doesn't trigger spurious
Expand Down Expand Up @@ -830,16 +832,36 @@ export class DkgNodePlugin {
// `installedVia: 'none'`. The `installedVia: 'none'` precondition
// guarantees we're not double-binding a live handler.
if (typedNeedsRetry('before_prompt_build')) {
this.hookSurface.install('typed', 'before_prompt_build', (ev, ctx) => this.handleBeforePromptBuild(ev, ctx));
this.hookSurface.install(
'typed',
'before_prompt_build',
this.observedTypedHandler('before_prompt_build', (ev, ctx) => this.handleBeforePromptBuild(ev, ctx)),
this.observedTypedOptions('before_prompt_build'),
);
}
if (typedNeedsRetry('agent_end')) {
this.hookSurface.install('typed', 'agent_end', (ev, ctx) => this.chatTurnWriter!.onAgentEnd(ev, ctx));
this.hookSurface.install(
'typed',
'agent_end',
this.observedTypedHandler('agent_end', (ev, ctx) => this.chatTurnWriter!.onAgentEnd(ev, ctx)),
this.observedTypedOptions('agent_end'),
);
}
if (typedNeedsRetry('before_compaction')) {
this.hookSurface.install('typed', 'before_compaction', (ev, ctx) => this.chatTurnWriter!.onBeforeCompaction(ev, ctx), { rareFireExpected: true });
this.hookSurface.install(
'typed',
'before_compaction',
this.observedTypedHandler('before_compaction', (ev, ctx) => this.chatTurnWriter!.onBeforeCompaction(ev, ctx)),
this.observedTypedOptions('before_compaction', { rareFireExpected: true }),
);
}
if (typedNeedsRetry('before_reset')) {
this.hookSurface.install('typed', 'before_reset', (ev, ctx) => this.chatTurnWriter!.onBeforeReset(ev, ctx), { rareFireExpected: true });
this.hookSurface.install(
'typed',
'before_reset',
this.observedTypedHandler('before_reset', (ev, ctx) => this.chatTurnWriter!.onBeforeReset(ev, ctx)),
this.observedTypedOptions('before_reset', { rareFireExpected: true }),
);
}
}
// T7 — Legacy `session_end` retry. Same logic: only retry if the
Expand Down Expand Up @@ -905,15 +927,35 @@ export class DkgNodePlugin {
if (!runtimeHooks) return;

// W3 — auto-recall every turn via before_prompt_build typed hook
this.hookSurface.install('typed', 'before_prompt_build', (ev, ctx) => this.handleBeforePromptBuild(ev, ctx));
this.hookSurface.install(
'typed',
'before_prompt_build',
this.observedTypedHandler('before_prompt_build', (ev, ctx) => this.handleBeforePromptBuild(ev, ctx)),
this.observedTypedOptions('before_prompt_build'),
);

// W4a — LLM-driven turn capture via typed hooks. `before_compaction`
// and `before_reset` are rare on healthy gateways; tag them so the
// HookSurface commit-by-timeout warn downgrades to debug (otherwise
// they false-positive within 30s of startup every time).
this.hookSurface.install('typed', 'agent_end', (ev, ctx) => this.chatTurnWriter!.onAgentEnd(ev, ctx));
this.hookSurface.install('typed', 'before_compaction', (ev, ctx) => this.chatTurnWriter!.onBeforeCompaction(ev, ctx), { rareFireExpected: true });
this.hookSurface.install('typed', 'before_reset', (ev, ctx) => this.chatTurnWriter!.onBeforeReset(ev, ctx), { rareFireExpected: true });
this.hookSurface.install(
'typed',
'agent_end',
this.observedTypedHandler('agent_end', (ev, ctx) => this.chatTurnWriter!.onAgentEnd(ev, ctx)),
this.observedTypedOptions('agent_end'),
);
this.hookSurface.install(
'typed',
'before_compaction',
this.observedTypedHandler('before_compaction', (ev, ctx) => this.chatTurnWriter!.onBeforeCompaction(ev, ctx)),
this.observedTypedOptions('before_compaction', { rareFireExpected: true }),
);
this.hookSurface.install(
'typed',
'before_reset',
this.observedTypedHandler('before_reset', (ev, ctx) => this.chatTurnWriter!.onBeforeReset(ev, ctx)),
this.observedTypedOptions('before_reset', { rareFireExpected: true }),
);

// W4b — non-LLM channel capture via internal-hook map (PR #216 mechanism).
// Internal hooks fire across both `full` and `setup-runtime` modes, so
Expand Down Expand Up @@ -1029,6 +1071,36 @@ export class DkgNodePlugin {
return false;
}

private recordTypedHookFire(event: string): void {
this.typedHookFireSeq += 1;
this.typedHookFireGeneration.set(event, this.typedHookFireSeq);
}

private observedTypedHookSinceInstall(event: string): () => boolean {
const generationAtInstall = this.typedHookFireGeneration.get(event) ?? 0;
return () => (this.typedHookFireGeneration.get(event) ?? 0) > generationAtInstall;
}

private observedTypedOptions(
event: string,
opts: { rareFireExpected?: boolean } = {},
): { rareFireExpected?: boolean; observedFireSinceInstall: () => boolean } {
return {
...opts,
observedFireSinceInstall: this.observedTypedHookSinceInstall(event),
};
}

private observedTypedHandler(
event: string,
handler: (...args: any[]) => unknown,
): (...args: any[]) => unknown {
return (...args: any[]) => {
this.recordTypedHookFire(event);
return handler(...args);
};
}

/**
* T34 — Bounded-retention eviction for `allHookSurfaces`. Multi-phase
* init can hand the plugin a fresh `api` on every inbound turn, so
Expand Down
53 changes: 38 additions & 15 deletions packages/adapter-openclaw/src/HookSurface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@
*
* I4 — deterministic commit timing. After first observed fire OR a 30s
* grace period (whichever first), each event's `commitState` flips to
* `committed-by-fire` or `committed-by-timeout`. Callers can surface a
* warn if a typed-hook event never fires within the grace period.
* `committed-by-fire`, `committed-by-peer-fire`, or
* `committed-by-timeout`. Callers can surface a warn if a typed-hook
* event never fires within the grace period.
*
* C5 — double-registration guard. The same `(kind, event, handler)` triple
* is a no-op on repeat install; we return the existing unsubscribe.
Expand All @@ -68,7 +69,7 @@ export const INTERNAL_HOOK_SYMBOL = Symbol.for('openclaw.internalHookHandlers');
export type InstalledVia = 'on' | 'registerHook' | 'globalThis' | 'none';

/** Commit state per I4 — frozen after first fire or 30s grace. */
export type CommitState = 'pending' | 'committed-by-fire' | 'committed-by-timeout';
export type CommitState = 'pending' | 'committed-by-fire' | 'committed-by-peer-fire' | 'committed-by-timeout';

export interface DispatchStats {
installedVia: InstalledVia;
Expand All @@ -77,6 +78,18 @@ export interface DispatchStats {
installError?: string;
}

export interface HookInstallOptions {
rareFireExpected?: boolean;
/**
* Multi-surface typed-hook installs can race: OpenClaw may dispatch a
* typed event through one retained API registry while sibling registries
* stay idle. When a sibling has observed the same event since this install
* began, this install is proven live enough for the process and should not
* emit a duplicate timeout warning.
*/
observedFireSinceInstall?: () => boolean;
}

/** Minimum logger shape used by HookSurface. */
export interface HookSurfaceLogger {
info?: (...args: unknown[]) => void;
Expand Down Expand Up @@ -157,7 +170,7 @@ export class HookSurface {
kind: HookKind,
event: string,
handler: HookHandler,
opts: { rareFireExpected?: boolean } = {},
opts: HookInstallOptions = {},
): Unsubscribe | null {
const key = `${kind}:${event}`;
if (opts.rareFireExpected) this.rareFireKeys.add(key);
Expand Down Expand Up @@ -211,17 +224,25 @@ export class HookSurface {
const timer = setTimeout(() => {
const s = this.stats.get(key);
if (s && s.commitState === 'pending') {
this.stats.set(key, { ...s, commitState: 'committed-by-timeout' });
const msg =
`[hook-surface] commit-by-timeout: ${key} never fired within ${this.commitGraceMs}ms. ` +
`installedVia=${s.installedVia}, fireCount=0.`;
// Rare-fire hooks (e.g. before_compaction, before_reset) don't
// fire in routine traffic; surface at debug so real install
// failures on frequent hooks aren't drowned out by startup noise.
if (this.rareFireKeys.has(key)) {
this.logger.debug?.(msg);
if (kind === 'typed' && opts.observedFireSinceInstall?.()) {
this.stats.set(key, { ...s, commitState: 'committed-by-peer-fire' });
this.logger.debug?.(
`[hook-surface] commit-by-peer-fire: ${key} observed on another retained surface; ` +
`suppressing duplicate timeout warn.`,
);
} else {
this.logger.warn?.(msg);
this.stats.set(key, { ...s, commitState: 'committed-by-timeout' });
const msg =
`[hook-surface] commit-by-timeout: ${key} never fired within ${this.commitGraceMs}ms. ` +
`installedVia=${s.installedVia}, fireCount=0.`;
// Rare-fire hooks (e.g. before_compaction, before_reset) don't
// fire in routine traffic; surface at debug so real install
// failures on frequent hooks aren't drowned out by startup noise.
if (this.rareFireKeys.has(key)) {
this.logger.debug?.(msg);
} else {
this.logger.warn?.(msg);
}
}
}
this.commitTimers.delete(key);
Expand Down Expand Up @@ -443,7 +464,9 @@ export class HookSurface {
if (!prev) return;
const fireCount = prev.fireCount + 1;
const nextState: CommitState =
prev.commitState === 'pending' ? 'committed-by-fire' : prev.commitState;
prev.commitState === 'pending' || prev.commitState === 'committed-by-peer-fire'
? 'committed-by-fire'
: prev.commitState;
this.stats.set(key, { ...prev, fireCount, commitState: nextState });

if (fireCount === 1) {
Expand Down
97 changes: 97 additions & 0 deletions packages/adapter-openclaw/test/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2686,6 +2686,8 @@ describe('DkgNodePlugin', () => {
const events = onSpy.mock.calls.map((c: any) => c[0]);
expect(events).toContain('before_prompt_build');
expect(events).toContain('agent_end');
expect(events).toContain('before_compaction');
expect(events).toContain('before_reset');
});

it('T31 — multi-phase init re-bind: typed hooks installed on EVERY api so emit-against-old-api still fires', async () => {
Expand Down Expand Up @@ -2747,6 +2749,97 @@ describe('DkgNodePlugin', () => {
expect((plugin as any).allHookSurfaces.size).toBe(2);
});

it('T338 - typed fires on one multi-phase surface suppress sibling timeout warnings', async () => {
vi.useFakeTimers();
const logger = { info: vi.fn(), warn: vi.fn(), debug: vi.fn() };
const plugin = new DkgNodePlugin({
daemonUrl: 'http://localhost:9200',
channel: { enabled: false },
memory: { enabled: false },
} as any);

const makeApi = () => {
const handlers = new Map<string, Array<(...args: any[]) => unknown>>();
const api = {
config: {},
registrationMode: 'full',
registerTool: () => {},
registerHook: vi.fn(),
registerMemoryCapability: vi.fn(),
on: vi.fn((event: string, handler: (...args: any[]) => unknown) => {
const existing = handlers.get(event) ?? [];
existing.push(handler);
handlers.set(event, existing);
}),
logger,
} as unknown as OpenClawPluginApi;
return { api, handlers };
};

const api1 = makeApi();
const api2 = makeApi();
const api3 = makeApi();

try {
plugin.register(api1.api);
plugin.register(api2.api);
plugin.register(api3.api);

const writer = (plugin as any).chatTurnWriter;
writer.onAgentEnd = vi.fn().mockResolvedValue(undefined);

await api2.handlers.get('before_prompt_build')![0](
{ messages: [{ role: 'user', content: 'hello dkg' }] },
{ sessionKey: 's1' },
);
await api2.handlers.get('agent_end')![0](
{ messages: [{ role: 'user', content: 'hello dkg' }, { role: 'assistant', content: 'hi' }] },
{ sessionKey: 's1' },
);

await vi.advanceTimersByTimeAsync(30_000);

const warnMessages = logger.warn.mock.calls.map((args) => String(args[0]));
expect(warnMessages.filter((msg) => msg.includes('typed:before_prompt_build'))).toHaveLength(0);
expect(warnMessages.filter((msg) => msg.includes('typed:agent_end'))).toHaveLength(0);
expect(writer.onAgentEnd).toHaveBeenCalledTimes(1);

const peerCommittedSurfaces = Array.from((plugin as any).allHookSurfaces).filter((surface: any) => {
const stats = surface.getDispatchStats();
return stats['typed:agent_end']?.commitState === 'committed-by-peer-fire';
});
expect(peerCommittedSurfaces.length).toBeGreaterThan(0);
} finally {
await plugin.stop();
vi.useRealTimers();
}
});

it('T338 - full-mode typed install failures still warn loudly', () => {
const logger = { info: vi.fn(), warn: vi.fn(), debug: vi.fn() };
const plugin = new DkgNodePlugin({
daemonUrl: 'http://localhost:9200',
channel: { enabled: false },
memory: { enabled: false },
} as any);
const api: any = {
config: {},
registrationMode: 'full',
registerTool: () => {},
registerHook: vi.fn(),
logger,
};

plugin.register(api);

const warnMessages = logger.warn.mock.calls.map((args) => String(args[0]));
expect(warnMessages.some((msg) => msg.includes('install FAILED: typed hook before_prompt_build'))).toBe(true);
expect(warnMessages.some((msg) => msg.includes('install FAILED: typed hook agent_end'))).toBe(true);
const stats = (plugin as any).hookSurface.getDispatchStats();
expect(stats['typed:before_prompt_build']?.installedVia).toBe('none');
expect(stats['typed:agent_end']?.installedVia).toBe('none');
});

it('T7 — session_end goes through HookSurface so stop() → register() does NOT accumulate handlers', async () => {
// Regression for T7: pre-fix, `session_end` was registered via
// direct `api.registerHook(...)` on every install. After
Expand Down Expand Up @@ -3413,9 +3506,13 @@ describe('DkgNodePlugin', () => {
expect(debugMessages.some((msg) => msg.includes("legacy:session_end"))).toBe(true);
expect(debugMessages.some((msg) => msg.includes("internal:message:received"))).toBe(true);
expect(debugMessages.some((msg) => msg.includes("internal:message:sent"))).toBe(true);
expect(debugMessages.some((msg) => msg.includes("typed:before_compaction"))).toBe(true);
expect(debugMessages.some((msg) => msg.includes("typed:before_reset"))).toBe(true);
expect(warnMessages.some((msg) => msg.includes("legacy:session_end"))).toBe(false);
expect(warnMessages.some((msg) => msg.includes("internal:message:received"))).toBe(false);
expect(warnMessages.some((msg) => msg.includes("internal:message:sent"))).toBe(false);
expect(warnMessages.some((msg) => msg.includes("typed:before_compaction"))).toBe(false);
expect(warnMessages.some((msg) => msg.includes("typed:before_reset"))).toBe(false);
expect(warnMessages.some((msg) => msg.includes("typed:agent_end"))).toBe(true);
} finally {
await plugin.stop();
Expand Down
Loading