Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Repo: https://github.com/openclaw/acpx

### Fixes

- Sessions/reset: close the live backend session when discarding persistent state so reset flows start a fresh ACP session instead of silently reopening the old one.
- Agents/kiro: use `kiro-cli-chat acp` for the built-in Kiro adapter command to avoid orphan child processes. (#129) Thanks @vokako.
- Agents/cursor: recognize Cursor's `Session \"...\" not found` `session/load` error format so reconnects fall back to `session/new` instead of failing. (#162) Thanks @log-li.
- Output/thinking: preserve line breaks in text-mode `[thinking]` output instead of flattening multi-line thought chunks into one line. (#144) Thanks @Huarong.
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "acpx",
"version": "0.5.1",
"version": "0.5.2",
"description": "Headless CLI client for the Agent Client Protocol (ACP) — talk to coding agents from the command line",
"keywords": [
"acp",
Expand Down
16 changes: 16 additions & 0 deletions src/acp/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ export class AcpClient {
return Boolean(this.initResult?.agentCapabilities?.loadSession);
}

supportsCloseSession(): boolean {
return Boolean(this.initResult?.agentCapabilities?.sessionCapabilities?.close);
}

setEventHandlers(
handlers: Pick<
AcpClientOptions,
Expand Down Expand Up @@ -827,6 +831,18 @@ export class AcpClient {
);
}

async closeSession(sessionId: string): Promise<void> {
const connection = this.getConnection();
await this.runConnectionRequest(() =>
connection.unstable_closeSession({
sessionId,
}),
);
if (this.loadedSessionId === sessionId) {
this.loadedSessionId = undefined;
}
}

async requestCancelActivePrompt(): Promise<boolean> {
const active = this.activePrompt;
if (!active) {
Expand Down
19 changes: 14 additions & 5 deletions src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,22 @@ export class AcpxRuntime implements AcpxRuntimeLike {
});
}

async close(input: { handle: AcpRuntimeHandle; reason: string }): Promise<void> {
async close(input: {
handle: AcpRuntimeHandle;
reason: string;
discardPersistentState?: boolean;
}): Promise<void> {
const state = this.resolveHandleState(input.handle);
const manager = await this.getManager();
await manager.close({
...input.handle,
acpxRecordId: state.acpxRecordId ?? input.handle.acpxRecordId ?? input.handle.sessionKey,
});
await manager.close(
{
...input.handle,
acpxRecordId: state.acpxRecordId ?? input.handle.acpxRecordId ?? input.handle.sessionKey,
},
{
discardPersistentState: input.discardPersistentState,
},
);
}

private async getManager(): Promise<AcpRuntimeManager> {
Expand Down
79 changes: 78 additions & 1 deletion src/runtime/engine/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { randomUUID } from "node:crypto";
import path from "node:path";
import { AcpClient } from "../../acp/client.js";
import { normalizeOutputError } from "../../acp/error-normalization.js";
import { extractAcpError, isAcpResourceNotFoundError } from "../../acp/error-shapes.js";
import { withTimeout } from "../../async-control.js";
import { textPrompt, type PromptInput } from "../../prompt-content.js";
import {
cloneSessionAcpxState,
Expand Down Expand Up @@ -120,6 +122,21 @@ function isoNow(): string {
return new Date().toISOString();
}

function isUnsupportedSessionCloseError(error: unknown): boolean {
const acp = extractAcpError(error);
if (!acp) {
return false;
}
if (acp.code === -32601 || acp.code === -32602) {
return true;
}
if (acp.code !== -32603 || !acp.data || typeof acp.data !== "object") {
return false;
}
const details = (acp.data as { details?: unknown }).details;
return typeof details === "string" && details.toLowerCase().includes("invalid params");
}

function toPromptInput(
text: string,
attachments?: AcpRuntimeTurnAttachment[],
Expand Down Expand Up @@ -611,14 +628,74 @@ export class AcpRuntimeManager {
await controller?.requestCancelActivePrompt();
}

async close(handle: AcpRuntimeHandle): Promise<void> {
async close(
handle: AcpRuntimeHandle,
options: { discardPersistentState?: boolean } = {},
): Promise<void> {
const record = await this.requireRecord(handle.acpxRecordId ?? handle.sessionKey);
await this.cancel(handle);
if (options.discardPersistentState) {
await this.closeBackendSession(record);
record.acpx = {
...record.acpx,
reset_on_next_ensure: true,
};
}
record.closed = true;
record.closedAt = isoNow();
await this.options.sessionStore.save(record);
}

private async closeBackendSession(record: SessionRecord): Promise<void> {
const pendingClient = this.pendingPersistentClients.get(record.acpxRecordId);
if (pendingClient) {
this.pendingPersistentClients.delete(record.acpxRecordId);
}
const reusablePendingClient =
pendingClient?.hasReusableSession(record.acpSessionId) === true ? pendingClient : undefined;
if (pendingClient && !reusablePendingClient) {
await pendingClient.close().catch(() => {});
}

const client =
reusablePendingClient ??
this.createClient({
agentCommand: record.agentCommand,
cwd: record.cwd,
mcpServers: [...(this.options.mcpServers ?? [])],
permissionMode: this.options.permissionMode,
nonInteractivePermissions: this.options.nonInteractivePermissions,
verbose: this.options.verbose,
});

try {
if (!reusablePendingClient) {
await withTimeout(client.start(), this.options.timeoutMs);
}
if (!client.supportsCloseSession()) {
throw new AcpRuntimeError(
"ACP_BACKEND_UNSUPPORTED_CONTROL",
`Agent does not support session/close for ${record.acpxRecordId}.`,
);
}
await withTimeout(client.closeSession(record.acpSessionId), this.options.timeoutMs);
} catch (error) {
if (isUnsupportedSessionCloseError(error)) {
throw new AcpRuntimeError(
"ACP_BACKEND_UNSUPPORTED_CONTROL",
`Agent does not support session/close for ${record.acpxRecordId}.`,
{ cause: error },
);
}
if (isAcpResourceNotFoundError(error)) {
return;
}
throw error;
} finally {
await client.close().catch(() => {});
}
}

private async requireRecord(sessionId: string): Promise<SessionRecord> {
const record = await this.options.sessionStore.load(sessionId);
if (!record) {
Expand Down
5 changes: 4 additions & 1 deletion src/runtime/engine/reuse-policy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ import path from "node:path";
import type { SessionRecord } from "../../types.js";

export function shouldReuseExistingRecord(
record: Pick<SessionRecord, "cwd" | "agentCommand" | "acpSessionId">,
record: Pick<SessionRecord, "cwd" | "agentCommand" | "acpSessionId" | "acpx">,
params: {
cwd: string;
agentCommand: string;
resumeSessionId?: string;
},
): boolean {
if (record.acpx?.reset_on_next_ensure === true) {
return false;
}
if (path.resolve(record.cwd) !== path.resolve(params.cwd)) {
return false;
}
Expand Down
6 changes: 5 additions & 1 deletion src/runtime/public/contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ export interface AcpRuntime {
setConfigOption?(input: { handle: AcpRuntimeHandle; key: string; value: string }): Promise<void>;
doctor?(): Promise<AcpRuntimeDoctorReport>;
cancel(input: { handle: AcpRuntimeHandle; reason?: string }): Promise<void>;
close(input: { handle: AcpRuntimeHandle; reason: string }): Promise<void>;
close(input: {
handle: AcpRuntimeHandle;
reason: string;
discardPersistentState?: boolean;
}): Promise<void>;
}

export type AcpSessionRecord = SessionRecord;
Expand Down
4 changes: 4 additions & 0 deletions src/session/persistence/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ function parseAcpxState(raw: unknown): SessionAcpxState | undefined {

const state: SessionAcpxState = {};

if (record.reset_on_next_ensure === true) {
state.reset_on_next_ensure = true;
}

if (typeof record.current_mode_id === "string") {
state.current_mode_id = record.current_mode_id;
}
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ export type SessionConversation = {
};

export type SessionAcpxState = {
reset_on_next_ensure?: boolean;
current_mode_id?: string;
desired_mode_id?: string;
current_model_id?: string;
Expand Down
36 changes: 36 additions & 0 deletions test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ type ClientInternals = {
| undefined;
cancellingSessionIds: Set<string>;
promptPermissionFailures: Map<string, PermissionPromptUnavailableError>;
initResult?: {
agentCapabilities?: {
sessionCapabilities?: {
close?: Record<string, never>;
};
};
};
loadedSessionId?: string;
lastKnownPid?: number;
agentStartedAt?: string;
closing: boolean;
Expand Down Expand Up @@ -469,6 +477,34 @@ test("AcpClient setSessionModel uses session/set_model", async () => {
});
});

test("AcpClient closes sessions through session/close and clears the loaded session id", async () => {
const client = makeClient();
const internals = asInternals(client);
let capturedCloseSessionParams: { sessionId: string } | undefined;
internals.initResult = {
agentCapabilities: {
sessionCapabilities: {
close: {},
},
},
};
internals.loadedSessionId = "session-close-1";
internals.connection = {
unstable_closeSession: async (params: { sessionId: string }) => {
capturedCloseSessionParams = params;
return {};
},
};

assert.equal(client.supportsCloseSession(), true);
await client.closeSession("session-close-1");

assert.deepEqual(capturedCloseSessionParams, {
sessionId: "session-close-1",
});
assert.equal(internals.loadedSessionId, undefined);
});

test("AcpClient session update handling drains queued callbacks and swallows handler failures", async () => {
const notifications: string[] = [];
const client = makeClient({
Expand Down
16 changes: 16 additions & 0 deletions test/runtime-helpers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ test("runtime reuse policy only keeps compatible records", () => {
cwd: path.resolve("/workspace"),
agentCommand: "codex --acp",
acpSessionId: "sid-1",
acpx: {},
};
assert.equal(
shouldReuseExistingRecord(base, {
Expand Down Expand Up @@ -201,4 +202,19 @@ test("runtime reuse policy only keeps compatible records", () => {
}),
false,
);
assert.equal(
shouldReuseExistingRecord(
{
...base,
acpx: {
reset_on_next_ensure: true,
},
},
{
cwd: "/workspace",
agentCommand: "codex --acp",
},
),
false,
);
});
Loading