Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/agents/claude-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ export class AgentRunner {
private config?: Config;
private activeSessions: Map<string, string> = new Map();
private activeStreams = new Map<string, AsyncIterable<any>>();
private activeAbortControllers = new Map<string, AbortController>();
private interruptFns = new Map<string, () => Promise<void>>();
private onSessionIdUpdate?: (sessionId: string, agentSessionId: string) => void;
private onCompactStart?: (sessionId: string) => void;
Expand Down Expand Up @@ -699,13 +700,19 @@ export class AgentRunner {
env: this.getAgentEnv()
};

// AbortController for this query — enables interrupt() to cancel both
// the connection phase and the streaming phase
const controller = new AbortController();
this.activeAbortControllers.set(sessionId, controller);

const createQuery = (promptInput: string | MessageStream, resumeSessionId?: string) => {
if (useSettingSources) {
// 新方式:SDK 自动加载 CLAUDE.md 和 MCP 配置
return query({
prompt: promptInput as any,
options: {
...commonOptions,
abortController: controller,
settingSources: ['project', 'user'],
systemPrompt: {
type: 'preset' as const,
Expand Down Expand Up @@ -752,6 +759,7 @@ export class AgentRunner {
prompt: promptInput as any,
options: {
...commonOptions,
abortController: controller,
...(resumeSessionId ? { resume: resumeSessionId } : {}),
...(Object.keys(globalMcpServers).length > 0 ? { mcpServers: globalMcpServers } : {}),
...(fullAppend ? {
Expand Down Expand Up @@ -787,6 +795,13 @@ export class AgentRunner {
}

async interrupt(sessionId: string): Promise<void> {
// Abort via controller first (covers connection phase before stream starts)
const controller = this.activeAbortControllers.get(sessionId);
if (controller) {
controller.abort('User interrupt');
this.activeAbortControllers.delete(sessionId);
}

const fn = this.interruptFns.get(sessionId);
if (fn) {
try {
Expand All @@ -810,6 +825,7 @@ export class AgentRunner {

cleanupStream(sessionId: string): void {
this.activeStreams.delete(sessionId);
this.activeAbortControllers.delete(sessionId);
this.interruptFns.delete(sessionId);
}

Expand Down Expand Up @@ -885,6 +901,7 @@ export class AgentRunner {
async closeSession(sessionId: string): Promise<void> {
this.activeSessions.delete(sessionId);
this.activeStreams.delete(sessionId);
this.activeAbortControllers.delete(sessionId);
this.interruptFns.delete(sessionId);
}

Expand Down
30 changes: 21 additions & 9 deletions src/core/message/message-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,19 +494,31 @@ ${suggestions}`,
const effectiveSystemPrompt = [options?.systemPromptAppend, ...contextParts].filter(Boolean).join('\n') || undefined;

// 可重试错误(403/429/5xx)指数退避重试,最多 3 次
const connectionTimeoutMs = (this.config.idleMonitor?.connectionTimeout ?? 30) * 1000;
const MAX_RETRIES = 3;
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
let streamRegistered = false;
try {
const stream = await agent.runQuery(
session.id,
effectivePrompt,
absoluteProjectPath,
session.agentSessionId,
message.images,
effectiveSystemPrompt,
this.sessionManager
);
// Connection-phase timeout: if runQuery() (API handshake) hangs
// beyond connectionTimeout, abort and throw a clear error.
let connectionTimer: ReturnType<typeof setTimeout> | undefined;
const stream = await Promise.race([
agent.runQuery(
session.id,
effectivePrompt,
absoluteProjectPath,
session.agentSessionId,
message.images,
effectiveSystemPrompt,
this.sessionManager
).then(s => { clearTimeout(connectionTimer); return s; }),
new Promise<never>((_, reject) => {
connectionTimer = setTimeout(() => {
agent.interrupt(streamKey).catch(() => {});
reject(new Error('Connection timeout: agent failed to start stream within ' + (connectionTimeoutMs / 1000) + 's'));
}, connectionTimeoutMs);
}),
]);
agent.registerStream(streamKey, stream);
streamRegistered = true;

Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export interface Config {
enabled?: boolean; // 是否启用空闲监控,默认 true
safeModeThreshold?: number; // 连续错误几次进入安全模式,默认 3;设为 0 关闭 safe mode
timeout?: number; // 无输出超时(秒),默认 120
connectionTimeout?: number; // 连接阶段超时(秒),默认 30;query() 调用到流开始前的最大等待
};
showActivities?: 'all' | 'dm-only' | 'owner-dm-only' | 'none'; // 中间输出显示范围(工具活动+流式文本),默认 'all'
}
Expand Down