Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions src/adapters/mcp-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { z } from 'zod';
import pkg from '../../package.json' with { type: 'json' };
import {
PipelineCreateRequest,
Priority,
ResumeTaskRequest,
ScheduleCreateRequest,
Expand Down Expand Up @@ -105,6 +106,28 @@ const ResumeScheduleSchema = z.object({
scheduleId: z.string().describe('Schedule ID to resume'),
});

const CreatePipelineSchema = z.object({
steps: z
.array(
z.object({
prompt: z.string().min(1).max(4000).describe('Task prompt for this step'),
priority: z.enum(['P0', 'P1', 'P2']).optional().describe('Priority override for this step'),
workingDirectory: z.string().optional().describe('Working directory override (absolute path)'),
}),
)
.min(2, 'Pipeline requires at least 2 steps')
.max(20, 'Pipeline cannot exceed 20 steps')
.describe('Ordered pipeline steps (executed sequentially)'),
priority: z
.enum(['P0', 'P1', 'P2'])
.optional()
.describe('Default priority for all steps (individual steps can override)'),
workingDirectory: z
.string()
.optional()
.describe('Default working directory for all steps (individual steps can override)'),
});

/** Standard MCP tool response shape */
interface MCPToolResponse {
[key: string]: unknown;
Expand Down Expand Up @@ -187,6 +210,8 @@ export class MCPAdapter {
return await this.handlePauseSchedule(args);
case 'ResumeSchedule':
return await this.handleResumeSchedule(args);
case 'CreatePipeline':
return await this.handleCreatePipeline(args);
default:
// ARCHITECTURE: Return error response instead of throwing
return {
Expand Down Expand Up @@ -503,6 +528,53 @@ export class MCPAdapter {
required: ['scheduleId'],
},
},
{
name: 'CreatePipeline',
description:
'Create a sequential pipeline of tasks that execute one after another. Each step runs only after the previous step completes successfully.',
inputSchema: {
type: 'object',
properties: {
steps: {
type: 'array',
description: 'Ordered pipeline steps (executed sequentially)',
items: {
type: 'object',
properties: {
prompt: {
type: 'string',
description: 'Task prompt for this step',
minLength: 1,
maxLength: 4000,
},
priority: {
type: 'string',
enum: ['P0', 'P1', 'P2'],
description: 'Priority override for this step',
},
workingDirectory: {
type: 'string',
description: 'Working directory override (absolute path)',
},
},
required: ['prompt'],
},
minItems: 2,
maxItems: 20,
},
priority: {
type: 'string',
enum: ['P0', 'P1', 'P2'],
description: 'Default priority for all steps (individual steps can override)',
},
workingDirectory: {
type: 'string',
description: 'Default working directory for all steps (individual steps can override)',
},
},
required: ['steps'],
},
},
],
};
},
Expand Down Expand Up @@ -1159,4 +1231,90 @@ export class MCPAdapter {
}),
});
}

/**
* Handle CreatePipeline tool call
* Creates a sequential pipeline of chained one-time schedules
*/
private async handleCreatePipeline(args: unknown): Promise<MCPToolResponse> {
const parseResult = CreatePipelineSchema.safeParse(args);
if (!parseResult.success) {
return {
content: [{ type: 'text', text: `Validation error: ${parseResult.error.message}` }],
isError: true,
};
}

const data = parseResult.data;

// Validate shared workingDirectory
if (data.workingDirectory) {
const pathValidation = validatePath(data.workingDirectory);
if (!pathValidation.ok) {
return {
content: [{ type: 'text', text: `Invalid shared working directory: ${pathValidation.error.message}` }],
isError: true,
};
}
}

// Validate per-step workingDirectory paths
for (let i = 0; i < data.steps.length; i++) {
const step = data.steps[i];
if (step.workingDirectory) {
const pathValidation = validatePath(step.workingDirectory);
if (!pathValidation.ok) {
return {
content: [
{
type: 'text',
text: `Invalid working directory for step ${i + 1}: ${pathValidation.error.message}`,
},
],
isError: true,
};
}
}
}

const request: PipelineCreateRequest = {
steps: data.steps.map((s) => ({
prompt: s.prompt,
priority: s.priority as Priority | undefined,
workingDirectory: s.workingDirectory,
})),
priority: data.priority as Priority | undefined,
workingDirectory: data.workingDirectory,
};

const result = await this.scheduleService.createPipeline(request);

return match(result, {
ok: (pipeline) => ({
content: [
{
type: 'text',
text: JSON.stringify(
{
success: true,
pipelineId: pipeline.pipelineId,
stepCount: pipeline.steps.length,
steps: pipeline.steps.map((s) => ({
index: s.index,
scheduleId: s.scheduleId,
prompt: s.prompt,
})),
},
null,
2,
),
},
],
}),
err: (error) => ({
content: [{ type: 'text', text: JSON.stringify({ success: false, error: error.message }, null, 2) }],
isError: true,
}),
});
}
}
56 changes: 16 additions & 40 deletions src/cli/commands/pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,40 @@
import { ScheduleId } from '../../core/domain.js';
import { withServices } from '../services.js';
import * as ui from '../ui.js';

