Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cli/src/codex/codexLocalLauncher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export async function codexLocalLauncher(session: CodexSession): Promise<'switch
}
const createdScanner = await createCodexSessionScanner({
transcriptPath,
replayExistingEvents: session.importHistory,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MINOR] runCodex() already imports the transcript before launch when --hapi-import-history is set. In local mode this scanner then replays the existing transcript and sends the same user/agent messages again, producing duplicate history in HAPI for hapi codex resume <id> --hapi-import-history.

Suggested fix:

const createdScanner = await createCodexSessionScanner({
    transcriptPath,
    replayExistingEvents: false,
    onSessionId: (sessionId) => {
        session.onSessionFound(sessionId)
    },
    // existing onEvent...
})

onSessionId: (sessionId) => {
session.onSessionFound(sessionId);
},
Expand All @@ -82,6 +83,9 @@ export async function codexLocalLauncher(session: CodexSession): Promise<'switch
session.sendUserMessage(converted.userMessage);
}
if (converted?.message) {
if (converted.message.type === 'token_count') {
session.recordCodexUsage(converted.message);
}
session.sendAgentMessage(converted.message);
}
}
Expand Down
85 changes: 83 additions & 2 deletions cli/src/codex/codexRemoteLauncher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ const harness = vi.hoisted(() => ({
startThreadIds: [] as string[],
resumeThreadIds: [] as string[],
startTurnThreadIds: [] as string[],
remainingThreadSystemErrors: 0
remainingThreadSystemErrors: 0,
transcriptPathByThreadId: new Map<string, string>(),
scannerStarts: [] as Array<{ transcriptPath: string | null; replayExistingEvents?: boolean }>,
scannerCleanups: 0,
scannerEvents: [] as Array<(event: unknown) => void>
}));

vi.mock('./codexAppServerClient', () => {
Expand Down Expand Up @@ -88,6 +92,30 @@ vi.mock('./utils/buildHapiMcpBridge', () => ({
})
}));

vi.mock('@/modules/common/codexSessions', () => ({
findCodexSessionFile: async (threadId: string) => harness.transcriptPathByThreadId.get(threadId) ?? `/tmp/${threadId}.jsonl`
}));

vi.mock('./utils/codexSessionScanner', () => ({
createCodexSessionScanner: async (opts: {
transcriptPath: string | null;
replayExistingEvents?: boolean;
onEvent: (event: unknown) => void;
}) => {
harness.scannerStarts.push({
transcriptPath: opts.transcriptPath,
replayExistingEvents: opts.replayExistingEvents
});
harness.scannerEvents.push(opts.onEvent);
return {
cleanup: async () => {
harness.scannerCleanups += 1;
},
setTranscriptPath: async () => {}
};
}
}));

import { codexRemoteLauncher } from './codexRemoteLauncher';

type FakeAgentState = {
Expand Down Expand Up @@ -115,6 +143,7 @@ function createSessionStub(messages = ['hello from launcher test']) {

const sessionEvents: Array<{ type: string; [key: string]: unknown }> = [];
const codexMessages: unknown[] = [];
const usagePayloads: unknown[] = [];
const thinkingChanges: boolean[] = [];
const foundSessionIds: string[] = [];
let currentModel: string | null | undefined;
Expand Down Expand Up @@ -176,6 +205,9 @@ function createSessionStub(messages = ['hello from launcher test']) {
},
sendUserMessage(text: string) {
client.sendUserMessage(text);
},
recordCodexUsage(payload: unknown) {
usagePayloads.push(payload);
}
};

Expand All @@ -187,7 +219,8 @@ function createSessionStub(messages = ['hello from launcher test']) {
foundSessionIds,
rpcHandlers,
getModel: () => currentModel,
getAgentState: () => agentState
getAgentState: () => agentState,
usagePayloads
};
}

