Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 48 additions & 20 deletions src/core/pipeline/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,40 @@ function collectTaskOutputText(state: SessionState): {
return { rawOutputText, summaryText };
}

/**
 * Chooses the single output string to show for a session.
 *
 * Candidates are tried in this order, and the first non-blank string wins:
 *   1. Completed terminal tasks (tasks that no other task lists in
 *      `depends_on`), scanning from the last task in the graph to the first.
 *   2. The last completed task id whose result carries a non-blank `raw_output`.
 *   3. The last entry of `task_results` carrying a non-blank `raw_output`.
 *
 * @param state Session state; `task_results`, `task_graph` and
 *   `completed_task_ids` may each be missing.
 * @returns The selected raw output, or `""` when nothing qualifies.
 */
function collectDisplayOutputTextFromState(state: SessionState): string {
  const isNonBlankString = (candidate: unknown): candidate is string =>
    typeof candidate === "string" && candidate.trim().length > 0;

  const resultsById = state.task_results ?? {};
  const graphTasks = state.task_graph?.tasks ?? [];
  const doneIds = state.completed_task_ids ?? [];

  // Tolerates ids that have no stored result.
  const outputOf = (id: string): unknown =>
    (resultsById[id] as Record<string, unknown> | undefined)?.raw_output;

  // (1) Terminal tasks: nothing else depends on them. Newest first.
  const referencedIds = new Set(graphTasks.flatMap((task) => task.depends_on));
  const terminalNewestFirst = graphTasks
    .filter((task) => !referencedIds.has(task.id))
    .reverse();
  for (const task of terminalNewestFirst) {
    if (!doneIds.includes(task.id)) continue;
    const terminalOutput = outputOf(task.id);
    if (isNonBlankString(terminalOutput)) return terminalOutput;
  }

  // (2) Most recently completed task that produced any text.
  const latestCompleted = doneIds
    .map((id) => outputOf(id))
    .filter(isNonBlankString)
    .at(-1);
  if (latestCompleted !== undefined) return latestCompleted;

  // (3) Any stored result at all, last entry first.
  const latestStored = Object.values(resultsById)
    .map((entry) => (entry as Record<string, unknown>).raw_output)
    .filter(isNonBlankString)
    .at(-1);
  return latestStored ?? "";
}

/**
 * Chooses the output string to show from in-memory execution records.
 * The orchestrator calls this when the run did not fully succeed.
 *
 * Selection order:
 *   1. Output of every record that is NOT `"completed"` and has non-blank
 *      text, joined with `"\n---\n"`. Failure output wins so that an earlier
 *      successful step (e.g. a create followed by a failed validate) cannot
 *      hide the adapter error behind a result that reports `ok: false`.
 *   2. Output of the last `"completed"` record with non-blank text.
 *   3. All truthy outputs joined with `"\n---\n"`.
 *
 * @param records Execution records in run order.
 * @returns The selected text, or `""` when no record has any output.
 */
function collectDisplayOutputTextFromRecords(records: TaskExecutionRecord[]): string {
  // Guards against records whose rawOutput is missing rather than empty.
  const hasText = (record: TaskExecutionRecord): boolean =>
    typeof record.rawOutput === "string" && record.rawOutput.trim().length > 0;

  const failureOutputs = records
    .filter((record) => record.status !== "completed" && hasText(record))
    .map((record) => record.rawOutput);
  if (failureOutputs.length > 0) return failureOutputs.join("\n---\n");

  const lastCompleted = records
    .filter((record) => record.status === "completed" && hasText(record))
    .at(-1);
  if (lastCompleted) return lastCompleted.rawOutput;

  return records
    .map((record) => record.rawOutput)
    .filter(Boolean)
    .join("\n---\n");
}

