diff --git a/lib/clauck b/lib/clauck index d1f882d..3229404 100755 --- a/lib/clauck +++ b/lib/clauck @@ -96,6 +96,7 @@ finally: HOME = Path(os.environ.get("HOME", str(Path.home()))) JOBS_DIR = HOME / ".clauck" +DAG_LOGS_DIR = JOBS_DIR / ".dag-logs" STATE_DIR = JOBS_DIR / ".state" BROKEN_DIR = JOBS_DIR / ".broken" MANIFEST = JOBS_DIR / ".manifest.json" @@ -542,16 +543,6 @@ def cmd_cost(name: str = "", days: int = 30, all_time: bool = False) -> None: now = datetime.now(timezone.utc) cutoff = None if all_time else now - timedelta(days=days) - # Load known job names once so we can tell apart a DAG orchestration log - # (e.g. standup-dag-.log, where standup is a real job) from a log - # produced by a job that happens to be named foo-dag. - known_jobs: "set[str]" = set() - if MANIFEST.exists(): - try: - known_jobs = {j["name"] for j in json.loads(MANIFEST.read_text()).get("jobs", [])} - except (ValueError, KeyError): - pass - stats: dict[str, dict] = {} for log_file in JOBS_DIR.glob("*-[0-9]*T[0-9]*Z-[0-9]*.log"): @@ -559,10 +550,6 @@ def cmd_cost(name: str = "", days: int = 30, all_time: bool = False) -> None: if not m: continue job_name, ts_str = m.group(1), m.group(2) - # Skip DAG orchestration logs only when the -dag suffix is not itself a - # real job name — a job named e.g. "standup-dag" should not be excluded. - if job_name.endswith("-dag") and job_name not in known_jobs: - continue if name and job_name != name: continue try: @@ -959,14 +946,14 @@ def _active_log(name: str) -> "Path | None": def _active_dag_log(name: str) -> "Path | None": """Return the currently-running DAG orchestration log for *name*, or None. - DAG logs: ~/.clauck/-dag--.log + DAG logs: ~/.clauck/.dag-logs/--.log Active detection: most recent log without an exit_code tombstone whose pid= header line refers to a live process. """ import os as _os logs = sorted( - JOBS_DIR.glob(f"{name}-dag-[0-9]*.log"), + DAG_LOGS_DIR.glob(f"{name}-[0-9]*.log"), key=lambda p: p.stat().st_mtime, reverse=True, ) @@ -1071,7 +1058,7 @@ def cmd_logs(name: str, last: int = 5, show: int = 0, follow: bool = False) -> N # pipeline timeline — none of which appears in a single node's log. dag_active = _active_dag_log(name) dag_logs = sorted( - JOBS_DIR.glob(f"{name}-dag-[0-9]*.log"), + DAG_LOGS_DIR.glob(f"{name}-[0-9]*.log"), key=lambda p: p.stat().st_mtime, reverse=True, ) @@ -1168,16 +1155,6 @@ def cmd_history( all_time: bool = False, ) -> None: """Cross-job invocation timeline sorted by most recent first.""" - import re as _re - - # Load known job names so -dag-suffix logs from real jobs named foo-dag - # are not misclassified as DAG orchestration logs. - known_jobs: "set[str]" = set() - if MANIFEST.exists(): - try: - known_jobs = {j["name"] for j in json.loads(MANIFEST.read_text()).get("jobs", [])} - except (ValueError, KeyError): - pass # Glob all log files, sorted newest first by mtime logs = sorted(JOBS_DIR.glob("*-[0-9]*.log"), key=lambda p: p.stat().st_mtime, reverse=True) @@ -1189,11 +1166,6 @@ def cmd_history( summaries = [] for log in logs: - # Skip DAG orchestration logs — they describe pipeline coordination, not - # job runs. But don't skip logs from a real job named foo-dag. - m_dag = _re.search(r"^(.+)-dag-\d{8}T\d{6}Z-\d+\.log$", log.name) - if m_dag and m_dag.group(1) + "-dag" not in known_jobs: - continue if cutoff_mtime and log.stat().st_mtime < cutoff_mtime: # Logs are sorted newest-first; once we go past the cutoff we're done break @@ -1920,6 +1892,7 @@ def cmd_peek() -> None: def watch_new_logs(): while True: current = set(glob.glob(str(JOBS_DIR / "*-[0-9]*.log"))) + current.update(glob.glob(str(DAG_LOGS_DIR / "*-[0-9]*.log"))) new = current - seen_files for f in sorted(new): seen_files.add(f) @@ -3040,7 +3013,7 @@ Priority order — not a mandatory checklist: 4. **Manifest errors** — read `{MANIFEST}` and check for `dag_errors` key (cycle detection or missing-producer errors). 5. **Auto-disabled jobs** — `ls {STATE_DIR}/*.auto-disabled 2>/dev/null`; read each file for the reason. 6. **Recent failures** — scan `{JOBS_DIR}/*.log` for recent non-zero exit_code tombstones. -7. **DAG invocation logs** — `{JOBS_DIR}/*-dag-*.log` for pipeline failures. +7. **DAG invocation logs** — `{DAG_LOGS_DIR}/*.log` for pipeline failures. 8. **Frontmatter errors** — parse each job .md; report malformed YAML. 9. **Version / update state** — `cat {JOBS_DIR}/.version` and `{STATE_DIR}/.update-available` if present. diff --git a/lib/dag-runner.py b/lib/dag-runner.py index 722e15f..89f68cd 100644 --- a/lib/dag-runner.py +++ b/lib/dag-runner.py @@ -31,6 +31,7 @@ HOME = Path(os.environ.get("HOME", str(Path.home()))) JOBS_DIR = HOME / ".clauck" +DAG_LOGS_DIR = JOBS_DIR / ".dag-logs" STATE_DIR = JOBS_DIR / ".state" INVOCATION_DIR = STATE_DIR / ".dag-invocations" MANIFEST_PATH = JOBS_DIR / ".manifest.json" @@ -63,10 +64,10 @@ def __init__(self, root_name: str, invocation_id: str): self.invocation_id = invocation_id ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") pid = os.getpid() - self.log_path = JOBS_DIR / f"{root_name}-dag-{ts}-{pid}.log" + self.log_path = DAG_LOGS_DIR / f"{root_name}-{ts}-{pid}.log" self.oplog: list[dict] = [] # Create log file immediately so failures are observable. - JOBS_DIR.mkdir(parents=True, exist_ok=True) + DAG_LOGS_DIR.mkdir(parents=True, exist_ok=True) self._write(f"=== dag-runner start: {root_name} @ {ts} ===") self._write(f"invocation_id={invocation_id}") self._write(f"pid={pid}") diff --git a/tests/test_clauck_history_cost.py b/tests/test_clauck_history_cost.py index fa116fc..020c1a6 100644 --- a/tests/test_clauck_history_cost.py +++ b/tests/test_clauck_history_cost.py @@ -181,11 +181,19 @@ class TestCmdCost(unittest.TestCase): def setUp(self): self.tmp = tempfile.TemporaryDirectory() self.jobs_dir = Path(self.tmp.name) + self.dag_logs_dir = self.jobs_dir / ".dag-logs" + self.dag_logs_dir.mkdir() self._orig = _mod.JOBS_DIR + self._orig_dag_logs = getattr(_mod, "DAG_LOGS_DIR", None) _mod.JOBS_DIR = self.jobs_dir + _mod.DAG_LOGS_DIR = self.dag_logs_dir def tearDown(self): _mod.JOBS_DIR = self._orig + if self._orig_dag_logs is None: + delattr(_mod, "DAG_LOGS_DIR") + else: + _mod.DAG_LOGS_DIR = self._orig_dag_logs self.tmp.cleanup() def _log(self, name: str, ts: str, pid: str, content: str) -> Path: @@ -258,14 +266,27 @@ def test_failure_run_counted_in_fails_column(self): self.assertIn("1", out) def test_dag_log_excluded(self): - # A "-dag-" pattern in the filename marks orchestration logs - dag_log = self.jobs_dir / "my-pipeline-dag-20260418T170000Z-400.log" + dag_log = self.dag_logs_dir / "my-pipeline-20260418T170000Z-400.log" dag_log.write_text( '{"total_cost_usd":9.9999}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n' ) out = self._run() self.assertIn("no log data found", out) + def test_real_dash_dag_job_is_not_polluted_by_dag_root_log(self): + self._log( + "standup-dag", "20260418T170000Z", "410", + '{"total_cost_usd":0.1250}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n', + ) + dag_log = self.dag_logs_dir / "standup-20260418T170500Z-411.log" + dag_log.write_text( + '{"total_cost_usd":9.9999}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n' + ) + out = self._run(name="standup-dag", all_time=True) + self.assertIn("standup-dag", out) + self.assertIn("0.1250", out) + self.assertNotIn("9.9999", out) + def test_days_filter_excludes_old_log(self): # Log timestamped 60 days ago — outside default 30-day window old_ts = (datetime.now(timezone.utc) - timedelta(days=60)).strftime("%Y%m%dT%H%M%SZ") @@ -275,8 +296,8 @@ def test_days_filter_excludes_old_log(self): ) # Force mtime to match the timestamp (cmd_cost uses mtime for --days filter) import time as _time - sixty_days_ago = _time.time() - 60 * 86400 import os + sixty_days_ago = _time.time() - 60 * 86400 os.utime(log, (sixty_days_ago, sixty_days_ago)) out = self._run(days=30) self.assertIn("no log data found", out) @@ -294,6 +315,50 @@ def test_all_time_includes_old_log(self): self.assertIn("old-job", out) +class TestCmdHistory(unittest.TestCase): + + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + self.jobs_dir = Path(self.tmp.name) + self.dag_logs_dir = self.jobs_dir / ".dag-logs" + self.dag_logs_dir.mkdir() + self._orig_jobs_dir = _mod.JOBS_DIR + self._orig_dag_logs = getattr(_mod, "DAG_LOGS_DIR", None) + _mod.JOBS_DIR = self.jobs_dir + _mod.DAG_LOGS_DIR = self.dag_logs_dir + + def tearDown(self): + _mod.JOBS_DIR = self._orig_jobs_dir + if self._orig_dag_logs is None: + delattr(_mod, "DAG_LOGS_DIR") + else: + _mod.DAG_LOGS_DIR = self._orig_dag_logs + self.tmp.cleanup() + + def _log(self, name: str, ts: str, pid: str, content: str) -> Path: + return _make_log(self.jobs_dir, name, ts, pid, content) + + def _run(self, **kwargs) -> str: + buf = io.StringIO() + with patch("sys.stdout", buf): + _mod.cmd_history(**kwargs) + return buf.getvalue() + + def test_real_dash_dag_job_history_excludes_orchestration_log(self): + self._log( + "standup-dag", "20260418T170000Z", "420", + '{"total_cost_usd":0.1250}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n', + ) + dag_log = self.dag_logs_dir / "standup-20260418T170500Z-421.log" + dag_log.write_text( + '{"total_cost_usd":9.9999}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n' + ) + out = self._run(job_filter="standup-dag", all_time=True) + self.assertIn("standup-dag", out) + self.assertIn("20260418T170000Z-420", out) + self.assertNotIn("20260418T170500Z-421", out) + + # --------------------------------------------------------------------------- # CLI parsing for `history` # --------------------------------------------------------------------------- diff --git a/tests/test_clauck_logs.py b/tests/test_clauck_logs.py index 7ffacec..db93a1b 100644 --- a/tests/test_clauck_logs.py +++ b/tests/test_clauck_logs.py @@ -33,8 +33,10 @@ class TestActiveLog(unittest.TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp() self.jobs_dir = Path(self.tmpdir) / ".clauck" + self.dag_logs_dir = self.jobs_dir / ".dag-logs" self.state_dir = self.jobs_dir / ".state" self.jobs_dir.mkdir() + self.dag_logs_dir.mkdir() self.state_dir.mkdir() def tearDown(self): @@ -45,6 +47,7 @@ def _patch_dirs(self): return patch.multiple( "clauck", JOBS_DIR=self.jobs_dir, + DAG_LOGS_DIR=self.dag_logs_dir, STATE_DIR=self.state_dir, ) @@ -112,6 +115,13 @@ def test_pid_file_missing_from_lock_dir_returns_none(self): with self._patch_dirs(): self.assertIsNone(clauck._active_log("myjob")) + def test_active_dag_log_uses_dedicated_dag_log_dir(self): + dag_log = self.dag_logs_dir / "pipeline-20260417T170000Z-123.log" + dag_log.write_text("=== dag start ===\npid=%d\n" % os.getpid()) + with self._patch_dirs(): + result = clauck._active_dag_log("pipeline") + self.assertEqual(result, dag_log) + class TestFollowLog(unittest.TestCase): """Tests for _follow_log().""" diff --git a/tests/test_dag_runner.py b/tests/test_dag_runner.py index 61ce0a0..11ce3bf 100644 --- a/tests/test_dag_runner.py +++ b/tests/test_dag_runner.py @@ -6,6 +6,7 @@ from __future__ import annotations import json +import os import sys import tempfile import time @@ -57,6 +58,21 @@ def _logger() -> _NullLogger: return _NullLogger() +class TestDagLogger(unittest.TestCase): + def test_dag_logger_writes_to_dedicated_dag_log_dir(self): + with tempfile.TemporaryDirectory() as tmp: + jobs_dir = Path(tmp) / ".clauck" + dag_logs_dir = jobs_dir / ".dag-logs" + with patch.object(dr, "JOBS_DIR", jobs_dir), \ + patch.object(dr, "DAG_LOGS_DIR", dag_logs_dir, create=True), \ + patch.object(dr.os, "getpid", return_value=123): + logger = dr.DagLogger("standup", "inv-1") + + self.assertEqual(logger.log_path.parent, dag_logs_dir) + self.assertTrue(logger.log_path.exists()) + self.assertIn("pid=123", logger.log_path.read_text()) + + # --------------------------------------------------------------------------- # _get_producers # ---------------------------------------------------------------------------