export async function handlePipelineCommand(pipelineArgs: string[]) {
if (pipelineArgs.length === 0) {
ui.error('Usage: beat pipeline <prompt> [<prompt>]...');
process.stderr.write('Example: beat pipeline "setup db" "run migrations" "seed data"\n');
process.exit(1);
}

// Each positional arg is a pipeline step prompt
const steps = pipelineArgs.filter((arg) => !arg.startsWith('-'));

if (steps.length === 0) {
ui.error('No pipeline steps found');
if (steps.length < 2) {
ui.error('Pipeline requires at least 2 steps');
process.stderr.write('Usage: beat pipeline <prompt> <prompt> [<prompt>]...\n');
process.stderr.write('Example: beat pipeline "setup db" "run migrations" "seed data"\n');
process.exit(1);
}

const s = ui.createSpinner();
s.start(`Creating pipeline with ${steps.length} step${steps.length === 1 ? '' : 's'}...`);
s.start(`Creating pipeline with ${steps.length} steps...`);

const { scheduleService } = await withServices(s);
const { ScheduleType } = await import('../../core/domain.js');
// Add 2-second buffer so "now" doesn't become "past" during validation
const scheduledAt = new Date(Date.now() + 2000).toISOString();
const createdSchedules: Array<{ id: string; prompt: string }> = [];
let previousScheduleId: string | undefined;

for (let i = 0; i < steps.length; i++) {
const prompt = steps[i];
s.message(`Creating step ${i + 1}/${steps.length}...`);
const result = await scheduleService.createPipeline({
steps: steps.map((prompt) => ({ prompt })),
});

const result = await scheduleService.createSchedule({
prompt,
scheduleType: ScheduleType.ONE_TIME,
scheduledAt,
afterScheduleId: previousScheduleId ? ScheduleId(previousScheduleId) : undefined,
});

if (!result.ok) {
s.stop('Pipeline creation failed');
ui.error(`Failed to create step ${i + 1}: ${result.error.message}`);
process.exit(1);
}

previousScheduleId = result.value.id;
createdSchedules.push({
id: result.value.id,
prompt: prompt.substring(0, 50) + (prompt.length > 50 ? '...' : ''),
});
if (!result.ok) {
s.stop('Pipeline creation failed');
ui.error(result.error.message);
process.exit(1);
}

s.stop('Pipeline created');

// Show pipeline visualization
const lines: string[] = [];
for (let i = 0; i < createdSchedules.length; i++) {
const cs = createdSchedules[i];
lines.push(`${i + 1}. ${ui.dim(`[${cs.id}]`)} "${cs.prompt}"`);
if (i < createdSchedules.length - 1) {
for (let i = 0; i < result.value.steps.length; i++) {
const step = result.value.steps[i];
lines.push(`${i + 1}. ${ui.dim(`[${step.scheduleId}]`)} "${step.prompt}"`);
if (i < result.value.steps.length - 1) {
lines.push(' ↓');
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/core/domain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,33 @@ export interface ScheduleCreateRequest {
readonly afterScheduleId?: ScheduleId; // Chain: block until after-schedule's latest task completes
}

/**
* Pipeline types - sequential task execution via chained one-time schedules
* ARCHITECTURE: Used by both MCP CreatePipeline tool and CLI pipeline command
*/
export interface PipelineStepRequest {
readonly prompt: string;
readonly priority?: Priority;
readonly workingDirectory?: string;
}

export interface PipelineCreateRequest {
readonly steps: readonly PipelineStepRequest[];
readonly priority?: Priority; // shared default for all steps
readonly workingDirectory?: string; // shared default for all steps
}

export interface PipelineStep {
readonly index: number;
readonly scheduleId: ScheduleId;
readonly prompt: string;
}

export interface PipelineResult {
readonly pipelineId: ScheduleId; // first schedule ID (stable reference)
readonly steps: readonly PipelineStep[];
}

/**
* Task checkpoint - snapshot of task state at completion/failure
* ARCHITECTURE: Captures enough context to create enriched retry prompts
Expand Down
3 changes: 3 additions & 0 deletions src/core/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import { ChildProcess } from 'child_process';
import {
PipelineCreateRequest,
PipelineResult,
ResumeTaskRequest,
Schedule,
ScheduleCreateRequest,
Expand Down Expand Up @@ -404,6 +406,7 @@ export interface ScheduleService {
cancelSchedule(scheduleId: ScheduleId, reason?: string): Promise<Result<void>>;
pauseSchedule(scheduleId: ScheduleId): Promise<Result<void>>;
resumeSchedule(scheduleId: ScheduleId): Promise<Result<void>>;
createPipeline(request: PipelineCreateRequest): Promise<Result<PipelineResult>>;
}

/**
Expand Down
Loading
Loading