function applySessionTokenMetrics(
state: SessionState,
inputOriginalText: string,
Expand Down Expand Up @@ -686,7 +720,7 @@ export const orchestratePipeline = async (
summary: `F1 캐시 hit — 세션 ${cachedSessionId} (${Math.round(cacheAge / 86400000)}일 전)`,
}),
);
const { rawOutputText, summaryText } = collectTaskOutputText(cachedSession!);
const { summaryText } = collectTaskOutputText(cachedSession!);
return {
ok: true,
mode: request.mode,
Expand All @@ -695,7 +729,7 @@ export const orchestratePipeline = async (
nextAction: cachedSession!.next_action ?? "캐시된 결과를 반환했습니다.",
originalPrompt: request.userRequest.raw_input,
stages: buildPipelineStages(true),
rawOutput: rawOutputText,
rawOutput: collectDisplayOutputTextFromState(cachedSession!),
sessionId: cachedSessionId,
taskRecords: cachedSession!.completed_task_ids.map((id) => ({
taskId: id,
Expand Down Expand Up @@ -1397,23 +1431,15 @@ export const orchestratePipeline = async (
}

// (6) LLM 실행
// embedded-pane: codex exec은 단순 작업 설명만 기대 — 구조화된 템플릿은 모델을 혼란시킴.
// 다른 모드: 번역된 원본 명령 + RAG 컨텍스트 + 태스크 정보 포함.
let prompt: string;
if (request.presentationMode === "embedded-pane") {
// 한국어 입력이면 응답 언어 지시를 앞에 붙여 codex가 한국어로 응답하도록 한다.
prompt = responseLanguageInstruction
? `${responseLanguageInstruction}${compiledPrompt.compressed_prompt}`
: compiledPrompt.compressed_prompt;
} else {
const promptParts: string[] = [];
if (responseLanguageInstruction) promptParts.push(responseLanguageInstruction.trimEnd());
if (ragContext) promptParts.push(ragContext.trimEnd());
promptParts.push(`[${task.type.toUpperCase()}] ${task.title}`);
promptParts.push(`User request: ${compiledPrompt.compressed_prompt}`);
promptParts.push(`Context: ${executionContext.context_summary}`);
prompt = promptParts.join("\n\n");
}
// 모든 presentation mode에서 task-specific 지시를 유지한다.
// embedded-pane도 원본 프롬프트만 넘기면 여러 task가 같은 일을 반복할 수 있다.
const promptParts: string[] = [];
if (responseLanguageInstruction) promptParts.push(responseLanguageInstruction.trimEnd());
if (ragContext) promptParts.push(ragContext.trimEnd());
promptParts.push(`[${task.type.toUpperCase()}] ${task.title}`);
promptParts.push(`User request: ${compiledPrompt.compressed_prompt}`);
promptParts.push(`Context: ${executionContext.context_summary}`);
const prompt = promptParts.join("\n\n");
if (process.env.DETOKS_DEBUG_ADAPTER_PROMPT === "1") {
process.stderr.write(
`[adapter-prompt] task=${task.id} type=${task.type} bytes=${Buffer.byteLength(prompt, "utf8")}\n` +
Expand Down Expand Up @@ -1654,7 +1680,9 @@ export const orchestratePipeline = async (
originalPrompt: request.userRequest.raw_input,
tokenMetrics: sessionTokenMetrics.tokenMetrics,
stages: buildPipelineStages(allOk),
rawOutput: taskRecords.map((r) => r.rawOutput).filter(Boolean).join("\n---\n"),
rawOutput: allOk
? collectDisplayOutputTextFromState(state)
: collectDisplayOutputTextFromRecords(taskRecords),
sessionId,
taskRecords,
...(adapterTranscript ? { adapterTranscript } : {}),
Expand Down
46 changes: 46 additions & 0 deletions src/core/task-graph/TaskGraphProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ import type { Task, TaskGraph, RequestCategory } from "../../schemas/pipeline.js
* → ParallelClassifier.classify()
*/
export class TaskGraphProcessor {
// Request categories whose result counts as a deliverable. A multi-sentence
// request is only considered for collapsing into a single task when its
// final sentence is classified as one of these.
private static readonly DELIVERABLE_TYPES = new Set<RequestCategory>([
"create",
"document",
"plan",
]);

// Case-insensitive, whole-word match for nouns that name a concrete
// deliverable (presentation, report, plan, email, ...). It is tested against
// the final sentence when deciding whether to collapse a request.
private static readonly DELIVERABLE_PATTERN =
/\b(presentation|slides?|deck|outline|structure|report|proposal|script|summary|brief|document|docs|guide|plan|roadmap|table|matrix|comparison|q&a|questions?|talk\s+track|message|copy|email|memo|one[-\s]?pager)\b/i;

// Matches a sentence that opens with an imperative verb, i.e. a step that is
// real work in its own right rather than context for a later sentence.
// Sentences classified as "execute" that do NOT match are treated as context
// and may be merged into the final deliverable task.
//
// Data-gathering verbs (collect, gather, fetch, retrieve, pull, download,
// query, extract, look up) are included so that ordered workflows such as
// "Fetch sales data. Create a report." keep the gathering step as a separate
// task instead of collapsing it into the deliverable.
private static readonly INDEPENDENT_ACTION_START_PATTERN =
  /^(?:find|locate|trace|track|follow|show|tell|read|search|explore|browse|inspect|collect|gather|fetch|retrieve|pull|download|query|extract|look up|analyze|investigate|explain|diagnose|review|compare|assess|evaluate|create|build|generate|scaffold|implement|add|make|draft|set up|spin up|bootstrap|modify|update|change|fix|patch|edit|refactor|rename|rewrite|remove|replace|improve|optimi[sz]e|tune|correct|enable|disable|toggle|test|validate|verify|assert|confirm|ensure|check|lint|typecheck|run|execute|deploy|start|launch|restart|stop|install|migrate|seed|serve|document|summari[sz]e|describe|write|prepare|plan|design|organize|outline|strategize|propose|break down)\b/i;
Comment on lines +40 to +41

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Recognize data-gathering prefixes before collapsing

Because the collapse check treats fallback `execute` sentences as context unless they start with a verb from this list, inputs such as "Collect customer requirements. Draft a proposal." or "Fetch sales data. Create a report." collapse into a single deliverable task: `collect` and `fetch` are not classified by `TYPE_PATTERNS` and are absent here. That removes the separate data-gathering task that would otherwise feed the final document, so the steps of a common ordered workflow can be skipped rather than executed sequentially.

Useful? React with 👍 / 👎.


// 'make' 등 단일 키워드가 create/modify 패턴에 먼저 걸리는 숙어들을 TYPE_PATTERNS보다 먼저 처리.
// 패턴 순서가 바뀌어도 이 테이블의 판정은 항상 우선한다.
private static readonly IDIOM_PATTERNS: ReadonlyArray<{
Expand Down Expand Up @@ -255,6 +267,14 @@ export class TaskGraphProcessor {
// ① 먼저 모든 문장의 type을 한 번에 분류 — resolveDependsOn에서 이전 type을 참조하기 때문
const types: RequestCategory[] = sentences.map((s) => this.classifyType(s));

if (this.shouldCollapseSingleDeliverable(sentences, types)) {
const finalType = types[types.length - 1]!;
const mergedSentence = sentences.join(" ");
return TaskGraphSchema.parse({
tasks: [this.buildTask(mergedSentence, 0, finalType, [])],
});
}

// ② 각 문장을 Task로 변환하면서 depends_on 결정
const tasks: Task[] = sentences.map((sentence, index) => {
const type = types[index]!;
Expand Down Expand Up @@ -287,6 +307,32 @@ export class TaskGraphProcessor {
return this.FLOWS_TO[prev]?.includes(curr) ? [`t${index}`] : [];
}

/**
 * Decides whether a multi-sentence request really asks for one deliverable
 * and should therefore become a single task.
 *
 * That is the case when there are at least two sentences, the last one is
 * classified as a deliverable type and mentions a deliverable noun, and
 * every preceding sentence is only a context qualifier.
 *
 * @param sentences Request sentences in order.
 * @param types Category of each sentence, index-aligned with `sentences`.
 * @returns `true` when the sentences should be merged into one task.
 */
private static shouldCollapseSingleDeliverable(
  sentences: string[],
  types: RequestCategory[],
): boolean {
  const sentenceCount = sentences.length;
  if (sentenceCount < 2) return false;

  const closesWithDeliverable =
    this.DELIVERABLE_TYPES.has(types[types.length - 1]!) &&
    this.DELIVERABLE_PATTERN.test(sentences[sentenceCount - 1]!);
  if (!closesWithDeliverable) return false;

  // Everything before the final sentence must be pure context.
  const leadingTypes = types.slice(0, -1);
  for (const [position, lead] of sentences.slice(0, -1).entries()) {
    if (!this.isContextQualifier(lead, leadingTypes[position]!)) return false;
  }
  return true;
}

/**
 * Tells whether a sentence only adds context rather than describing a step
 * of its own: it must be classified as `"execute"` and must not open with
 * one of the independent action verbs.
 *
 * @param sentence The sentence text; surrounding whitespace is ignored.
 * @param type Category assigned to the sentence.
 * @returns `true` when the sentence is a context qualifier.
 */
private static isContextQualifier(
  sentence: string,
  type: RequestCategory,
): boolean {
  const opensWithActionVerb = (): boolean =>
    this.INDEPENDENT_ACTION_START_PATTERN.test(sentence.trim());
  return type === "execute" && !opensWithActionVerb();
}

/**
* 하나의 문장으로부터 Task 객체를 생성합니다.
*
Expand Down
153 changes: 152 additions & 1 deletion tests/ts/unit/core/pipeline/orchestrator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,96 @@ describe("orchestratePipeline", () => {
expect(executeWithAdapterMock).toHaveBeenCalledWith(
expect.objectContaining({
presentationMode: "embedded-pane",
prompt: "Respond entirely in Korean.\n\nCreate a new file",
prompt: expect.stringContaining("[CREATE] Create a new file"),
}),
);
expect(executeWithAdapterMock).toHaveBeenCalledWith(
expect.objectContaining({
prompt: expect.stringContaining("Respond entirely in Korean."),
}),
);
expect(executeWithAdapterMock).toHaveBeenCalledWith(
expect.objectContaining({
prompt: expect.stringContaining("User request: Create a new file"),
}),
);
});

// Embedded-pane runs must still send a per-task "[TYPE] title" header to the
// adapter, so that the two tasks of one request receive different prompts.
it("keeps task-specific instructions in embedded-pane execution", async () => {
// Local LLM mock echoes the two-sentence request.
// NOTE(review): presumably this feeds the prompt translation/compile step — confirm.
nodeRuntimeMocks.completeChatWithNodeLlamaCpp.mockResolvedValueOnce({
content: "Find the module. Analyze the flow.",
raw_response: {
choices: [{ message: { content: "Find the module. Analyze the flow." } }],
},
inference_time_sec: 0.01,
});
// Every adapter call succeeds with the same canned output.
executeWithAdapterMock.mockResolvedValue({
ok: true,
adapter: "codex",
rawOutput: "[mock-embedded]",
exitCode: 0,
});

await orchestratePipeline({
mode: "run",
adapter: "codex",
executionMode: "stub",
verbose: false,
presentationMode: "embedded-pane",
userRequest: {
raw_input: "Find the module. Analyze the flow.",
},
});

// One adapter call per sentence, each carrying its own task header.
expect(executeWithAdapterMock).toHaveBeenCalledTimes(2);
expect(executeWithAdapterMock).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
presentationMode: "embedded-pane",
prompt: expect.stringContaining("[EXPLORE] Find the module."),
}),
);
expect(executeWithAdapterMock).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
presentationMode: "embedded-pane",
prompt: expect.stringContaining("[ANALYZE] Analyze the flow."),
}),
);
});

// On a fully successful multi-task run the top-level rawOutput is the output
// of the last task only, while taskRecords still keeps every task's output.
it("returns only the terminal task output for successful multi-task runs", async () => {
// First adapter call answers the explore task, second the analyze task.
executeWithAdapterMock
.mockResolvedValueOnce({
ok: true,
adapter: "codex",
rawOutput: "[mock] explored module",
exitCode: 0,
})
.mockResolvedValueOnce({
ok: true,
adapter: "codex",
rawOutput: "[mock] final analysis",
exitCode: 0,
});

const result = await orchestratePipeline({
mode: "run",
adapter: "codex",
executionMode: "stub",
verbose: false,
userRequest: {
raw_input: "Find the module. Analyze the flow.",
},
});

// Per-task outputs are preserved in the records...
expect(result.taskRecords).toEqual([
{ taskId: "t1", status: "completed", rawOutput: "[mock] explored module" },
{ taskId: "t2", status: "completed", rawOutput: "[mock] final analysis" },
]);
// ...but only the final task's output is surfaced as the run output.
expect(result.rawOutput).toBe("[mock] final analysis");
});

it("skips completed tasks from an existing session and resumes remaining work", async () => {
vi.spyOn(SessionStateManager, "sessionExists").mockResolvedValue(true);
vi.spyOn(SessionStateManager, "loadSession").mockResolvedValue({
Expand Down Expand Up @@ -669,6 +754,72 @@ describe("orchestratePipeline", () => {
expect(executeWithAdapterMock).not.toHaveBeenCalled();
});

// F1 cache hit: a cached multi-task session returns only the terminal task's
// output, and the adapter is never invoked.
it("F1: 캐시된 multi-task 세션은 terminal task output만 반환한다", async () => {
// Cached session fixture with two completed tasks; t2 depends on t1, so t2
// is the only task that nothing else depends on.
const cachedSession = {
shared_context: {
session_id: "cached-multi-session",
raw_input_hash: "willbematched",
project_id: "git-test123",
failed_task_ids: [],
},
task_results: {
t1: {
task_id: "t1",
success: true,
raw_output: "cached explore output",
summary: "cached explore output",
},
t2: {
task_id: "t2",
success: true,
raw_output: "cached final output",
summary: "cached final output",
},
},
current_task_id: null,
completed_task_ids: ["t1", "t2"],
task_graph: {
tasks: [
{
id: "t1",
type: "explore",
status: "pending",
title: "Find the module.",
input_hash: "hash1",
depends_on: [],
},
{
id: "t2",
type: "analyze",
status: "pending",
title: "Analyze the flow.",
input_hash: "hash2",
depends_on: ["t1"],
},
],
},
last_summary: "2개 작업을 모두 완료했습니다",
next_action: "파이프라인이 완료되었습니다.",
updated_at: new Date().toISOString(),
};
// NOTE(review): `as any` bypasses type checking of the fixture; presumably
// it does not match the full session type — confirm and prefer a typed fixture.
vi.spyOn(SessionStateManager, "findSuccessfulSessionByInputHash").mockResolvedValue(
cachedSession as any,
);

const result = await orchestratePipeline({
mode: "run",
adapter: "codex",
executionMode: "real",
verbose: false,
projectInfo: { projectId: "git-test123", projectPath: "/test", projectName: "test" },
userRequest: { raw_input: "cached prompt" },
});

// Only t2's output is surfaced; both task records are still reported.
expect(result.rawOutput).toBe("cached final output");
expect(result.taskRecords).toHaveLength(2);
expect(executeWithAdapterMock).not.toHaveBeenCalled();
});

it("F1: stub 모드에서는 캐시 조회를 건너뜀", async () => {
vi.spyOn(SessionStateManager, "sessionExists").mockResolvedValue(false);
vi.spyOn(SessionStateManager, "saveSession").mockResolvedValue(undefined);
Expand Down
35 changes: 35 additions & 0 deletions tests/ts/unit/core/task-graph/TaskGraphProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,41 @@ describe("TaskGraphProcessor", () => {
});

describe("multi 요청 — sequential 흐름", () => {
// Context sentences that precede a single-deliverable request are merged
// into one "create" task.
it("단일 산출물 요청의 컨텍스트 문장들은 하나의 create task로 병합한다", () => {
// The first two sentences open with "Next" and "From", neither of which is
// in the independent-action verb list; only the last sentence asks for a
// deliverable ("presentation structure").
// NOTE(review): this relies on the first two sentences being classified as
// "execute" — confirm against classifyType.
const result = TaskGraphProcessor.process({
sentences: [
"Next week prepare for the Hyundai IT team meeting.",
"From the perspective of AI PC adoption, focus on the advantages of Intel vPro and Core Ultra.",
"Create a presentation structure for 15 covering customer concerns, anticipated questions, key points compared to competitors, and reflect any content from the previous meeting.",
],
});

// The single task's title is all three sentences joined with one space.
expect(result.tasks).toHaveLength(1);
expect(result.tasks[0]).toMatchObject({
id: "t1",
type: "create",
depends_on: [],
title: "Next week prepare for the Hyundai IT team meeting. From the perspective of AI PC adoption, focus on the advantages of Intel vPro and Core Ultra. Create a presentation structure for 15 covering customer concerns, anticipated questions, key points compared to competitors, and reflect any content from the previous meeting.",
});
});

// A deliverable request that has explicit preceding steps is not merged.
// NOTE(review): this input cannot collapse for two reasons unrelated to the
// action-verb list: the middle sentence is expected to be "analyze" (only
// "execute" sentences can be context), and the final sentence contains no
// word matched by DELIVERABLE_PATTERN ("chart" / "visualization" are not in
// it). A case such as "Fetch sales data." + "Create a report." would be
// needed to exercise the verb list itself.
it("명시적인 선행 작업이 있는 산출물 요청은 병합하지 않는다", () => {
const result = TaskGraphProcessor.process({
sentences: [
"Fetch last month's sales data from the database",
"Analyze the differences",
"Create a visualization chart",
],
});

// Each sentence stays its own task, in the original order.
expect(result.tasks).toHaveLength(3);
expect(result.tasks.map((task) => task.type)).toEqual([
"execute",
"analyze",
"create",
]);
});

it("explore → analyze → modify 흐름은 순차 의존을 생성한다", () => {
const result = TaskGraphProcessor.process({
sentences: [
Expand Down
Loading