Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 6 additions & 33 deletions lib/clauck
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ finally:

HOME = Path(os.environ.get("HOME", str(Path.home())))
JOBS_DIR = HOME / ".clauck"
DAG_LOGS_DIR = JOBS_DIR / ".dag-logs"
STATE_DIR = JOBS_DIR / ".state"
BROKEN_DIR = JOBS_DIR / ".broken"
MANIFEST = JOBS_DIR / ".manifest.json"
Expand Down Expand Up @@ -542,27 +543,13 @@ def cmd_cost(name: str = "", days: int = 30, all_time: bool = False) -> None:
now = datetime.now(timezone.utc)
cutoff = None if all_time else now - timedelta(days=days)

# Load known job names once so we can tell apart a DAG orchestration log
# (e.g. standup-dag-<ts>.log, where standup is a real job) from a log
# produced by a job that happens to be named foo-dag.
known_jobs: "set[str]" = set()
if MANIFEST.exists():
try:
known_jobs = {j["name"] for j in json.loads(MANIFEST.read_text()).get("jobs", [])}
except (ValueError, KeyError):
pass

stats: dict[str, dict] = {}

for log_file in JOBS_DIR.glob("*-[0-9]*T[0-9]*Z-[0-9]*.log"):
m = log_pat.match(log_file.name)
if not m:
continue
job_name, ts_str = m.group(1), m.group(2)
# Skip DAG orchestration logs only when the -dag suffix is not itself a
# real job name — a job named e.g. "standup-dag" should not be excluded.
if job_name.endswith("-dag") and job_name not in known_jobs:
continue
if name and job_name != name:
Comment on lines 550 to 553
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve legacy DAG-log exclusion in cost aggregation

This loop now aggregates every top-level *-<ts>-<pid>.log under ~/.clauck, so users who upgrade with pre-change orchestration logs like <root>-dag-...log will have those legacy DAG runs counted as normal jobs (e.g., pipe-dag) in clauck cost. Before this commit those files were explicitly excluded unless -dag was a real job name, so this is a correctness regression for historical spend totals on existing installations.

Useful? React with 👍 / 👎.

continue
try:
Expand Down Expand Up @@ -959,14 +946,14 @@ def _active_log(name: str) -> "Path | None":
def _active_dag_log(name: str) -> "Path | None":
"""Return the currently-running DAG orchestration log for *name*, or None.

DAG logs: ~/.clauck/<name>-dag-<ts>-<pid>.log
DAG logs: ~/.clauck/.dag-logs/<name>-<ts>-<pid>.log
Active detection: most recent log without an exit_code tombstone whose
pid= header line refers to a live process.
"""
import os as _os

