diff --git a/crates/korg-core/src/telemetry.rs b/crates/korg-core/src/telemetry.rs index 19a086a..1d5a3fa 100644 --- a/crates/korg-core/src/telemetry.rs +++ b/crates/korg-core/src/telemetry.rs @@ -33,6 +33,10 @@ pub fn init_tracing() { // Machine-readable JSON for log shipping / structured analysis let json_layer = fmt::layer() .json() + // Logs go to STDERR. STDOUT is reserved for program output — and, in + // `korg worker` subprocesses, for the ACP envelope stream the leader + // parses as JSON. A log line on stdout corrupts that protocol. + .with_writer(std::io::stderr) .with_current_span(true) .with_span_list(true) .with_target(true) @@ -46,6 +50,9 @@ pub fn init_tracing() { } else { // Human-readable pretty output for development let pretty_layer = fmt::layer() + // Logs go to STDERR (see note above): stdout is the ACP channel for + // worker subprocesses and the program-output channel for the leader. + .with_writer(std::io::stderr) .with_target(true) .with_thread_ids(false) .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) diff --git a/crates/korg-runtime/src/harness.rs b/crates/korg-runtime/src/harness.rs index a99142f..40e1881 100644 --- a/crates/korg-runtime/src/harness.rs +++ b/crates/korg-runtime/src/harness.rs @@ -33,8 +33,11 @@ impl SingleWorkerHarness { } /// Main worker loop (legacy stub path). + /// + /// Diagnostics go to STDERR: if this path is ever reached from a worker + /// subprocess, stdout must stay a clean ACP envelope channel. pub async fn run(&mut self, client: &mut AcpClient) -> Result<()> { - println!( + eprintln!( "[Harness] Worker {} entering main loop (legacy client path)", self.worker_id ); @@ -49,7 +52,7 @@ impl SingleWorkerHarness { permissions, .. 
} => { - println!( + eprintln!( "[Harness] Received RouteWork with base_snapshot: {}", base_snapshot ); @@ -64,12 +67,12 @@ impl SingleWorkerHarness { .await?; } _ => { - println!("[Harness] Received unhandled message: {:?}", msg); + eprintln!("[Harness] Received unhandled message: {:?}", msg); } } } - println!("[Harness] Worker {} exiting after task", self.worker_id); + eprintln!("[Harness] Worker {} exiting after task", self.worker_id); Ok(()) } @@ -142,6 +145,10 @@ impl SingleWorkerHarness { eprintln!("[Harness] Sent signed tool result back to leader"); } } + // handle_route_work already sent the authoritative + // TerminationReport (carrying the real success/doom_loop + // status + terminal_tx_id). The leader reads it now that + // stdout is a clean ACP channel — no extra report needed. } // === Direct tool request (if worker is sent a tool as first message) === @@ -346,7 +353,8 @@ impl SingleWorkerHarness { loop { ticker.tick().await; tick += 1; - println!( + // stderr, not stdout — stdout is the worker's ACP envelope channel. + eprintln!( "[TelemetryEmitter] {} – live pulse #{} (continuous real-time telemetry)", rid, tick ); diff --git a/crates/korg-runtime/src/personas.rs b/crates/korg-runtime/src/personas.rs index b0a299b..2a83cc1 100644 --- a/crates/korg-runtime/src/personas.rs +++ b/crates/korg-runtime/src/personas.rs @@ -276,7 +276,15 @@ impl LlmPersona { top_p: self.top_p, presence_penalty: self.presence_penalty, frequency_penalty: self.frequency_penalty, - response_format: None, + // Implementer personas (Benjamin/Lucas) must emit a JSON mutations + // block; ask OpenAI-compatible live providers for strictly valid JSON + // so the patch reliably parses. Planner/researcher/evaluator personas + // produce prose and must NOT be forced to JSON. The deterministic stub + // ignores this field, so the hermetic path is unchanged. 
+ response_format: match self.persona { + Persona::Benjamin | Persona::Lucas => Some("json_object".to_string()), + _ => None, + }, }; let response = self.provider.complete(request).await?; diff --git a/crates/korg-runtime/src/session.rs b/crates/korg-runtime/src/session.rs index 5a03f1a..8e9e4fe 100644 --- a/crates/korg-runtime/src/session.rs +++ b/crates/korg-runtime/src/session.rs @@ -358,6 +358,17 @@ impl SessionBackend for SubprocessBackend { cmd.env(k, v); } + // The worker builds its own LLM provider from `KorgConfig::load()`, which + // reads these env vars. The child inherits the parent env by default, but + // forward them explicitly so the worker's provider selection (e.g. + // `--provider ollama` exported by the CLI) is visible here and survives + // even if the inherited environment is ever cleared. + for key in ["KORG_DEFAULT_LLM", "KORG_MODEL", "OLLAMA_BASE_URL"] { + if let Ok(val) = std::env::var(key) { + cmd.env(key, val); + } + } + #[cfg(unix)] { use std::os::unix::process::CommandExt; diff --git a/src/main.rs b/src/main.rs index f2339f0..164d62a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -106,6 +106,21 @@ struct Cli { #[arg(long)] speculative: bool, + /// LLM provider for the whole swarm: `deterministic` (default, hermetic) or + /// `ollama` (live local model — every persona does real, measured work). + /// Exported as `KORG_DEFAULT_LLM` so each worker subprocess builds it too. + #[arg(long, global = true)] + provider: Option, + + /// Model name for a live provider (e.g. `qwen2.5:7b`). Exported as `KORG_MODEL`. + #[arg(long, global = true)] + model: Option, + + /// Base URL for a live provider (ollama default http://localhost:11434/v1). + /// Exported as `OLLAMA_BASE_URL`. + #[arg(long, global = true)] + base_url: Option, + #[command(subcommand)] command: Option, } @@ -260,18 +275,6 @@ enum Commands { /// Target repo. Defaults to a temp git-inited copy of the bundled fixture. 
#[arg(long)] repo: Option, - - /// Provider: `deterministic` (default, hermetic) or `ollama` (live local model). - #[arg(long, default_value = "deterministic")] - provider: String, - - /// Model name for live providers (e.g. `qwen2.5:7b` for ollama). - #[arg(long)] - model: Option, - - /// Base URL override for the live provider (ollama default: http://localhost:11434/v1). - #[arg(long)] - base_url: Option, }, /// Run the premium Claude Code cooperative session replay and speculative rewind demo @@ -494,6 +497,27 @@ async fn main() -> Result<()> { korg_registry::IS_PREVIEW_MODE.store(true, std::sync::atomic::Ordering::Relaxed); } + // Export the swarm's LLM provider selection so every worker SUBPROCESS — each + // a separate `korg worker` OS process that builds its own provider via + // `KorgConfig::load()` (which reads these env vars) — uses it. Set once here at + // startup; the children inherit the environment on spawn. This makes the full + // multi-persona campaign run on a real model (`--provider ollama`) with no + // config threading. `run-once` reads the flags directly and ignores this. + // + // `set_var` is unsound if another thread reads the environment concurrently. + // These writes run once at the very top of `main()`, before any campaign task, + // worker spawn, or `KorgConfig::load()` is reached — so no concurrent env + // access is in flight here. (Edition 2021; `set_var` is still a safe fn.) 
+ if let Some(provider) = &cli.provider { + std::env::set_var("KORG_DEFAULT_LLM", provider); + if let Some(model) = &cli.model { + std::env::set_var("KORG_MODEL", model); + } + if let Some(base_url) = &cli.base_url { + std::env::set_var("OLLAMA_BASE_URL", base_url); + } + } + if let Some(prompt) = &cli.prompt { if cli.web { println!( @@ -836,14 +860,17 @@ async fn main() -> Result<()> { } } - Commands::RunOnce { - task, - repo, - provider, - model, - base_url, - } => { - run_once_command(task, repo, provider, model, base_url).await?; + Commands::RunOnce { task, repo } => { + run_once_command( + task, + repo, + cli.provider + .clone() + .unwrap_or_else(|| "deterministic".to_string()), + cli.model.clone(), + cli.base_url.clone(), + ) + .await?; } Commands::Demo => { diff --git a/tests/campaign_e2e.rs b/tests/campaign_e2e.rs new file mode 100644 index 0000000..3eefaa8 --- /dev/null +++ b/tests/campaign_e2e.rs @@ -0,0 +1,104 @@ +//! Gated end-to-end test for the multi-persona swarm campaign. +//! +//! Proves the campaign actually runs: each persona executes as a real `korg +//! worker` subprocess in a git worktree, completes its work, and sends a +//! `TerminationReport`, so the leader records it as DONE — not the false +//! `exit_code=-1` "crash" that stdout pollution used to cause for *every* +//! worker. Guards the swarm-real fix (worker sends TerminationReport; all logs +//! go to stderr so stdout stays a clean ACP channel). +//! +//! GATED: spawns the `korg` binary plus worker subprocesses in git worktrees — +//! CI-hostile and slow (~60-90s). Run locally: +//! 
cargo test --test campaign_e2e -- --ignored --nocapture + +use std::process::Command; + +fn git(dir: &std::path::Path, args: &[&str]) { + let out = Command::new("git") + .args(args) + .current_dir(dir) + .output() + .expect("git spawn failed"); + assert!( + out.status.success(), + "git {:?} failed: {}", + args, + String::from_utf8_lossy(&out.stderr) + ); +} + +/// Last 40 lines of a log, for readable assertion failures. +fn tail(s: &str) -> String { + let lines: Vec<&str> = s.lines().collect(); + let start = lines.len().saturating_sub(40); + lines[start..].join("\n") +} + +#[test] +#[ignore = "spawns korg + worker subprocesses in git worktrees; CI-hostile/slow — run locally with --ignored"] +fn campaign_workers_complete_and_attest_real_work() { + // A fixture crate with the canonical add-bug (`a - b`). The deterministic + // provider produces Benjamin's real fix for this task, applied + measured. + let dir = std::env::temp_dir().join(format!("korg-campaign-e2e-{}", std::process::id())); + let _ = std::fs::remove_dir_all(&dir); + std::fs::create_dir_all(dir.join("src")).unwrap(); + std::fs::write( + dir.join("Cargo.toml"), + "[package]\nname = \"e2e\"\nversion = \"0.1.0\"\nedition = \"2021\"\n", + ) + .unwrap(); + std::fs::write( + dir.join("src/lib.rs"), + "pub fn add(a: i64, b: i64) -> i64 { a - b }\n", + ) + .unwrap(); + git(&dir, &["init", "-q"]); + git(&dir, &["add", "-A"]); + git( + &dir, + &[ + "-c", + "user.email=t@t", + "-c", + "user.name=t", + "commit", + "-qm", + "base", + ], + ); + + let out = Command::new(env!("CARGO_BIN_EXE_korg")) + .args([ + "Fix the add function in src/lib.rs so it adds", + "--goal", + "--provider", + "deterministic", + ]) + .current_dir(&dir) + .output() + .expect("run korg campaign"); + let log = String::from_utf8_lossy(&out.stderr); + + let _ = std::fs::remove_dir_all(&dir); + + // The workers must terminate SUCCESS — before the fix, stdout pollution + // corrupted the ACP stream and the leader stamped every worker crashed.
+ assert!( + log.contains("exit_status=success"), + "expected workers to terminate success (TerminationReport received); got:\n{}", + tail(&log) + ); + assert!( + !log.contains("worker_crashed"), + "no worker should be falsely marked crashed; got:\n{}", + tail(&log) + ); + // Benjamin (the implementer) attests exactly one REAL measured mutation — + // the applied fix on the fixture — proving real per-persona work flows + // through the campaign, not theater. + assert!( + log.contains("persona=\"Benjamin\" mutations=1"), + "Benjamin should attest one real measured mutation; got:\n{}", + tail(&log) + ); +} diff --git a/Tests/test_mcp_server.py b/tests/test_mcp_server.py similarity index 100% rename from Tests/test_mcp_server.py rename to tests/test_mcp_server.py