diff --git a/crates/loopal-telemetry/src/file_metric_exporter.rs b/crates/loopal-telemetry/src/file_metric_exporter.rs index ffd32847..65401e17 100644 --- a/crates/loopal-telemetry/src/file_metric_exporter.rs +++ b/crates/loopal-telemetry/src/file_metric_exporter.rs @@ -1,9 +1,7 @@ //! JSONL file exporter for OpenTelemetry metrics. use std::fmt::Debug; -use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Write}; -use std::path::PathBuf; +use std::path::Path; use std::sync::Mutex; use async_trait::async_trait; @@ -12,9 +10,11 @@ use opentelemetry_sdk::metrics::data::{Metric, ResourceMetrics}; use opentelemetry_sdk::metrics::exporter::PushMetricExporter; use serde::Serialize; +use crate::lazy_jsonl::LazyJsonlFile; + /// Metric exporter that appends JSONL data points to a local file. pub(crate) struct JsonlMetricExporter { - writer: Mutex>, + sink: Mutex, } impl Debug for JsonlMetricExporter { @@ -24,14 +24,13 @@ impl Debug for JsonlMetricExporter { } impl JsonlMetricExporter { - pub fn new(dir: &PathBuf) -> std::io::Result { + pub fn new(dir: &Path) -> std::io::Result { std::fs::create_dir_all(dir)?; let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S"); let pid = std::process::id(); let path = dir.join(format!("metrics-{ts}-{pid}.jsonl")); - let file = OpenOptions::new().create(true).append(true).open(path)?; Ok(Self { - writer: Mutex::new(BufWriter::new(file)), + sink: Mutex::new(LazyJsonlFile::new(path)), }) } @@ -127,15 +126,15 @@ impl JsonlMetricExporter { fn write_json(&self, record: &MetricRecord) { if let Ok(json) = serde_json::to_string(record) - && let Ok(mut w) = self.writer.lock() + && let Ok(mut sink) = self.sink.lock() { - let _ = writeln!(w, "{json}"); + sink.write_line(&json); } } fn flush(&self) { - if let Ok(mut w) = self.writer.lock() { - let _ = w.flush(); + if let Ok(mut sink) = self.sink.lock() { + sink.flush(); } } } diff --git a/crates/loopal-telemetry/src/file_span_exporter.rs b/crates/loopal-telemetry/src/file_span_exporter.rs index bb1c03dd..401eacb9 100644 --- a/crates/loopal-telemetry/src/file_span_exporter.rs +++ b/crates/loopal-telemetry/src/file_span_exporter.rs @@ -4,10 +4,8 @@ //! Designed for offline analysis with `jq`, `pandas`, etc. use std::fmt::Debug; -use std::fs::{File, OpenOptions}; use std::future::Future; -use std::io::{BufWriter, Write}; -use std::path::PathBuf; +use std::path::Path; use std::pin::Pin; use std::time::SystemTime; @@ -16,11 +14,13 @@ use opentelemetry_sdk::Resource; use opentelemetry_sdk::trace::{SpanData, SpanExporter}; use serde::Serialize; +use crate::lazy_jsonl::LazyJsonlFile; + type BoxFuture = Pin + Send>>; /// Span exporter that appends one JSONL line per span to a local file. pub(crate) struct JsonlSpanExporter { - writer: BufWriter, + sink: LazyJsonlFile, } impl Debug for JsonlSpanExporter { @@ -30,14 +30,13 @@ impl Debug for JsonlSpanExporter { } impl JsonlSpanExporter { - pub fn new(dir: &PathBuf) -> std::io::Result { + pub fn new(dir: &Path) -> std::io::Result { std::fs::create_dir_all(dir)?; let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S"); let pid = std::process::id(); let path = dir.join(format!("traces-{ts}-{pid}.jsonl")); - let file = OpenOptions::new().create(true).append(true).open(path)?; Ok(Self { - writer: BufWriter::new(file), + sink: LazyJsonlFile::new(path), }) } @@ -65,7 +64,7 @@ impl JsonlSpanExporter { .collect(), }; if let Ok(json) = serde_json::to_string(&record) { - let _ = writeln!(self.writer, "{json}"); + self.sink.write_line(&json); } } } @@ -78,12 +77,12 @@ impl SpanExporter for JsonlSpanExporter { for span in &batch { self.write_span(span); } - let _ = self.writer.flush(); + self.sink.flush(); Box::pin(std::future::ready(Ok(()))) } fn shutdown(&mut self) -> opentelemetry_sdk::error::OTelSdkResult { - let _ = self.writer.flush(); + self.sink.flush(); Ok(()) } diff --git a/crates/loopal-telemetry/src/lazy_jsonl.rs b/crates/loopal-telemetry/src/lazy_jsonl.rs new file mode 100644 index 00000000..4d1c84ac --- /dev/null +++ b/crates/loopal-telemetry/src/lazy_jsonl.rs @@ -0,0 +1,87 @@ +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Write}; +use std::path::PathBuf; + +/// Append-only JSONL sink that creates its backing file on the first write. +/// A process that emits no spans/metrics never touches the filesystem, so +/// short-lived processes (sub-agents, CLI subcommands, a hub child that exits +/// early) stop leaving empty `traces-`/`metrics-` files behind. +pub(crate) struct LazyJsonlFile { + path: PathBuf, + writer: Option>, +} + +impl LazyJsonlFile { + pub(crate) fn new(path: PathBuf) -> Self { + Self { path, writer: None } + } + + pub(crate) fn write_line(&mut self, line: &str) { + if let Some(w) = self.writer_mut() { + let _ = writeln!(w, "{line}"); + } + } + + pub(crate) fn flush(&mut self) { + if let Some(w) = self.writer.as_mut() { + let _ = w.flush(); + } + } + + fn writer_mut(&mut self) -> Option<&mut BufWriter> { + if self.writer.is_none() { + if let Some(parent) = self.path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&self.path) + .ok()?; + self.writer = Some(BufWriter::new(file)); + } + self.writer.as_mut() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn scoped_path(tag: &str) -> PathBuf { + use std::sync::atomic::{AtomicU32, Ordering}; + static COUNTER: AtomicU32 = AtomicU32::new(0); + let n = COUNTER.fetch_add(1, Ordering::Relaxed); + let dir = std::env::temp_dir().join(format!("loopal_lazy_{}_{n}", std::process::id())); + let _ = std::fs::remove_dir_all(&dir); + dir.join(format!("{tag}.jsonl")) + } + + #[test] + fn no_file_created_without_write() { + let path = scoped_path("traces-x-1"); + let sink = LazyJsonlFile::new(path.clone()); + drop(sink); + assert!(!path.exists(), "file must not exist until first write"); + } + + #[test] + fn flush_without_write_creates_nothing() { + let path = scoped_path("metrics-x-1"); + let mut sink = LazyJsonlFile::new(path.clone()); + sink.flush(); + assert!(!path.exists(), "flush alone must not create the file"); + } + + #[test] + fn first_write_creates_and_appends() { + let path = scoped_path("traces-x-2"); + let mut sink = LazyJsonlFile::new(path.clone()); + sink.write_line("{\"a\":1}"); + sink.write_line("{\"b\":2}"); + sink.flush(); + let body = std::fs::read_to_string(&path).unwrap(); + assert_eq!(body, "{\"a\":1}\n{\"b\":2}\n"); + let _ = std::fs::remove_dir_all(path.parent().unwrap()); + } +} diff --git a/crates/loopal-telemetry/src/lib.rs b/crates/loopal-telemetry/src/lib.rs index ee5f512f..d5b58125 100644 --- a/crates/loopal-telemetry/src/lib.rs +++ b/crates/loopal-telemetry/src/lib.rs @@ -8,13 +8,16 @@ pub(crate) mod file_metric_exporter; pub(crate) mod file_span_exporter; mod filter; +mod lazy_jsonl; mod logs; mod metrics; mod resource; +mod retention; mod shutdown; mod subscriber; mod traces; pub use filter::build_env_filter; +pub use retention::cleanup_telemetry_files; pub use shutdown::TelemetryGuard; pub use subscriber::init_subscriber; diff --git a/crates/loopal-telemetry/src/retention.rs b/crates/loopal-telemetry/src/retention.rs new file mode 100644 index 00000000..a158ba66 --- /dev/null +++ b/crates/loopal-telemetry/src/retention.rs @@ -0,0 +1,138 @@ +use std::fs; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +const MAX_FILES_PER_KIND: usize = 100; +const MAX_BYTES_PER_KIND: u64 = 50 * 1024 * 1024; +const KIND_PREFIXES: [&str; 2] = ["traces-", "metrics-"]; +const EXT: &str = ".jsonl"; + +/// Bound the JSONL telemetry directory. Every process leaves a +/// `traces-{ts}-{pid}.jsonl` + `metrics-{ts}-{pid}.jsonl` pair, so without a cap +/// the directory grows monotonically across the multiprocess fleet. Enforces a +/// per-kind file-count and byte-size ceiling, dropping the oldest first. +/// +/// `is_alive` guards a long-lived process's open file from being unlinked by a +/// short-lived child's cleanup run (same hazard as log rotation). Only files +/// matching the `traces-`/`metrics-` prefixes are ever touched — sibling files +/// like `secret_access.jsonl` are left alone. +pub fn cleanup_telemetry_files(dir: &Path, is_alive: impl Fn(u32) -> bool) { + for prefix in KIND_PREFIXES { + enforce_kind(dir, prefix, &is_alive); + } +} + +fn enforce_kind(dir: &Path, prefix: &str, is_alive: &impl Fn(u32) -> bool) { + let Ok(entries) = fs::read_dir(dir) else { + return; + }; + let mut files: Vec<(PathBuf, u64, SystemTime, bool)> = entries + .filter_map(|e| e.ok()) + .filter(|e| { + let name = e.file_name().to_string_lossy().into_owned(); + name.starts_with(prefix) && name.ends_with(EXT) + }) + .filter_map(|e| { + let meta = e.metadata().ok()?; + let alive = + pid_from_telemetry_filename(&e.file_name().to_string_lossy()).is_some_and(is_alive); + Some((e.path(), meta.len(), meta.modified().ok()?, alive)) + }) + .collect(); + files.sort_by_key(|(_, _, t, _)| *t); + let total_count = files.len(); + let total_size: u64 = files.iter().map(|(_, s, _, _)| s).sum(); + let (mut removed_count, mut removed_size) = (0usize, 0u64); + for (path, size, _, alive) in &files { + if total_count - removed_count <= MAX_FILES_PER_KIND + && total_size - removed_size <= MAX_BYTES_PER_KIND + { + break; + } + if *alive { + continue; + } + let _ = fs::remove_file(path); + removed_count += 1; + removed_size += size; + } +} + +pub(crate) fn pid_from_telemetry_filename(name: &str) -> Option { + let stem = KIND_PREFIXES + .iter() + .find_map(|p| name.strip_prefix(p))? + .strip_suffix(EXT)?; + let (_, pid) = stem.rsplit_once('-')?; + pid.parse::().ok() +} + +#[cfg(test)] +mod tests { + use super::*; + + fn scoped_dir() -> PathBuf { + use std::sync::atomic::{AtomicU32, Ordering}; + static COUNTER: AtomicU32 = AtomicU32::new(0); + let n = COUNTER.fetch_add(1, Ordering::Relaxed); + let p = std::env::temp_dir().join(format!("loopal_tele_{}_{n}", std::process::id())); + let _ = fs::remove_dir_all(&p); + fs::create_dir_all(&p).unwrap(); + p + } + + #[test] + fn pid_parsing_handles_both_kinds() { + assert_eq!( + pid_from_telemetry_filename("traces-20260410-012952-64885.jsonl"), + Some(64885) + ); + assert_eq!( + pid_from_telemetry_filename("metrics-20260410-012952-64885.jsonl"), + Some(64885) + ); + assert_eq!(pid_from_telemetry_filename("secret_access.jsonl"), None); + assert_eq!(pid_from_telemetry_filename("traces-nopid.txt"), None); + } + + #[test] + fn never_deletes_alive_pid_file() { + let dir = scoped_dir(); + let alive = dir.join("traces-20260512-100000-1001.jsonl"); + fs::write(&alive, b"x").unwrap(); + for i in 0..MAX_FILES_PER_KIND + 5 { + fs::write(dir.join(format!("traces-20260512-110000-{i}.jsonl")), b"x").unwrap(); + } + cleanup_telemetry_files(&dir, |pid| pid == 1001); + assert!(alive.exists(), "alive PID's file must survive cleanup"); + let _ = fs::remove_dir_all(&dir); + } + + #[test] + fn enforces_per_kind_file_count() { + let dir = scoped_dir(); + for i in 0..MAX_FILES_PER_KIND + 30 { + fs::write(dir.join(format!("metrics-20260512-110000-{i}.jsonl")), b"x").unwrap(); + } + cleanup_telemetry_files(&dir, |_| false); + let kept = fs::read_dir(&dir).unwrap().count(); + assert!(kept <= MAX_FILES_PER_KIND, "expected ≤cap, got {kept}"); + let _ = fs::remove_dir_all(&dir); + } + + #[test] + fn leaves_unrelated_files_untouched() { + let dir = scoped_dir(); + let secret = dir.join("secret_access.jsonl"); + let outraced = dir.join("classifier_outraced.jsonl"); + fs::write(&secret, b"s").unwrap(); + fs::write(&outraced, b"o").unwrap(); + for i in 0..MAX_FILES_PER_KIND + 10 { + fs::write(dir.join(format!("traces-20260512-110000-{i}.jsonl")), b"x").unwrap(); + } + cleanup_telemetry_files(&dir, |_| false); + assert!(secret.exists(), "secret_access.jsonl must never be touched"); + assert!(outraced.exists(), "unrelated telemetry must be left alone"); + let _ = fs::remove_dir_all(&dir); + } +} diff --git a/src/bootstrap/mod.rs b/src/bootstrap/mod.rs index 3fdd18b3..74900b4f 100644 --- a/src/bootstrap/mod.rs +++ b/src/bootstrap/mod.rs @@ -77,11 +77,16 @@ pub async fn run() -> anyhow::Result<()> { loopal_git::cleanup_stale_worktrees(&repo_root); } discovery::cleanup_stale(); - // reason: orphan cleanup must run in every process entry, not just - // multiprocess parent. daemon modes (serve / hub-only / acp) own - // sessions too and would otherwise accumulate tmp dirs indefinitely. - // Cost is a single fs read_dir + HashSet compare — negligible. - cleanup_bash_log_orphans().await; + // reason: orphan cleanup scans every session dir (list_sessions reads + + // parses every session.json). The hub-only child is always spawned by a + // parent that ran this same scan seconds earlier, so re-running it here is + // pure duplication — and on a large ~/.loopal/sessions tree it can exceed + // the 30s handshake timeout (hub_spawn::HANDSHAKE_TIMEOUT), getting the + // child killed as an unreachable orphan. Every other entry (incl. serve / + // acp daemons) still owns sessions and must run it to bound tmp growth. + if !cli.parent_only.hub_only { + cleanup_bash_log_orphans().await; + } let mut config = load_config(&cwd)?; cli.apply_overrides(&mut config.settings); diff --git a/src/logging.rs b/src/logging.rs index 4f2fb696..a800a50e 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -16,6 +16,10 @@ pub fn init_logging( // Housekeep: remove old logs exceeding the retention policy crate::log_writer::cleanup_old_logs(&log_dir); + loopal_telemetry::cleanup_telemetry_files( + &telemetry_config.telemetry_dir(), + crate::bootstrap::is_alive, + ); let writer = crate::log_writer::RotatingFileWriter::new(&log_dir); let log_path = writer.current_path();