logs = sorted(
JOBS_DIR.glob(f"{name}-dag-[0-9]*.log"),
DAG_LOGS_DIR.glob(f"{name}-[0-9]*.log"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
Expand Down Expand Up @@ -1071,7 +1058,7 @@ def cmd_logs(name: str, last: int = 5, show: int = 0, follow: bool = False) -> N
# pipeline timeline — none of which appears in a single node's log.
dag_active = _active_dag_log(name)
dag_logs = sorted(
JOBS_DIR.glob(f"{name}-dag-[0-9]*.log"),
DAG_LOGS_DIR.glob(f"{name}-[0-9]*.log"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
Expand Down Expand Up @@ -1168,16 +1155,6 @@ def cmd_history(
all_time: bool = False,
) -> None:
"""Cross-job invocation timeline sorted by most recent first."""
import re as _re

# Load known job names so -dag-suffix logs from real jobs named foo-dag
# are not misclassified as DAG orchestration logs.
known_jobs: "set[str]" = set()
if MANIFEST.exists():
try:
known_jobs = {j["name"] for j in json.loads(MANIFEST.read_text()).get("jobs", [])}
except (ValueError, KeyError):
pass

# Glob all log files, sorted newest first by mtime
logs = sorted(JOBS_DIR.glob("*-[0-9]*.log"), key=lambda p: p.stat().st_mtime, reverse=True)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep legacy DAG logs out of history listings

cmd_history now scans all top-level logs in JOBS_DIR without filtering out legacy orchestration files (<root>-dag-<ts>-<pid>.log), so upgraded users can see old DAG coordinator runs reported as regular job invocations. This reintroduces ambiguous/misleading history entries for pre-migration data even though the command previously had explicit exclusion logic for these files.

Useful? React with 👍 / 👎.

Expand All @@ -1189,11 +1166,6 @@ def cmd_history(

summaries = []
for log in logs:
# Skip DAG orchestration logs — they describe pipeline coordination, not
# job runs. But don't skip logs from a real job named foo-dag.
m_dag = _re.search(r"^(.+)-dag-\d{8}T\d{6}Z-\d+\.log$", log.name)
if m_dag and m_dag.group(1) + "-dag" not in known_jobs:
continue
if cutoff_mtime and log.stat().st_mtime < cutoff_mtime:
# Logs are sorted newest-first; once we go past the cutoff we're done
break
Expand Down Expand Up @@ -1920,6 +1892,7 @@ def cmd_peek() -> None:
def watch_new_logs():
while True:
current = set(glob.glob(str(JOBS_DIR / "*-[0-9]*.log")))
current.update(glob.glob(str(DAG_LOGS_DIR / "*-[0-9]*.log")))
new = current - seen_files
for f in sorted(new):
seen_files.add(f)
Expand Down Expand Up @@ -3040,7 +3013,7 @@ Priority order — not a mandatory checklist:
4. **Manifest errors** — read `{MANIFEST}` and check for `dag_errors` key (cycle detection or missing-producer errors).
5. **Auto-disabled jobs** — `ls {STATE_DIR}/*.auto-disabled 2>/dev/null`; read each file for the reason.
6. **Recent failures** — scan `{JOBS_DIR}/*.log` for recent non-zero exit_code tombstones.
7. **DAG invocation logs** — `{JOBS_DIR}/*-dag-*.log` for pipeline failures.
7. **DAG invocation logs** — `{DAG_LOGS_DIR}/*.log` for pipeline failures.
8. **Frontmatter errors** — parse each job .md; report malformed YAML.
9. **Version / update state** — `cat {JOBS_DIR}/.version` and `{STATE_DIR}/.update-available` if present.

Expand Down
5 changes: 3 additions & 2 deletions lib/dag-runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

HOME = Path(os.environ.get("HOME", str(Path.home())))
JOBS_DIR = HOME / ".clauck"
DAG_LOGS_DIR = JOBS_DIR / ".dag-logs"
STATE_DIR = JOBS_DIR / ".state"
INVOCATION_DIR = STATE_DIR / ".dag-invocations"
MANIFEST_PATH = JOBS_DIR / ".manifest.json"
Expand Down Expand Up @@ -63,10 +64,10 @@ def __init__(self, root_name: str, invocation_id: str):
self.invocation_id = invocation_id
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
pid = os.getpid()
self.log_path = JOBS_DIR / f"{root_name}-dag-{ts}-{pid}.log"
self.log_path = DAG_LOGS_DIR / f"{root_name}-{ts}-{pid}.log"
self.oplog: list[dict] = []
# Create log file immediately so failures are observable.
JOBS_DIR.mkdir(parents=True, exist_ok=True)
DAG_LOGS_DIR.mkdir(parents=True, exist_ok=True)
self._write(f"=== dag-runner start: {root_name} @ {ts} ===")
self._write(f"invocation_id={invocation_id}")
self._write(f"pid={pid}")
Expand Down
71 changes: 68 additions & 3 deletions tests/test_clauck_history_cost.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,19 @@ class TestCmdCost(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.TemporaryDirectory()
self.jobs_dir = Path(self.tmp.name)
self.dag_logs_dir = self.jobs_dir / ".dag-logs"
self.dag_logs_dir.mkdir()
self._orig = _mod.JOBS_DIR
self._orig_dag_logs = getattr(_mod, "DAG_LOGS_DIR", None)
_mod.JOBS_DIR = self.jobs_dir
_mod.DAG_LOGS_DIR = self.dag_logs_dir

def tearDown(self):
_mod.JOBS_DIR = self._orig
if self._orig_dag_logs is None:
delattr(_mod, "DAG_LOGS_DIR")
else:
_mod.DAG_LOGS_DIR = self._orig_dag_logs
self.tmp.cleanup()

def _log(self, name: str, ts: str, pid: str, content: str) -> Path:
Expand Down Expand Up @@ -258,14 +266,27 @@ def test_failure_run_counted_in_fails_column(self):
self.assertIn("1", out)

def test_dag_log_excluded(self):
# A "-dag-" pattern in the filename marks orchestration logs
dag_log = self.jobs_dir / "my-pipeline-dag-20260418T170000Z-400.log"
dag_log = self.dag_logs_dir / "my-pipeline-20260418T170000Z-400.log"
dag_log.write_text(
'{"total_cost_usd":9.9999}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n'
)
out = self._run()
self.assertIn("no log data found", out)

def test_real_dash_dag_job_is_not_polluted_by_dag_root_log(self):
self._log(
"standup-dag", "20260418T170000Z", "410",
'{"total_cost_usd":0.1250}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n',
)
dag_log = self.dag_logs_dir / "standup-20260418T170500Z-411.log"
dag_log.write_text(
'{"total_cost_usd":9.9999}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n'
)
out = self._run(name="standup-dag", all_time=True)
self.assertIn("standup-dag", out)
self.assertIn("0.1250", out)
self.assertNotIn("9.9999", out)

def test_days_filter_excludes_old_log(self):
# Log timestamped 60 days ago — outside default 30-day window
old_ts = (datetime.now(timezone.utc) - timedelta(days=60)).strftime("%Y%m%dT%H%M%SZ")
Expand All @@ -275,8 +296,8 @@ def test_days_filter_excludes_old_log(self):
)
# Force mtime to match the timestamp (cmd_cost uses mtime for --days filter)
import time as _time
sixty_days_ago = _time.time() - 60 * 86400
import os
sixty_days_ago = _time.time() - 60 * 86400
os.utime(log, (sixty_days_ago, sixty_days_ago))
out = self._run(days=30)
self.assertIn("no log data found", out)
Expand All @@ -294,6 +315,50 @@ def test_all_time_includes_old_log(self):
self.assertIn("old-job", out)


class TestCmdHistory(unittest.TestCase):

def setUp(self):
self.tmp = tempfile.TemporaryDirectory()
self.jobs_dir = Path(self.tmp.name)
self.dag_logs_dir = self.jobs_dir / ".dag-logs"
self.dag_logs_dir.mkdir()
self._orig_jobs_dir = _mod.JOBS_DIR
self._orig_dag_logs = getattr(_mod, "DAG_LOGS_DIR", None)
_mod.JOBS_DIR = self.jobs_dir
_mod.DAG_LOGS_DIR = self.dag_logs_dir

def tearDown(self):
_mod.JOBS_DIR = self._orig_jobs_dir
if self._orig_dag_logs is None:
delattr(_mod, "DAG_LOGS_DIR")
else:
_mod.DAG_LOGS_DIR = self._orig_dag_logs
self.tmp.cleanup()

def _log(self, name: str, ts: str, pid: str, content: str) -> Path:
return _make_log(self.jobs_dir, name, ts, pid, content)

def _run(self, **kwargs) -> str:
buf = io.StringIO()
with patch("sys.stdout", buf):
_mod.cmd_history(**kwargs)
return buf.getvalue()

def test_real_dash_dag_job_history_excludes_orchestration_log(self):
self._log(
"standup-dag", "20260418T170000Z", "420",
'{"total_cost_usd":0.1250}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n',
)
dag_log = self.dag_logs_dir / "standup-20260418T170500Z-421.log"
dag_log.write_text(
'{"total_cost_usd":9.9999}\n{"terminal_reason":"completed"}\n--- exit_code=0 ---\n'
)
out = self._run(job_filter="standup-dag", all_time=True)
self.assertIn("standup-dag", out)
self.assertIn("20260418T170000Z-420", out)
self.assertNotIn("20260418T170500Z-421", out)


# ---------------------------------------------------------------------------
# CLI parsing for `history`
# ---------------------------------------------------------------------------
Expand Down
10 changes: 10 additions & 0 deletions tests/test_clauck_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ class TestActiveLog(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.jobs_dir = Path(self.tmpdir) / ".clauck"
self.dag_logs_dir = self.jobs_dir / ".dag-logs"
self.state_dir = self.jobs_dir / ".state"
self.jobs_dir.mkdir()
self.dag_logs_dir.mkdir()
self.state_dir.mkdir()

def tearDown(self):
Expand All @@ -45,6 +47,7 @@ def _patch_dirs(self):
return patch.multiple(
"clauck",
JOBS_DIR=self.jobs_dir,
DAG_LOGS_DIR=self.dag_logs_dir,
STATE_DIR=self.state_dir,
)

Expand Down Expand Up @@ -112,6 +115,13 @@ def test_pid_file_missing_from_lock_dir_returns_none(self):
with self._patch_dirs():
self.assertIsNone(clauck._active_log("myjob"))

def test_active_dag_log_uses_dedicated_dag_log_dir(self):
dag_log = self.dag_logs_dir / "pipeline-20260417T170000Z-123.log"
dag_log.write_text("=== dag start ===\npid=%d\n" % os.getpid())
with self._patch_dirs():
result = clauck._active_dag_log("pipeline")
self.assertEqual(result, dag_log)


class TestFollowLog(unittest.TestCase):
"""Tests for _follow_log()."""
Expand Down
16 changes: 16 additions & 0 deletions tests/test_dag_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import json
import os
import sys
import tempfile
import time
Expand Down Expand Up @@ -57,6 +58,21 @@ def _logger() -> _NullLogger:
return _NullLogger()


class TestDagLogger(unittest.TestCase):
def test_dag_logger_writes_to_dedicated_dag_log_dir(self):
with tempfile.TemporaryDirectory() as tmp:
jobs_dir = Path(tmp) / ".clauck"
dag_logs_dir = jobs_dir / ".dag-logs"
with patch.object(dr, "JOBS_DIR", jobs_dir), \
patch.object(dr, "DAG_LOGS_DIR", dag_logs_dir, create=True), \
patch.object(dr.os, "getpid", return_value=123):
logger = dr.DagLogger("standup", "inv-1")

self.assertEqual(logger.log_path.parent, dag_logs_dir)
self.assertTrue(logger.log_path.exists())
self.assertIn("pid=123", logger.log_path.read_text())


# ---------------------------------------------------------------------------
# _get_producers
# ---------------------------------------------------------------------------
Expand Down
Loading