From 82039a5debb4fbfe238c7558b51b8ee0e9c1a7f7 Mon Sep 17 00:00:00 2001 From: Dean Sharon Date: Wed, 4 Mar 2026 12:52:26 +0200 Subject: [PATCH 1/2] feat: add CreatePipeline MCP tool for CLI/MCP parity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract pipeline creation logic from CLI into ScheduleManagerService.createPipeline(), then expose it through both MCP (CreatePipeline tool) and CLI (refactored pipeline command). - Add PipelineStepRequest, PipelineCreateRequest, PipelineStep, PipelineResult domain types - Add createPipeline() to ScheduleService interface and ScheduleManagerService - Add CreatePipeline MCP tool with Zod schema, path validation, and handler - Refactor CLI pipeline command to use shared service method (68 → 42 lines) - Add 11 service tests and 6 adapter tests for pipeline functionality --- src/adapters/mcp-adapter.ts | 161 +++++++++++++++ src/cli/commands/pipeline.ts | 56 ++---- src/core/domain.ts | 27 +++ src/core/interfaces.ts | 3 + src/services/schedule-manager.ts | 61 ++++++ tests/unit/adapters/mcp-adapter.test.ts | 195 ++++++++++++++++++- tests/unit/services/schedule-manager.test.ts | 153 ++++++++++++++- 7 files changed, 612 insertions(+), 44 deletions(-) diff --git a/src/adapters/mcp-adapter.ts b/src/adapters/mcp-adapter.ts index 180ad07..85fc666 100644 --- a/src/adapters/mcp-adapter.ts +++ b/src/adapters/mcp-adapter.ts @@ -7,6 +7,8 @@ import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { z } from 'zod'; import pkg from '../../package.json' with { type: 'json' }; import { + PipelineCreateRequest, + PipelineStepRequest, Priority, ResumeTaskRequest, ScheduleCreateRequest, @@ -105,6 +107,28 @@ const ResumeScheduleSchema = z.object({ scheduleId: z.string().describe('Schedule ID to resume'), }); +const CreatePipelineSchema = z.object({ + steps: z + .array( + z.object({ + prompt: z.string().min(1).max(4000).describe('Task prompt for this step'), + priority: z.enum(['P0', 'P1', 'P2']).optional().describe('Priority override for this step'), + workingDirectory: z.string().optional().describe('Working directory override (absolute path)'), + }), + ) + .min(2, 'Pipeline requires at least 2 steps') + .max(20, 'Pipeline cannot exceed 20 steps') + .describe('Ordered pipeline steps (executed sequentially)'), + priority: z + .enum(['P0', 'P1', 'P2']) + .optional() + .describe('Default priority for all steps (individual steps can override)'), + workingDirectory: z + .string() + .optional() + .describe('Default working directory for all steps (individual steps can override)'), +}); + /** Standard MCP tool response shape */ interface MCPToolResponse { [key: string]: unknown; @@ -187,6 +211,8 @@ export class MCPAdapter { return await this.handlePauseSchedule(args); case 'ResumeSchedule': return await this.handleResumeSchedule(args); + case 'CreatePipeline': + return await this.handleCreatePipeline(args); default: // ARCHITECTURE: Return error response instead of throwing return { @@ -503,6 +529,53 @@ export class MCPAdapter { required: ['scheduleId'], }, }, + { + name: 'CreatePipeline', + description: + 'Create a sequential pipeline of tasks that execute one after another. Each step runs only after the previous step completes successfully.', + inputSchema: { + type: 'object', + properties: { + steps: { + type: 'array', + description: 'Ordered pipeline steps (executed sequentially)', + items: { + type: 'object', + properties: { + prompt: { + type: 'string', + description: 'Task prompt for this step', + minLength: 1, + maxLength: 4000, + }, + priority: { + type: 'string', + enum: ['P0', 'P1', 'P2'], + description: 'Priority override for this step', + }, + workingDirectory: { + type: 'string', + description: 'Working directory override (absolute path)', + }, + }, + required: ['prompt'], + }, + minItems: 2, + maxItems: 20, + }, + priority: { + type: 'string', + enum: ['P0', 'P1', 'P2'], + description: 'Default priority for all steps (individual steps can override)', + }, + workingDirectory: { + type: 'string', + description: 'Default working directory for all steps (individual steps can override)', + }, + }, + required: ['steps'], + }, + }, ], }; }, @@ -1159,4 +1232,92 @@ export class MCPAdapter { }), }); } + + /** + * Handle CreatePipeline tool call + * Creates a sequential pipeline of chained one-time schedules + */ + private async handleCreatePipeline(args: unknown): Promise { + const parseResult = CreatePipelineSchema.safeParse(args); + if (!parseResult.success) { + return { + content: [{ type: 'text', text: `Validation error: ${parseResult.error.message}` }], + isError: true, + }; + } + + const data = parseResult.data; + + // Validate shared workingDirectory + if (data.workingDirectory) { + const pathValidation = validatePath(data.workingDirectory); + if (!pathValidation.ok) { + return { + content: [{ type: 'text', text: `Invalid shared working directory: ${pathValidation.error.message}` }], + isError: true, + }; + } + } + + // Validate per-step workingDirectory paths + for (let i = 0; i < data.steps.length; i++) { + const step = data.steps[i]; + if (step.workingDirectory) { + const pathValidation = validatePath(step.workingDirectory); + if (!pathValidation.ok) { + return { + content: [ + { + type: 'text', + text: `Invalid working directory for step ${i + 1}: ${pathValidation.error.message}`, + }, + ], + isError: true, + }; + } + } + } + + const request: PipelineCreateRequest = { + steps: data.steps.map( + (s): PipelineStepRequest => ({ + prompt: s.prompt, + priority: s.priority as Priority | undefined, + workingDirectory: s.workingDirectory, + }), + ), + priority: data.priority as Priority | undefined, + workingDirectory: data.workingDirectory, + }; + + const result = await this.scheduleService.createPipeline(request); + + return match(result, { + ok: (pipeline) => ({ + content: [ + { + type: 'text', + text: JSON.stringify( + { + success: true, + pipelineId: pipeline.pipelineId, + stepCount: pipeline.steps.length, + steps: pipeline.steps.map((s) => ({ + index: s.index, + scheduleId: s.scheduleId, + prompt: s.prompt, + })), + }, + null, + 2, + ), + }, + ], + }), + err: (error) => ({ + content: [{ type: 'text', text: JSON.stringify({ success: false, error: error.message }, null, 2) }], + isError: true, + }), + }); + } } diff --git a/src/cli/commands/pipeline.ts b/src/cli/commands/pipeline.ts index fb40075..0d71094 100644 --- a/src/cli/commands/pipeline.ts +++ b/src/cli/commands/pipeline.ts @@ -1,64 +1,40 @@ -import { ScheduleId } from '../../core/domain.js'; import { withServices } from '../services.js'; import * as ui from '../ui.js'; export async function handlePipelineCommand(pipelineArgs: string[]) { - if (pipelineArgs.length === 0) { - ui.error('Usage: beat pipeline []...'); - process.stderr.write('Example: beat pipeline "setup db" "run migrations" "seed data"\n'); - process.exit(1); - } - // Each positional arg is a pipeline step prompt const steps = pipelineArgs.filter((arg) => !arg.startsWith('-')); - if (steps.length === 0) { - ui.error('No pipeline steps found'); + if (steps.length < 2) { + ui.error('Pipeline requires at least 2 steps'); + process.stderr.write('Usage: beat pipeline []...\n'); + process.stderr.write('Example: beat pipeline "setup db" "run migrations" "seed data"\n'); process.exit(1); } const s = ui.createSpinner(); - s.start(`Creating pipeline with ${steps.length} step${steps.length === 1 ? '' : 's'}...`); + s.start(`Creating pipeline with ${steps.length} steps...`); const { scheduleService } = await withServices(s); - const { ScheduleType } = await import('../../core/domain.js'); - // Add 2-second buffer so "now" doesn't become "past" during validation - const scheduledAt = new Date(Date.now() + 2000).toISOString(); - const createdSchedules: Array<{ id: string; prompt: string }> = []; - let previousScheduleId: string | undefined; - for (let i = 0; i < steps.length; i++) { - const prompt = steps[i]; - s.message(`Creating step ${i + 1}/${steps.length}...`); + const result = await scheduleService.createPipeline({ + steps: steps.map((prompt) => ({ prompt })), + }); - const result = await scheduleService.createSchedule({ - prompt, - scheduleType: ScheduleType.ONE_TIME, - scheduledAt, - afterScheduleId: previousScheduleId ? ScheduleId(previousScheduleId) : undefined, - }); - - if (!result.ok) { - s.stop('Pipeline creation failed'); - ui.error(`Failed to create step ${i + 1}: ${result.error.message}`); - process.exit(1); - } - - previousScheduleId = result.value.id; - createdSchedules.push({ - id: result.value.id, - prompt: prompt.substring(0, 50) + (prompt.length > 50 ? '...' : ''), - }); + if (!result.ok) { + s.stop('Pipeline creation failed'); + ui.error(result.error.message); + process.exit(1); } s.stop('Pipeline created'); // Show pipeline visualization const lines: string[] = []; - for (let i = 0; i < createdSchedules.length; i++) { - const cs = createdSchedules[i]; - lines.push(`${i + 1}. ${ui.dim(`[${cs.id}]`)} "${cs.prompt}"`); - if (i < createdSchedules.length - 1) { + for (let i = 0; i < result.value.steps.length; i++) { + const step = result.value.steps[i]; + lines.push(`${i + 1}. ${ui.dim(`[${step.scheduleId}]`)} "${step.prompt}"`); + if (i < result.value.steps.length - 1) { lines.push(' ↓'); } } diff --git a/src/core/domain.ts b/src/core/domain.ts index 82942a4..4271a59 100644 --- a/src/core/domain.ts +++ b/src/core/domain.ts @@ -351,6 +351,33 @@ export interface ScheduleCreateRequest { readonly afterScheduleId?: ScheduleId; // Chain: block until after-schedule's latest task completes } +/** + * Pipeline types - sequential task execution via chained one-time schedules + * ARCHITECTURE: Used by both MCP CreatePipeline tool and CLI pipeline command + */ +export interface PipelineStepRequest { + readonly prompt: string; + readonly priority?: Priority; + readonly workingDirectory?: string; +} + +export interface PipelineCreateRequest { + readonly steps: readonly PipelineStepRequest[]; + readonly priority?: Priority; // shared default for all steps + readonly workingDirectory?: string; // shared default for all steps +} + +export interface PipelineStep { + readonly index: number; + readonly scheduleId: ScheduleId; + readonly prompt: string; +} + +export interface PipelineResult { + readonly pipelineId: ScheduleId; // first schedule ID (stable reference) + readonly steps: readonly PipelineStep[]; +} + /** * Task checkpoint - snapshot of task state at completion/failure * ARCHITECTURE: Captures enough context to create enriched retry prompts diff --git a/src/core/interfaces.ts b/src/core/interfaces.ts index 4fb6106..6ceca06 100644 --- a/src/core/interfaces.ts +++ b/src/core/interfaces.ts @@ -5,6 +5,8 @@ import { ChildProcess } from 'child_process'; import { + PipelineCreateRequest, + PipelineResult, ResumeTaskRequest, Schedule, ScheduleCreateRequest, @@ -404,6 +406,7 @@ export interface ScheduleService { cancelSchedule(scheduleId: ScheduleId, reason?: string): Promise>; pauseSchedule(scheduleId: ScheduleId): Promise>; resumeSchedule(scheduleId: ScheduleId): Promise>; + createPipeline(request: PipelineCreateRequest): Promise>; } /** diff --git a/src/services/schedule-manager.ts b/src/services/schedule-manager.ts index 6e4978a..0292efa 100644 --- a/src/services/schedule-manager.ts +++ b/src/services/schedule-manager.ts @@ -8,6 +8,9 @@ import { createSchedule, MissedRunPolicy, + PipelineCreateRequest, + PipelineResult, + PipelineStep, Priority, Schedule, ScheduleCreateRequest, @@ -276,6 +279,64 @@ export class ScheduleManagerService implements ScheduleService { return ok(undefined); } + async createPipeline(request: PipelineCreateRequest): Promise> { + const { steps } = request; + + if (steps.length < 2) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, 'Pipeline requires at least 2 steps', { + stepCount: steps.length, + }), + ); + } + + if (steps.length > 20) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, 'Pipeline cannot exceed 20 steps', { + stepCount: steps.length, + }), + ); + } + + // +2s buffer so "now" doesn't become "past" during validation + const scheduledAt = new Date(Date.now() + 2000).toISOString(); + const createdSteps: PipelineStep[] = []; + let previousScheduleId: ScheduleId | undefined; + + for (let i = 0; i < steps.length; i++) { + const step = steps[i]; + const result = await this.createSchedule({ + prompt: step.prompt, + scheduleType: ScheduleType.ONE_TIME, + scheduledAt, + priority: step.priority ?? request.priority, + workingDirectory: step.workingDirectory ?? request.workingDirectory, + afterScheduleId: previousScheduleId, + }); + + if (!result.ok) { + return err( + new BackbeatError(ErrorCode.SYSTEM_ERROR, `Pipeline failed at step ${i + 1}: ${result.error.message}`, { + failedAtStep: i + 1, + createdSteps, + }), + ); + } + + previousScheduleId = result.value.id; + createdSteps.push({ + index: i, + scheduleId: result.value.id, + prompt: step.prompt.substring(0, 50) + (step.prompt.length > 50 ? '...' : ''), + }); + } + + return ok({ + pipelineId: createdSteps[0].scheduleId, + steps: createdSteps, + }); + } + /** * Fetch a schedule by ID and optionally validate its status * Returns Result with the schedule or a typed error diff --git a/tests/unit/adapters/mcp-adapter.test.ts b/tests/unit/adapters/mcp-adapter.test.ts index 86aa840..e83f680 100644 --- a/tests/unit/adapters/mcp-adapter.test.ts +++ b/tests/unit/adapters/mcp-adapter.test.ts @@ -13,10 +13,12 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { MCPAdapter } from '../../../src/adapters/mcp-adapter'; -import type { Task, TaskRequest } from '../../../src/core/domain'; +import type { PipelineCreateRequest, PipelineResult, Task, TaskRequest } from '../../../src/core/domain'; +import { Priority, ScheduleId } from '../../../src/core/domain'; import { BackbeatError, ErrorCode, taskNotFound } from '../../../src/core/errors'; import type { EventBus } from '../../../src/core/events/event-bus'; -import type { Logger, ScheduleRepository, TaskManager } from '../../../src/core/interfaces'; +import type { Logger, ScheduleRepository, ScheduleService, TaskManager } from '../../../src/core/interfaces'; +import type { Result } from '../../../src/core/result'; import { err, ok } from '../../../src/core/result'; import { TaskFactory } from '../../fixtures/factories'; @@ -634,6 +636,130 @@ describe('MCPAdapter - Protocol Compliance', () => { }); }); +describe('MCPAdapter - CreatePipeline Tool', () => { + let adapter: MCPAdapter; + let mockTaskManager: MockTaskManager; + let mockLogger: MockLogger; + let mockScheduleService: MockScheduleService; + + beforeEach(() => { + mockTaskManager = new MockTaskManager(); + mockLogger = new MockLogger(); + mockScheduleService = new MockScheduleService(); + adapter = new MCPAdapter(mockTaskManager, mockLogger, mockScheduleService as unknown as ScheduleService); + }); + + afterEach(() => { + mockTaskManager.reset(); + mockLogger.reset(); + }); + + it('should reject steps array with fewer than 2 items', async () => { + const result = await simulateCreatePipeline(mockScheduleService, { + steps: [{ prompt: 'only one' }], + }); + + expect(result.isError).toBe(true); + expect(result.content[0].text).toContain('at least 2'); + }); + + it('should reject steps array with more than 20 items', async () => { + const steps = Array.from({ length: 21 }, (_, i) => ({ prompt: `Step ${i + 1}` })); + const result = await simulateCreatePipeline(mockScheduleService, { steps }); + + expect(result.isError).toBe(true); + expect(result.content[0].text).toContain('exceed 20'); + }); + + it('should reject step with empty prompt', async () => { + const result = await simulateCreatePipeline(mockScheduleService, { + steps: [{ prompt: '' }, { prompt: 'valid' }], + }); + + expect(result.isError).toBe(true); + }); + + it('should return pipeline result on success', async () => { + const result = await simulateCreatePipeline(mockScheduleService, { + steps: [{ prompt: 'Step one' }, { prompt: 'Step two' }, { prompt: 'Step three' }], + }); + + expect(result.isError).toBe(false); + const response = JSON.parse(result.content[0].text); + expect(response.success).toBe(true); + expect(response.pipelineId).toBeDefined(); + expect(response.stepCount).toBe(3); + expect(response.steps).toHaveLength(3); + }); + + it('should pass priority through to service', async () => { + await simulateCreatePipeline(mockScheduleService, { + steps: [{ prompt: 'Step one' }, { prompt: 'Step two' }], + priority: 'P0', + }); + + expect(mockScheduleService.createPipelineCalls).toHaveLength(1); + expect(mockScheduleService.createPipelineCalls[0].priority).toBe('P0'); + }); + + it('should return error on service failure', async () => { + mockScheduleService.shouldFailPipeline = true; + + const result = await simulateCreatePipeline(mockScheduleService, { + steps: [{ prompt: 'Step one' }, { prompt: 'Step two' }], + }); + + expect(result.isError).toBe(true); + const response = JSON.parse(result.content[0].text); + expect(response.success).toBe(false); + expect(response.error).toBeDefined(); + }); +}); + +/** + * Mock ScheduleService for CreatePipeline testing + */ +class MockScheduleService { + createPipelineCalls: PipelineCreateRequest[] = []; + shouldFailPipeline = false; + + async createSchedule() { + return ok(null); + } + async listSchedules() { + return ok([]); + } + async getSchedule() { + return ok({ schedule: null }); + } + async cancelSchedule() { + return ok(undefined); + } + async pauseSchedule() { + return ok(undefined); + } + async resumeSchedule() { + return ok(undefined); + } + + async createPipeline(request: PipelineCreateRequest): Promise> { + this.createPipelineCalls.push(request); + + if (this.shouldFailPipeline) { + return err(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Pipeline creation failed', {})); + } + + return ok({ + pipelineId: ScheduleId('schedule-mock-first'), + steps: request.steps.map((s, i) => ({ + index: i, + scheduleId: ScheduleId(`schedule-mock-${i}`), + prompt: s.prompt.substring(0, 50) + (s.prompt.length > 50 ? '...' : ''), + })), + }); + } +} + // ============================================================================ // Helper Functions - Simulate MCP tool calls // ============================================================================ @@ -882,3 +1008,68 @@ async function simulateRetryTask( }; } } + +async function simulateCreatePipeline( + scheduleService: MockScheduleService, + args: { + steps: Array<{ prompt: string; priority?: string; workingDirectory?: string }>; + priority?: string; + workingDirectory?: string; + }, +): Promise { + // Validate min/max steps (mirrors Zod schema) + if (args.steps.length < 2) { + return { + isError: true, + content: [{ type: 'text', text: 'Pipeline requires at least 2 steps' }], + }; + } + if (args.steps.length > 20) { + return { + isError: true, + content: [{ type: 'text', text: 'Pipeline cannot exceed 20 steps' }], + }; + } + + // Validate prompts (mirrors Zod min(1)) + for (const step of args.steps) { + if (!step.prompt || step.prompt.length === 0) { + return { + isError: true, + content: [{ type: 'text', text: 'Step prompt must not be empty' }], + }; + } + } + + const result = await scheduleService.createPipeline({ + steps: args.steps.map((s) => ({ + prompt: s.prompt, + priority: s.priority as Priority | undefined, + workingDirectory: s.workingDirectory, + })), + priority: args.priority as Priority | undefined, + workingDirectory: args.workingDirectory, + }); + + if (!result.ok) { + return { + isError: true, + content: [{ type: 'text', text: JSON.stringify({ success: false, error: result.error.message }) }], + }; + } + + return { + isError: false, + content: [ + { + type: 'text', + text: JSON.stringify({ + success: true, + pipelineId: result.value.pipelineId, + stepCount: result.value.steps.length, + steps: result.value.steps, + }), + }, + ], + }; +} diff --git a/tests/unit/services/schedule-manager.test.ts b/tests/unit/services/schedule-manager.test.ts index 2ac0639..ee91937 100644 --- a/tests/unit/services/schedule-manager.test.ts +++ b/tests/unit/services/schedule-manager.test.ts @@ -5,8 +5,8 @@ */ import { afterEach, beforeEach, describe, expect, it } from 'vitest'; -import type { ScheduleCreateRequest } from '../../../src/core/domain'; -import { createSchedule, MissedRunPolicy, ScheduleId, ScheduleStatus, ScheduleType } from '../../../src/core/domain'; +import type { PipelineCreateRequest, ScheduleCreateRequest } from '../../../src/core/domain'; +import { createSchedule, MissedRunPolicy, Priority, ScheduleId, ScheduleStatus, ScheduleType } from '../../../src/core/domain'; import { Database } from '../../../src/implementations/database'; import { SQLiteScheduleRepository } from '../../../src/implementations/schedule-repository'; import { ScheduleManagerService, toMissedRunPolicy } from '../../../src/services/schedule-manager'; @@ -447,4 +447,153 @@ describe('ScheduleManagerService - Unit Tests', () => { expect(result.ok).toBe(false); }); }); + + describe('createPipeline()', () => { + function pipelineRequest(overrides: Partial = {}): PipelineCreateRequest { + return { + steps: [{ prompt: 'Step one' }, { prompt: 'Step two' }, { prompt: 'Step three' }], + ...overrides, + }; + } + + it('should reject fewer than 2 steps', async () => { + const result = await service.createPipeline({ steps: [{ prompt: 'Only one' }] }); + + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toContain('at least 2 steps'); + }); + + it('should reject more than 20 steps', async () => { + const steps = Array.from({ length: 21 }, (_, i) => ({ prompt: `Step ${i + 1}` })); + const result = await service.createPipeline({ steps }); + + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toContain('exceed 20 steps'); + }); + + it('should create chained schedules for 3-step pipeline', async () => { + const result = await service.createPipeline(pipelineRequest()); + + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value.steps).toHaveLength(3); + expect(result.value.pipelineId).toBe(result.value.steps[0].scheduleId); + }); + + it('should return all schedule IDs in correct order', async () => { + const result = await service.createPipeline(pipelineRequest()); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + // Each step should have a unique schedule ID + const ids = result.value.steps.map((s) => s.scheduleId); + expect(new Set(ids).size).toBe(3); + + // Indices should be sequential + expect(result.value.steps.map((s) => s.index)).toEqual([0, 1, 2]); + }); + + it('should use shared priority as default for all steps', async () => { + const result = await service.createPipeline( + pipelineRequest({ priority: Priority.P0 }), + ); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + // Verify all 3 ScheduleCreated events were emitted + expect(eventBus.getEventCount('ScheduleCreated')).toBe(3); + + // Check each created schedule has P0 priority + const events = eventBus.getEmittedEvents('ScheduleCreated'); + for (const event of events) { + expect(event.schedule.taskTemplate.priority).toBe(Priority.P0); + } + }); + + it('should allow per-step priority override', async () => { + const result = await service.createPipeline({ + steps: [ + { prompt: 'Step one', priority: Priority.P1 }, + { prompt: 'Step two' }, + ], + priority: Priority.P2, + }); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + const events = eventBus.getEmittedEvents('ScheduleCreated'); + expect(events[0].schedule.taskTemplate.priority).toBe(Priority.P1); + expect(events[1].schedule.taskTemplate.priority).toBe(Priority.P2); + }); + + it('should use shared workingDirectory as default', async () => { + const cwd = process.cwd(); + const result = await service.createPipeline( + pipelineRequest({ workingDirectory: cwd }), + ); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + const events = eventBus.getEmittedEvents('ScheduleCreated'); + for (const event of events) { + expect(event.schedule.taskTemplate.workingDirectory).toBe(cwd); + } + }); + + it('should allow per-step workingDirectory override', async () => { + const cwd = process.cwd(); + const overrideDir = `${cwd}/src`; + const result = await service.createPipeline({ + steps: [ + { prompt: 'Step one', workingDirectory: overrideDir }, + { prompt: 'Step two' }, + ], + workingDirectory: cwd, + }); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + const events = eventBus.getEmittedEvents('ScheduleCreated'); + expect(events[0].schedule.taskTemplate.workingDirectory).toBe(overrideDir); + expect(events[1].schedule.taskTemplate.workingDirectory).toBe(cwd); + }); + + it('should truncate long prompts at 50 chars in response', async () => { + const longPrompt = 'A'.repeat(60); + const result = await service.createPipeline({ + steps: [{ prompt: longPrompt }, { prompt: 'Short' }], + }); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + expect(result.value.steps[0].prompt).toBe('A'.repeat(50) + '...'); + expect(result.value.steps[1].prompt).toBe('Short'); + }); + + it('should stop on first failure and report error with step number', async () => { + // Make ScheduleCreated emission fail — first step will fail + eventBus.setEmitFailure('ScheduleCreated', true); + + const result = await service.createPipeline(pipelineRequest()); + + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toContain('step 1'); + }); + + it('should emit ScheduleCreated for each step', async () => { + const result = await service.createPipeline(pipelineRequest()); + + expect(result.ok).toBe(true); + expect(eventBus.getEventCount('ScheduleCreated')).toBe(3); + }); + }); }); From 9cbb05febd011a4f2f06a312896f032c79116492 Mon Sep 17 00:00:00 2001 From: Dean Sharon Date: Wed, 4 Mar 2026 13:19:48 +0200 Subject: [PATCH 2/2] refactor: simplify pipeline code after self-review - Extract truncatePrompt() helper in schedule-manager.ts - Fix stale 4-arg MCPAdapter constructor in adapter tests to use stubScheduleService - Remove redundant PipelineStepRequest import and type annotation in MCP adapter --- src/adapters/mcp-adapter.ts | 13 ++++----- src/services/schedule-manager.ts | 10 ++++++- tests/unit/adapters/mcp-adapter.test.ts | 38 +++++++------------------ 3 files changed, 25 insertions(+), 36 deletions(-) diff --git a/src/adapters/mcp-adapter.ts b/src/adapters/mcp-adapter.ts index 85fc666..d65a108 100644 --- a/src/adapters/mcp-adapter.ts +++ b/src/adapters/mcp-adapter.ts @@ -8,7 +8,6 @@ import { z } from 'zod'; import pkg from '../../package.json' with { type: 'json' }; import { PipelineCreateRequest, - PipelineStepRequest, Priority, ResumeTaskRequest, ScheduleCreateRequest, @@ -1279,13 +1278,11 @@ export class MCPAdapter { } const request: PipelineCreateRequest = { - steps: data.steps.map( - (s): PipelineStepRequest => ({ - prompt: s.prompt, - priority: s.priority as Priority | undefined, - workingDirectory: s.workingDirectory, - }), - ), + steps: data.steps.map((s) => ({ + prompt: s.prompt, + priority: s.priority as Priority | undefined, + workingDirectory: s.workingDirectory, + })), priority: data.priority as Priority | undefined, workingDirectory: data.workingDirectory, }; diff --git a/src/services/schedule-manager.ts b/src/services/schedule-manager.ts index 0292efa..69dd574 100644 --- a/src/services/schedule-manager.ts +++ b/src/services/schedule-manager.ts @@ -25,6 +25,14 @@ import { err, ok, Result } from '../core/result.js'; import { getNextRunTime, isValidTimezone, validateCronExpression } from '../utils/cron.js'; import { validatePath } from '../utils/validation.js'; +/** Truncate a prompt string to maxLen characters, appending '...' if truncated */ +function truncatePrompt(prompt: string, maxLen: number): string { + if (prompt.length <= maxLen) { + return prompt; + } + return prompt.substring(0, maxLen) + '...'; +} + /** * Map missedRunPolicy string to MissedRunPolicy enum * Defaults to SKIP for unrecognized values @@ -327,7 +335,7 @@ export class ScheduleManagerService implements ScheduleService { createdSteps.push({ index: i, scheduleId: result.value.id, - prompt: step.prompt.substring(0, 50) + (step.prompt.length > 50 ? '...' : ''), + prompt: truncatePrompt(step.prompt, 50), }); } diff --git a/tests/unit/adapters/mcp-adapter.test.ts b/tests/unit/adapters/mcp-adapter.test.ts index e83f680..8d6305f 100644 --- a/tests/unit/adapters/mcp-adapter.test.ts +++ b/tests/unit/adapters/mcp-adapter.test.ts @@ -16,8 +16,7 @@ import { MCPAdapter } from '../../../src/adapters/mcp-adapter'; import type { PipelineCreateRequest, PipelineResult, Task, TaskRequest } from '../../../src/core/domain'; import { Priority, ScheduleId } from '../../../src/core/domain'; import { BackbeatError, ErrorCode, taskNotFound } from '../../../src/core/errors'; -import type { EventBus } from '../../../src/core/events/event-bus'; -import type { Logger, ScheduleRepository, ScheduleService, TaskManager } from '../../../src/core/interfaces'; +import type { Logger, ScheduleService, TaskManager } from '../../../src/core/interfaces'; import type { Result } from '../../../src/core/result'; import { err, ok } from '../../../src/core/result'; import { TaskFactory } from '../../fixtures/factories'; @@ -163,30 +162,15 @@ class MockLogger implements Logger { } } -// Stub ScheduleRepository — tests in this file do not exercise schedule features -const stubScheduleRepository: ScheduleRepository = { - save: vi.fn().mockResolvedValue(ok(undefined)), - update: vi.fn().mockResolvedValue(ok(undefined)), - findById: vi.fn().mockResolvedValue(ok(null)), - findAll: vi.fn().mockResolvedValue(ok([])), - findByStatus: vi.fn().mockResolvedValue(ok([])), - findDue: vi.fn().mockResolvedValue(ok([])), - delete: vi.fn().mockResolvedValue(ok(undefined)), - count: vi.fn().mockResolvedValue(ok(0)), - recordExecution: vi - .fn() - .mockResolvedValue(ok({ id: 1, scheduleId: '', scheduledFor: 0, status: 'pending', createdAt: 0 })), - getExecutionHistory: vi.fn().mockResolvedValue(ok([])), -}; - -// Stub EventBus — tests in this file do not exercise event features -const stubEventBus: EventBus = { - emit: vi.fn().mockResolvedValue(ok(undefined)), - request: vi.fn().mockResolvedValue(ok(undefined)), - subscribe: vi.fn().mockReturnValue(ok('sub-id')), - unsubscribe: vi.fn().mockReturnValue(ok(undefined)), - subscribeAll: vi.fn().mockReturnValue(ok('sub-id')), - unsubscribeAll: vi.fn(), +// Stub ScheduleService — task-focused tests do not exercise schedule features +const stubScheduleService: ScheduleService = { + createSchedule: vi.fn().mockResolvedValue(ok(null)), + listSchedules: vi.fn().mockResolvedValue(ok([])), + getSchedule: vi.fn().mockResolvedValue(ok({ schedule: null })), + cancelSchedule: vi.fn().mockResolvedValue(ok(undefined)), + pauseSchedule: vi.fn().mockResolvedValue(ok(undefined)), + resumeSchedule: vi.fn().mockResolvedValue(ok(undefined)), + createPipeline: vi.fn().mockResolvedValue(ok({ pipelineId: '', steps: [] })), }; describe('MCPAdapter - Protocol Compliance', () => { @@ -197,7 +181,7 @@ describe('MCPAdapter - Protocol Compliance', () => { beforeEach(() => { mockTaskManager = new MockTaskManager(); mockLogger = new MockLogger(); - adapter = new MCPAdapter(mockTaskManager, mockLogger, stubScheduleRepository, stubEventBus); + adapter = new MCPAdapter(mockTaskManager, mockLogger, stubScheduleService); }); afterEach(() => {