Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/korg-core/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub fn init_tracing() {
// Machine-readable JSON for log shipping / structured analysis
let json_layer = fmt::layer()
.json()
// Logs go to STDERR. STDOUT is reserved for program output — and, in
// `korg worker` subprocesses, for the ACP envelope stream the leader
// parses as JSON. A log line on stdout corrupts that protocol.
.with_writer(std::io::stderr)
.with_current_span(true)
.with_span_list(true)
.with_target(true)
Expand All @@ -46,6 +50,9 @@ pub fn init_tracing() {
} else {
// Human-readable pretty output for development
let pretty_layer = fmt::layer()
// Logs go to STDERR (see note above): stdout is the ACP channel for
// worker subprocesses and the program-output channel for the leader.
.with_writer(std::io::stderr)
.with_target(true)
.with_thread_ids(false)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
Expand Down
18 changes: 13 additions & 5 deletions crates/korg-runtime/src/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ impl SingleWorkerHarness {
}

/// Main worker loop (legacy stub path).
///
/// Diagnostics go to STDERR: if this path is ever reached from a worker
/// subprocess, stdout must stay a clean ACP envelope channel.
pub async fn run(&mut self, client: &mut AcpClient) -> Result<()> {
println!(
eprintln!(
"[Harness] Worker {} entering main loop (legacy client path)",
self.worker_id
);
Expand All @@ -49,7 +52,7 @@ impl SingleWorkerHarness {
permissions,
..
} => {
println!(
eprintln!(
"[Harness] Received RouteWork with base_snapshot: {}",
base_snapshot
);
Expand All @@ -64,12 +67,12 @@ impl SingleWorkerHarness {
.await?;
}
_ => {
println!("[Harness] Received unhandled message: {:?}", msg);
eprintln!("[Harness] Received unhandled message: {:?}", msg);
}
}
}

println!("[Harness] Worker {} exiting after task", self.worker_id);
eprintln!("[Harness] Worker {} exiting after task", self.worker_id);
Ok(())
}

Expand Down Expand Up @@ -142,6 +145,10 @@ impl SingleWorkerHarness {
eprintln!("[Harness] Sent signed tool result back to leader");
}
}
// handle_route_work already sent the authoritative
// TerminationReport (carrying the real success/doom_loop
// status + terminal_tx_id). The leader reads it now that
// stdout is a clean ACP channel — no extra report needed.
}

// === Direct tool request (if worker is sent a tool as first message) ===
Expand Down Expand Up @@ -346,7 +353,8 @@ impl SingleWorkerHarness {
loop {
ticker.tick().await;
tick += 1;
println!(
// stderr, not stdout — stdout is the worker's ACP envelope channel.
eprintln!(
"[TelemetryEmitter] {} – live pulse #{} (continuous real-time telemetry)",
rid, tick
);
Expand Down
10 changes: 9 additions & 1 deletion crates/korg-runtime/src/personas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,15 @@ impl LlmPersona {
top_p: self.top_p,
presence_penalty: self.presence_penalty,
frequency_penalty: self.frequency_penalty,
response_format: None,
// Implementer personas (Benjamin/Lucas) must emit a JSON mutations
// block; ask OpenAI-compatible live providers for strictly valid JSON
// so the patch reliably parses. Planner/researcher/evaluator personas
// produce prose and must NOT be forced to JSON. The deterministic stub
// ignores this field, so the hermetic path is unchanged.
response_format: match self.persona {
Persona::Benjamin | Persona::Lucas => Some("json_object".to_string()),
_ => None,
},
};

let response = self.provider.complete(request).await?;
Expand Down
11 changes: 11 additions & 0 deletions crates/korg-runtime/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,17 @@ impl SessionBackend for SubprocessBackend {
cmd.env(k, v);
}

// The worker builds its own LLM provider from `KorgConfig::load()`, which
// reads these env vars. The child inherits the parent env by default, but
// forward them explicitly so the worker's provider selection (e.g.
// `--provider ollama` exported by the CLI) is visible here and survives
// even if the inherited environment is ever cleared.
for key in ["KORG_DEFAULT_LLM", "KORG_MODEL", "OLLAMA_BASE_URL"] {
if let Ok(val) = std::env::var(key) {
cmd.env(key, val);
}
}

#[cfg(unix)]
{
use std::os::unix::process::CommandExt;
Expand Down
67 changes: 47 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ struct Cli {
#[arg(long)]
speculative: bool,

/// LLM provider for the whole swarm: `deterministic` (default, hermetic) or
/// `ollama` (live local model — every persona does real, measured work).
/// Exported as `KORG_DEFAULT_LLM` so each worker subprocess builds it too.
#[arg(long, global = true)]
provider: Option<String>,

/// Model name for a live provider (e.g. `qwen2.5:7b`). Exported as `KORG_MODEL`.
#[arg(long, global = true)]
model: Option<String>,

/// Base URL for a live provider (ollama default http://localhost:11434/v1).
/// Exported as `OLLAMA_BASE_URL`.
#[arg(long, global = true)]
base_url: Option<String>,

#[command(subcommand)]
command: Option<Commands>,
}
Expand Down Expand Up @@ -260,18 +275,6 @@ enum Commands {
/// Target repo. Defaults to a temp git-inited copy of the bundled fixture.
#[arg(long)]
repo: Option<std::path::PathBuf>,

/// Provider: `deterministic` (default, hermetic) or `ollama` (live local model).
#[arg(long, default_value = "deterministic")]
provider: String,

/// Model name for live providers (e.g. `qwen2.5:7b` for ollama).
#[arg(long)]
model: Option<String>,

/// Base URL override for the live provider (ollama default: http://localhost:11434/v1).
#[arg(long)]
base_url: Option<String>,
},

/// Run the premium Claude Code cooperative session replay and speculative rewind demo
Expand Down Expand Up @@ -494,6 +497,27 @@ async fn main() -> Result<()> {
korg_registry::IS_PREVIEW_MODE.store(true, std::sync::atomic::Ordering::Relaxed);
}

// Export the swarm's LLM provider selection so every worker SUBPROCESS — each
// a separate `korg worker` OS process that builds its own provider via
// `KorgConfig::load()` (which reads these env vars) — uses it. Set once here at
// startup; the children inherit the environment on spawn. This makes the full
// multi-persona campaign run on a real model (`--provider ollama`) with no
// config threading. `run-once` reads the flags directly and ignores this.
//
// `set_var` is unsound if another thread reads the environment concurrently.
// These writes run once at the very top of `main()`, before any campaign task,
// worker spawn, or `KorgConfig::load()` is reached — so no concurrent env
// access is in flight here. (Edition 2021; `set_var` is still a safe fn.)
if let Some(provider) = &cli.provider {
std::env::set_var("KORG_DEFAULT_LLM", provider);
if let Some(model) = &cli.model {
std::env::set_var("KORG_MODEL", model);
}
if let Some(base_url) = &cli.base_url {
std::env::set_var("OLLAMA_BASE_URL", base_url);
}
}

if let Some(prompt) = &cli.prompt {
if cli.web {
println!(
Expand Down Expand Up @@ -836,14 +860,17 @@ async fn main() -> Result<()> {
}
}

Commands::RunOnce {
task,
repo,
provider,
model,
base_url,
} => {
run_once_command(task, repo, provider, model, base_url).await?;
Commands::RunOnce { task, repo } => {
run_once_command(
task,
repo,
cli.provider
.clone()
.unwrap_or_else(|| "deterministic".to_string()),
cli.model.clone(),
cli.base_url.clone(),
)
.await?;
}

Commands::Demo => {
Expand Down
104 changes: 104 additions & 0 deletions tests/campaign_e2e.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Gated end-to-end test for the multi-persona swarm campaign.
//!
//! Proves the campaign actually runs: each persona executes as a real `korg
//! worker` subprocess in a git worktree, completes its work, and sends a
//! `TerminationReport`, so the leader records it as DONE — not the false
//! `exit_code=-1` "crash" that stdout pollution used to cause for *every*
//! worker. Guards the swarm-real fix (worker sends TerminationReport; all logs
//! go to stderr so stdout stays a clean ACP channel).
//!
//! GATED: spawns the `korg` binary plus worker subprocesses in git worktrees —
//! CI-hostile and slow (~60-90s). Run locally:
//! cargo test --test campaign_e2e -- --ignored --nocapture

use std::process::Command;

fn git(dir: &std::path::Path, args: &[&str]) {
let out = Command::new("git")
.args(args)
.current_dir(dir)
.output()
.expect("git spawn failed");
assert!(
out.status.success(),
"git {:?} failed: {}",
args,
String::from_utf8_lossy(&out.stderr)
);
}

/// Last N lines of a log, for readable assertion failures.
fn tail(s: &str) -> String {
let lines: Vec<&str> = s.lines().collect();
let start = lines.len().saturating_sub(40);
lines[start..].join("\n")
}

#[test]
#[ignore = "spawns korg + worker subprocesses in git worktrees; CI-hostile/slow — run locally with --ignored"]
fn campaign_workers_complete_and_attest_real_work() {
// A fixture crate with the canonical add-bug (`a - b`). The deterministic
// provider produces Benjamin's real fix for this task, applied + measured.
let dir = std::env::temp_dir().join(format!("korg-campaign-e2e-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&dir);
std::fs::create_dir_all(dir.join("src")).unwrap();
std::fs::write(
dir.join("Cargo.toml"),
"[package]\nname = \"e2e\"\nversion = \"0.1.0\"\nedition = \"2021\"\n",
)
.unwrap();
std::fs::write(
dir.join("src/lib.rs"),
"pub fn add(a: i64, b: i64) -> i64 { a - b }\n",
)
.unwrap();
git(&dir, &["init", "-q"]);
git(&dir, &["add", "-A"]);
git(
&dir,
&[
"-c",
"user.email=t@t",
"-c",
"user.name=t",
"commit",
"-qm",
"base",
],
);

let out = Command::new(env!("CARGO_BIN_EXE_korg"))
.args([
"Fix the add function in src/lib.rs so it adds",
"--goal",
"--provider",
"deterministic",
])
.current_dir(&dir)
.output()
.expect("run korg campaign");
let log = String::from_utf8_lossy(&out.stderr);

let _ = std::fs::remove_dir_all(&dir);

// The workers must terminate SUCCESS — before the fix, stdout pollution
// corrupted the ACP stream and the leader stamped every worker crashed.
assert!(
log.contains("exit_status=success"),
"expected workers to terminate success (TerminationReport received); got:\n{}",
tail(&log)
);
assert!(
!log.contains("worker_crashed"),
"no worker should be falsely marked crashed; got:\n{}",
tail(&log)
);
// Benjamin (the implementer) attests exactly one REAL measured mutation —
// the applied fix on the fixture — proving real per-persona work flows
// through the campaign, not theater.
assert!(
log.contains("persona=\"Benjamin\" mutations=1"),
"Benjamin should attest one real measured mutation; got:\n{}",
tail(&log)
);
}
File renamed without changes.
Loading