Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions crates/loopal-telemetry/src/file_metric_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
//! JSONL file exporter for OpenTelemetry metrics.

use std::fmt::Debug;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::path::Path;
use std::sync::Mutex;

use async_trait::async_trait;
Expand All @@ -12,9 +10,11 @@ use opentelemetry_sdk::metrics::data::{Metric, ResourceMetrics};
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use serde::Serialize;

use crate::lazy_jsonl::LazyJsonlFile;

/// Metric exporter that appends JSONL data points to a local file.
pub(crate) struct JsonlMetricExporter {
writer: Mutex<BufWriter<File>>,
sink: Mutex<LazyJsonlFile>,
}

impl Debug for JsonlMetricExporter {
Expand All @@ -24,14 +24,13 @@ impl Debug for JsonlMetricExporter {
}

impl JsonlMetricExporter {
pub fn new(dir: &PathBuf) -> std::io::Result<Self> {
pub fn new(dir: &Path) -> std::io::Result<Self> {
std::fs::create_dir_all(dir)?;
let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S");
let pid = std::process::id();
let path = dir.join(format!("metrics-{ts}-{pid}.jsonl"));
let file = OpenOptions::new().create(true).append(true).open(path)?;
Ok(Self {
writer: Mutex::new(BufWriter::new(file)),
sink: Mutex::new(LazyJsonlFile::new(path)),
})
}

Expand Down Expand Up @@ -127,15 +126,15 @@ impl JsonlMetricExporter {

fn write_json(&self, record: &MetricRecord) {
if let Ok(json) = serde_json::to_string(record)
&& let Ok(mut w) = self.writer.lock()
&& let Ok(mut sink) = self.sink.lock()
{
let _ = writeln!(w, "{json}");
sink.write_line(&json);
}
}

fn flush(&self) {
if let Ok(mut w) = self.writer.lock() {
let _ = w.flush();
if let Ok(mut sink) = self.sink.lock() {
sink.flush();
}
}
}
Expand Down
19 changes: 9 additions & 10 deletions crates/loopal-telemetry/src/file_span_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
//! Designed for offline analysis with `jq`, `pandas`, etc.

use std::fmt::Debug;
use std::fs::{File, OpenOptions};
use std::future::Future;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::path::Path;
use std::pin::Pin;
use std::time::SystemTime;

Expand All @@ -16,11 +14,13 @@ use opentelemetry_sdk::Resource;
use opentelemetry_sdk::trace::{SpanData, SpanExporter};
use serde::Serialize;

use crate::lazy_jsonl::LazyJsonlFile;

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

/// Span exporter that appends one JSONL line per span to a local file.
pub(crate) struct JsonlSpanExporter {
writer: BufWriter<File>,
sink: LazyJsonlFile,
}

impl Debug for JsonlSpanExporter {
Expand All @@ -30,14 +30,13 @@ impl Debug for JsonlSpanExporter {
}

impl JsonlSpanExporter {
pub fn new(dir: &PathBuf) -> std::io::Result<Self> {
pub fn new(dir: &Path) -> std::io::Result<Self> {
std::fs::create_dir_all(dir)?;
let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S");
let pid = std::process::id();
let path = dir.join(format!("traces-{ts}-{pid}.jsonl"));
let file = OpenOptions::new().create(true).append(true).open(path)?;
Ok(Self {
writer: BufWriter::new(file),
sink: LazyJsonlFile::new(path),
})
}

Expand Down Expand Up @@ -65,7 +64,7 @@ impl JsonlSpanExporter {
.collect(),
};
if let Ok(json) = serde_json::to_string(&record) {
let _ = writeln!(self.writer, "{json}");
self.sink.write_line(&json);
}
}
}
Expand All @@ -78,12 +77,12 @@ impl SpanExporter for JsonlSpanExporter {
for span in &batch {
self.write_span(span);
}
let _ = self.writer.flush();
self.sink.flush();
Box::pin(std::future::ready(Ok(())))
}

fn shutdown(&mut self) -> opentelemetry_sdk::error::OTelSdkResult {
let _ = self.writer.flush();
self.sink.flush();
Ok(())
}

Expand Down
87 changes: 87 additions & 0 deletions crates/loopal-telemetry/src/lazy_jsonl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;

/// Append-only JSONL sink whose backing file is opened lazily.
///
/// Constructing the sink and flushing it never touch the filesystem; the
/// file (and its parent directory) is only created by the first
/// `write_line` call. A process that never emits a line therefore leaves no
/// empty `traces-`/`metrics-` file behind.
pub(crate) struct LazyJsonlFile {
    /// Destination of the JSONL output.
    path: PathBuf,
    /// Buffered handle; `None` until the first successful open.
    writer: Option<BufWriter<File>>,
}

impl LazyJsonlFile {
    /// Remember `path` as the destination without creating anything on disk.
    pub(crate) fn new(path: PathBuf) -> Self {
        Self { path, writer: None }
    }

    /// Append `line` followed by a newline, opening the file on first use.
    ///
    /// Best-effort: a failed open or a failed write is silently dropped.
    pub(crate) fn write_line(&mut self, line: &str) {
        let Some(out) = self.writer_mut() else {
            return;
        };
        let _ = writeln!(out, "{line}");
    }

    /// Flush buffered output if the file has been opened; otherwise a no-op.
    pub(crate) fn flush(&mut self) {
        if let Some(out) = &mut self.writer {
            let _ = out.flush();
        }
    }

    /// Return the buffered writer, opening the backing file if necessary.
    ///
    /// Yields `None` when the file cannot be opened; `writer` then stays
    /// unset, so the next call attempts the open again.
    fn writer_mut(&mut self) -> Option<&mut BufWriter<File>> {
        if self.writer.is_none() {
            let opened = self.open_for_append()?;
            self.writer = Some(opened);
        }
        self.writer.as_mut()
    }

    /// Create the parent directory (best-effort) and open `path` in
    /// create + append mode, wrapped in a `BufWriter`.
    fn open_for_append(&self) -> Option<BufWriter<File>> {
        if let Some(parent) = self.path.parent() {
            let _ = std::fs::create_dir_all(parent);
        }
        let mut options = OpenOptions::new();
        options.create(true).append(true);
        options.open(&self.path).ok().map(BufWriter::new)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Build a `<tag>.jsonl` path inside a per-process, per-call temp
    /// directory. The directory is wiped first and deliberately NOT
    /// recreated, so the sink under test has to create it.
    fn scoped_path(tag: &str) -> PathBuf {
        static COUNTER: AtomicU32 = AtomicU32::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut target = std::env::temp_dir();
        target.push(format!("loopal_lazy_{}_{n}", std::process::id()));
        let _ = std::fs::remove_dir_all(&target);
        target.push(format!("{tag}.jsonl"));
        target
    }

    /// Constructing and dropping a sink must leave the filesystem untouched.
    #[test]
    fn no_file_created_without_write() {
        let path = scoped_path("traces-x-1");
        {
            let _sink = LazyJsonlFile::new(path.clone());
        }
        assert!(!path.exists(), "file must not exist until first write");
    }

    /// Flushing a sink that never wrote must not open the file.
    #[test]
    fn flush_without_write_creates_nothing() {
        let path = scoped_path("metrics-x-1");
        let mut sink = LazyJsonlFile::new(path.clone());
        sink.flush();
        assert!(!path.exists(), "flush alone must not create the file");
    }

    /// The first write creates the file; later writes append to it.
    #[test]
    fn first_write_creates_and_appends() {
        let path = scoped_path("traces-x-2");
        let mut sink = LazyJsonlFile::new(path.clone());
        for line in ["{\"a\":1}", "{\"b\":2}"] {
            sink.write_line(line);
        }
        sink.flush();
        let body = std::fs::read_to_string(&path).unwrap();
        assert_eq!(body, "{\"a\":1}\n{\"b\":2}\n");
        let parent = path.parent().unwrap();
        let _ = std::fs::remove_dir_all(parent);
    }
}
3 changes: 3 additions & 0 deletions crates/loopal-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
pub(crate) mod file_metric_exporter;
pub(crate) mod file_span_exporter;
mod filter;
mod lazy_jsonl;
mod logs;
mod metrics;
mod resource;
mod retention;
mod shutdown;
mod subscriber;
mod traces;

pub use filter::build_env_filter;
pub use retention::cleanup_telemetry_files;
pub use shutdown::TelemetryGuard;
pub use subscriber::init_subscriber;
138 changes: 138 additions & 0 deletions crates/loopal-telemetry/src/retention.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::fs;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

/// Maximum number of files kept per telemetry kind.
const MAX_FILES_PER_KIND: usize = 100;
/// Maximum combined size in bytes kept per telemetry kind (50 MiB).
const MAX_BYTES_PER_KIND: u64 = 50 * 1024 * 1024;
/// Filename prefixes of the telemetry kinds subject to retention.
const KIND_PREFIXES: [&str; 2] = ["traces-", "metrics-"];
/// Extension a file must carry to be considered for retention.
const EXT: &str = ".jsonl";

/// Bound the JSONL telemetry directory. Every process leaves a
/// `traces-{ts}-{pid}.jsonl` + `metrics-{ts}-{pid}.jsonl` pair, so without a cap
/// the directory grows monotonically across the multiprocess fleet. Enforces a
/// per-kind file-count and byte-size ceiling, dropping the oldest first.
///
/// `is_alive` guards a long-lived process's open file from being unlinked by a
/// short-lived child's cleanup run (same hazard as log rotation). Only files
/// matching the `traces-`/`metrics-` prefixes are ever touched — sibling files
/// like `secret_access.jsonl` are left alone.
///
/// Best-effort: an unreadable directory or an entry that cannot be removed is
/// skipped silently; nothing is reported to the caller.
pub fn cleanup_telemetry_files(dir: &Path, is_alive: impl Fn(u32) -> bool) {
    for prefix in KIND_PREFIXES {
        enforce_kind(dir, prefix, &is_alive);
    }
}

/// Apply the count and size ceilings to the files in `dir` whose name starts
/// with `prefix` and ends with [`EXT`].
///
/// Candidates are ordered by modification time, oldest first, and removed
/// until both ceilings hold. Files whose embedded PID is reported alive by
/// `is_alive` are never removed; files without a parsable PID count as not
/// alive. Entries whose metadata or modification time cannot be read are
/// ignored entirely.
fn enforce_kind(dir: &Path, prefix: &str, is_alive: &impl Fn(u32) -> bool) {
    let Ok(entries) = fs::read_dir(dir) else {
        return;
    };
    let mut files: Vec<(PathBuf, u64, SystemTime, bool)> = entries
        .filter_map(|e| e.ok())
        .filter(|e| {
            let name = e.file_name();
            let name = name.to_string_lossy();
            name.starts_with(prefix) && name.ends_with(EXT)
        })
        .filter_map(|e| {
            let meta = e.metadata().ok()?;
            let alive =
                pid_from_telemetry_filename(&e.file_name().to_string_lossy()).is_some_and(is_alive);
            Some((e.path(), meta.len(), meta.modified().ok()?, alive))
        })
        .collect();
    files.sort_by_key(|(_, _, t, _)| *t);
    let total_count = files.len();
    let total_size: u64 = files.iter().map(|(_, s, _, _)| s).sum();
    let (mut removed_count, mut removed_size) = (0usize, 0u64);
    for (path, size, _, alive) in &files {
        if total_count - removed_count <= MAX_FILES_PER_KIND
            && total_size - removed_size <= MAX_BYTES_PER_KIND
        {
            break;
        }
        if *alive {
            continue;
        }
        // Only credit the budget when the entry is really gone. A failed
        // unlink (permissions, a directory matching the pattern, ...) must
        // not be counted, or the loop stops early and leaves the directory
        // over its cap. `NotFound` means someone else (e.g. a concurrent
        // cleanup run) already removed it, which frees the slot just the same.
        let gone = match fs::remove_file(path) {
            Ok(()) => true,
            Err(err) => err.kind() == std::io::ErrorKind::NotFound,
        };
        if gone {
            removed_count += 1;
            removed_size += size;
        }
    }
}

/// Extract the PID from a `{kind}-{ts}-{pid}.jsonl` telemetry filename.
///
/// Returns `None` when the name lacks a known kind prefix or the [`EXT`]
/// suffix, or when the text after the last `-` is not a valid `u32`.
pub(crate) fn pid_from_telemetry_filename(name: &str) -> Option<u32> {
    let stem = KIND_PREFIXES
        .iter()
        .find_map(|p| name.strip_prefix(p))?
        .strip_suffix(EXT)?;
    let (_, pid) = stem.rsplit_once('-')?;
    pid.parse::<u32>().ok()
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Create an empty temp directory unique to this process and call.
    fn scoped_dir() -> PathBuf {
        static COUNTER: AtomicU32 = AtomicU32::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(format!("loopal_tele_{}_{n}", std::process::id()));
        let _ = fs::remove_dir_all(&p);
        fs::create_dir_all(&p).unwrap();
        p
    }

    /// PIDs are parsed for both kinds; foreign names yield `None`.
    #[test]
    fn pid_parsing_handles_both_kinds() {
        let traces = pid_from_telemetry_filename("traces-20260410-012952-64885.jsonl");
        assert_eq!(traces, Some(64885));
        let metrics = pid_from_telemetry_filename("metrics-20260410-012952-64885.jsonl");
        assert_eq!(metrics, Some(64885));
        assert_eq!(pid_from_telemetry_filename("secret_access.jsonl"), None);
        assert_eq!(pid_from_telemetry_filename("traces-nopid.txt"), None);
    }

    /// A file owned by a live PID survives even when the cap is exceeded.
    #[test]
    fn never_deletes_alive_pid_file() {
        let dir = scoped_dir();
        let alive = dir.join("traces-20260512-100000-1001.jsonl");
        fs::write(&alive, b"x").unwrap();
        (0..MAX_FILES_PER_KIND + 5).for_each(|i| {
            let file = dir.join(format!("traces-20260512-110000-{i}.jsonl"));
            fs::write(file, b"x").unwrap();
        });
        cleanup_telemetry_files(&dir, |pid| pid == 1001);
        assert!(alive.exists(), "alive PID's file must survive cleanup");
        let _ = fs::remove_dir_all(&dir);
    }

    /// With no live owners, the file count is trimmed down to the cap.
    #[test]
    fn enforces_per_kind_file_count() {
        let dir = scoped_dir();
        (0..MAX_FILES_PER_KIND + 30).for_each(|i| {
            let file = dir.join(format!("metrics-20260512-110000-{i}.jsonl"));
            fs::write(file, b"x").unwrap();
        });
        cleanup_telemetry_files(&dir, |_| false);
        let kept = fs::read_dir(&dir).unwrap().count();
        assert!(kept <= MAX_FILES_PER_KIND, "expected ≤cap, got {kept}");
        let _ = fs::remove_dir_all(&dir);
    }

    /// Files outside the `traces-`/`metrics-` families are never removed.
    #[test]
    fn leaves_unrelated_files_untouched() {
        let dir = scoped_dir();
        let secret = dir.join("secret_access.jsonl");
        let outraced = dir.join("classifier_outraced.jsonl");
        fs::write(&secret, b"s").unwrap();
        fs::write(&outraced, b"o").unwrap();
        (0..MAX_FILES_PER_KIND + 10).for_each(|i| {
            let file = dir.join(format!("traces-20260512-110000-{i}.jsonl"));
            fs::write(file, b"x").unwrap();
        });
        cleanup_telemetry_files(&dir, |_| false);
        assert!(secret.exists(), "secret_access.jsonl must never be touched");
        assert!(outraced.exists(), "unrelated telemetry must be left alone");
        let _ = fs::remove_dir_all(&dir);
    }
}
15 changes: 10 additions & 5 deletions src/bootstrap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,16 @@ pub async fn run() -> anyhow::Result<()> {
loopal_git::cleanup_stale_worktrees(&repo_root);
}
discovery::cleanup_stale();
// reason: orphan cleanup must run in every process entry, not just
// multiprocess parent. daemon modes (serve / hub-only / acp) own
// sessions too and would otherwise accumulate tmp dirs indefinitely.
// Cost is a single fs read_dir + HashSet compare — negligible.
cleanup_bash_log_orphans().await;
// reason: orphan cleanup scans every session dir (list_sessions reads +
// parses every session.json). The hub-only child is always spawned by a
// parent that ran this same scan seconds earlier, so re-running it here is
// pure duplication — and on a large ~/.loopal/sessions tree it can exceed
// the 30s handshake timeout (hub_spawn::HANDSHAKE_TIMEOUT), getting the
// child killed as an unreachable orphan. Every other entry (incl. serve /
// acp daemons) still owns sessions and must run it to bound tmp growth.
if !cli.parent_only.hub_only {
cleanup_bash_log_orphans().await;
}

let mut config = load_config(&cwd)?;
cli.apply_overrides(&mut config.settings);
Expand Down
Loading
Loading