Expand All @@ -200,6 +233,10 @@ describe('codexRemoteLauncher', () => {
harness.resumeThreadIds = [];
harness.startTurnThreadIds = [];
harness.remainingThreadSystemErrors = 0;
harness.transcriptPathByThreadId = new Map();
harness.scannerStarts = [];
harness.scannerCleanups = 0;
harness.scannerEvents = [];
});

it('finishes a turn and emits ready when task lifecycle events include turn_id', async () => {
Expand Down Expand Up @@ -260,4 +297,48 @@ describe('codexRemoteLauncher', () => {
expect(session.sessionId).toBe('thread-2');
expect(session.thinking).toBe(false);
});

it('tails remote Codex transcript for usage without replaying transcript messages', async () => {
harness.transcriptPathByThreadId.set('thread-1', '/tmp/codex-thread-1.jsonl');
const { session, codexMessages, usagePayloads } = createSessionStub();

const exitReason = await codexRemoteLauncher(session as never);

expect(exitReason).toBe('exit');
expect(harness.scannerStarts).toEqual([{
transcriptPath: '/tmp/codex-thread-1.jsonl',
replayExistingEvents: true
}]);

harness.scannerEvents[0]?.({
type: 'event_msg',
payload: {
type: 'token_count',
info: {
total_token_usage: { total_tokens: 42000 },
model_context_window: 128000
}
}
});
harness.scannerEvents[0]?.({
type: 'event_msg',
payload: {
type: 'agent_message',
message: 'transcript duplicate'
}
});

expect(usagePayloads).toHaveLength(1);
expect(usagePayloads[0]).toMatchObject({
type: 'token_count',
info: {
total_token_usage: { total_tokens: 42000 },
model_context_window: 128000
}
});
expect(codexMessages).not.toContainEqual(expect.objectContaining({
message: 'transcript duplicate'
}));
expect(harness.scannerCleanups).toBe(1);
});
});
113 changes: 113 additions & 0 deletions cli/src/codex/codexRemoteLauncher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ import type { CodexSession } from './session';
import type { EnhancedMode } from './loop';
import { hasCodexCliOverrides } from './utils/codexCliOverrides';
import { AppServerEventConverter } from './utils/appServerEventConverter';
import { convertCodexEvent } from './utils/codexEventConverter';
import { createCodexSessionScanner, type CodexSessionScanner } from './utils/codexSessionScanner';
import { registerAppServerPermissionHandlers } from './utils/appServerPermissionAdapter';
import { buildThreadStartParams, buildTurnStartParams } from './utils/appServerConfig';
import { shouldIgnoreTerminalEvent } from './utils/terminalEventGuard';
import { findCodexSessionFile } from '@/modules/common/codexSessions';
import {
RemoteLauncherBase,
type RemoteLauncherDisplayContext,
Expand All @@ -35,6 +38,10 @@ class CodexRemoteLauncher extends RemoteLauncherBase {
private abortController: AbortController = new AbortController();
private currentThreadId: string | null = null;
private currentTurnId: string | null = null;
private usageScanner: CodexSessionScanner | null = null;
private usageScannerThreadId: string | null = null;
private usageScannerSetup: Promise<void> | null = null;
private shuttingDown = false;

constructor(session: CodexSession) {
super(process.env.DEBUG ? session.logPath : undefined);
Expand Down Expand Up @@ -94,6 +101,104 @@ class CodexRemoteLauncher extends RemoteLauncherBase {
await this.handleAbort();
}

private async ensureUsageScanner(threadId: string): Promise<void> {
if (this.usageScannerThreadId === threadId && (this.usageScanner || this.usageScannerSetup)) {
return this.usageScannerSetup ?? Promise.resolve();
}

const setupTask = this.replaceUsageScanner(threadId);
this.usageScannerSetup = setupTask.finally(() => {
if (this.usageScannerSetup === setupTask) {
this.usageScannerSetup = null;
}
});
return this.usageScannerSetup;
}

private async replaceUsageScanner(threadId: string): Promise<void> {
const previousScanner = this.usageScanner;
this.usageScanner = null;
this.usageScannerThreadId = threadId;
if (previousScanner) {
await previousScanner.cleanup();
}

const transcriptPath = await this.findTranscriptWithRetry(threadId);
if (this.shuttingDown || this.usageScannerThreadId !== threadId) {
return;
}
if (!transcriptPath) {
logger.debug(`[Codex] No transcript found for remote thread ${threadId}; usage unavailable`);
return;
}

const scanner = await createCodexSessionScanner({
transcriptPath,
replayExistingEvents: true,
onEvent: (event) => {
const converted = convertCodexEvent(event);
if (converted?.message?.type === 'token_count') {
this.session.recordCodexUsage(converted.message);
}
}
});
if (this.shuttingDown || this.usageScannerThreadId !== threadId) {
await scanner.cleanup();
return;
}
this.usageScanner = scanner;
}

private async findTranscriptWithRetry(threadId: string): Promise<string | null> {
const attempts = 6;
for (let attempt = 0; attempt < attempts; attempt += 1) {
if (this.shuttingDown || this.usageScannerThreadId !== threadId) {
return null;
}
try {
const transcriptPath = await findCodexSessionFile(threadId);
if (transcriptPath) {
return transcriptPath;
}
} catch (error) {
logger.debug(`[Codex] Failed to find transcript for remote thread ${threadId}:`, error);
return null;
}
if (attempt < attempts - 1) {
await this.sleep(250);
}
}
return null;
}

private async sleep(ms: number): Promise<void> {
await new Promise<void>((resolve) => {
const timer = setTimeout(resolve, ms);
timer.unref?.();
});
}

private async cleanupUsageScanner(): Promise<void> {
if (this.usageScannerSetup) {
try {
await this.usageScannerSetup;
} catch (error) {
logger.debug('[Codex] Remote usage scanner setup failed during cleanup:', error);
}
}
this.shuttingDown = true;
const scanner = this.usageScanner;
this.usageScanner = null;
this.usageScannerThreadId = null;
if (scanner) {
try {
await scanner.cleanup();
} catch (error) {
logger.debug('[Codex] Remote usage scanner cleanup failed:', error);
}
}
}

public async launch(): Promise<RemoteLauncherExitReason> {
if (this.session.codexArgs && this.session.codexArgs.length > 0) {
if (hasCodexCliOverrides(this.session.codexCliOverrides)) {
Expand Down Expand Up @@ -255,6 +360,9 @@ class CodexRemoteLauncher extends RemoteLauncherBase {
if (threadId) {
this.currentThreadId = threadId;
session.onSessionFound(threadId);
void this.ensureUsageScanner(threadId).catch((error) => {
logger.debug(`[Codex] Failed to start remote usage scanner for ${threadId}:`, error);
});
}
return;
}
Expand Down Expand Up @@ -419,6 +527,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase {
}
}
if (msgType === 'token_count') {
session.recordCodexUsage(msg);
session.sendAgentMessage({
...msg,
id: randomUUID()
Expand Down Expand Up @@ -684,6 +793,9 @@ class CodexRemoteLauncher extends RemoteLauncherBase {

this.currentThreadId = threadId;
session.onSessionFound(threadId);
void this.ensureUsageScanner(threadId).catch((error) => {
logger.debug(`[Codex] Failed to start remote usage scanner for ${threadId}:`, error);
});
hasThread = true;
} else {
if (!this.currentThreadId) {
Expand Down Expand Up @@ -758,6 +870,7 @@ class CodexRemoteLauncher extends RemoteLauncherBase {

protected async cleanup(): Promise<void> {
logger.debug('[codex-remote]: cleanup start');
await this.cleanupUsageScanner();
try {
await this.appServerClient.disconnect();
} catch (error) {
Expand Down
Loading
Loading