From 29756ef1c52d6268f658ea9f0825e625df265a5f Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Sat, 6 Jun 2026 09:08:00 +0300 Subject: [PATCH 1/4] docs: refresh MAP improvement synthesis --- docs/improvement-plan.md | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/docs/improvement-plan.md b/docs/improvement-plan.md index 2af7f24..7c392b9 100644 --- a/docs/improvement-plan.md +++ b/docs/improvement-plan.md @@ -107,18 +107,23 @@ ## Family-specific scaling analysis for LLM evaluation [2604.014] -**Benefit Hypothesis**: For at least one agent role (e.g., Predictor or Evaluator), family-specific analysis will reduce the average number of Actor→Monitor iterations per subtask by >=10% (or increase Monitor approval rate by >=5 percentage points) compared to decisions made using cross-family/general heuristics, evaluated over the same set of workflows and compared via `python scripts/analyze-metrics.py` outputs. -**Confidence**: 0.62 -**Reasoning**: This project explicitly relies on per-agent model selection and quality/cost tradeoffs: it lists current model assignments (TaskDecomposer/Actor/Monitor/Predictor/Evaluator/Reflector etc.) and the ability to downgrade models (“Safe to downgrade to Haiku: Predictor, Evaluator…”). It also defines KPI targets and tracking via `python scripts/analyze-metrics.py` and mentions metrics stored in `.claude/metrics/agent_metrics.jsonl` plus workflow logs in `.map/workflow_logs/`. However, the architecture brief does not describe any method for analyzing scaling behavior by model family; the pre-approved idea argues parameter count/family effects matter more than size alone. Therefore, adding family-specific scaling analysis aligns with the architecture’s existing metrics/selection mechanisms and improves the model-choice loop that drives token/performance tradeoffs. -**Why Not Already Tried**: The brief provides a roadmap for context engineering (checkpointing, MCP caching, search, pattern variation) and template maintenance, but it does not mention any prior analytics that segment performance by model family. The only analytics mentioned are general KPI tracking (`scripts/analyze-metrics.py`) without family-stratified scaling curves, so this family-specific scaling approach has not yet been implemented. +**Source Idea Reference**: [[model-tier-vs-prompt-for-llm-skill-routing-map-replication]] (source note); [[measuring-the-sensitivity-of-llm-based-structured-extraction-to-prompt-model-and-86e5e4142058]] (paper note) +**Implementation Layer**: `src/mapify_cli/skills_eval/`, `src/mapify_cli/templates/skills/*/SKILL.md`, `src/mapify_cli/skill_ir.py`, skill-eval artifacts under `.map/eval-runs//`, and metrics analysis/reporting helpers. +**Missing Capability**: The skill-eval and model-selection loop does not yet separate three decisions that the MAP replication showed behave differently: skill-routing model tier, skill description quality, and execution-agent model tier. +**Architecture Evidence**: `docs/architecture.md` documents `mapify skill-eval run/optimize/view`, durable `.map/eval-runs//` artifacts, optional template patching for optimized skill descriptions, SkillIR/template audits, and per-agent effort/model policy surfaces. +**Benefit Hypothesis**: For skill routing, recording model-tier sweeps and description-quality diagnostics will keep the dispatcher on the cheapest reliable tier for each skill class and prevent wasted model upgrades when the real bottleneck is a weak `description:`. Pass criteria: a held-out skill-eval suite records per-skill Haiku/Sonnet/Opus trigger accuracy and latency, flags descriptions whose accuracy stays below threshold across all tiers, and demonstrates that accepted description edits improve held-out trigger accuracy without increasing negative-trigger false positives. +**Confidence**: 0.68 +**Reasoning**: The MAP replication gives project-specific evidence instead of a generic scaling claim: in skill routing, Haiku underperformed, Sonnet was the best accuracy/latency point overall, Opus did not dominate Sonnet, and weak descriptions such as `map-explain` remained low-accuracy across every tier. Murin's broader extraction result explains the same pattern as "redistribution, not uniform improvement" when model operating points change. This maps directly to MAP because the architecture now owns skill trigger evaluation, optimizer train/test splits, HTML reports, SkillIR checks, and provider template patching. The useful product move is not just "try bigger models"; it is to make skill routing tier, description edits, and execution model choice separately measurable. +**Why Not Already Tried**: The current plan item asked for generic model-family scaling curves. The shipped architecture added skill-eval run/optimize/view, but there is no acceptance contract that stores a tier sweep per skill, detects all-tier description failures before model escalation, or writes model-tier guidance back into skill/template maintenance docs. ### Proposed Changes -- Add a per-agent, per-model-family evaluation mode to `scripts/analyze-metrics.py` that groups metrics by the agent’s configured model (from `.claude/agents/{agent}.md` frontmatter) and by a “model family” label (derived from model IDs). -- Extend the logged metrics schema inputs (from the existing `.claude/metrics/agent_metrics.jsonl` and the workflow logs in `.map/workflow_logs/*.json`) to include `model_id`, `model_family`, and `agent_name` for each agent run; ensure these values are captured at orchestration time (where the architecture already tracks metrics under `.claude/metrics/agent_metrics.jsonl`). -- Introduce a new report command: `python scripts/scale_analysis.py --metric monitor_approval_rate --group-by model_family --workflow map-efficient` that outputs family-specific curves such as (a) first-try Monitor approval rate vs. model_family/model_id, and (b) “iterations per subtask” vs. model_family/model_id, using the existing KPIs: Monitor approval rate >80%, Evaluator >7.0/10, iterations <3 per subtask. -- Update model selection guidance in `Customization Guide`/`USAGE` so that when choosing between models for Predictors/Evaluators (already described as `Sonnet` with `Opus` for `DebateArbiter` and the ability to downgrade to `Haiku`), the decision is based on family-specific baselines rather than general parameter-count assumptions; the selection heuristic should use the new family reports to decide “downgrade within same family is safe” vs “cross-family downgrade is risky.” -- Add regression tests for the analytics tooling (not the agents) that validate the grouping logic and curve output determinism using a small fixture dataset that mimics `.claude/metrics/agent_metrics.jsonl` records and workflow-log JSON structure from `.map/workflow_logs/`.json. +- Extend `mapify skill-eval run` with an explicit tier-sweep mode that records `model_tier`, `model_id`, trigger accuracy, negative-trigger false positive rate, timeout count, and per-case latency in the existing eval-run JSONL. +- Add a description-diagnostic report that identifies skills whose positive-trigger accuracy remains below threshold across all tested tiers; route those to description optimization instead of recommending Opus or another stronger routing model. +- Keep routing-model guidance separate from execution-model guidance: skill dispatch reports should recommend the routing tier, while workflow execution metrics should continue to evaluate Actor/Monitor/Predictor/Evaluator outcome quality. +- Extend `mapify skill-eval optimize` acceptance so a description edit must improve held-out trigger accuracy or reduce false positives under the selected routing tier, not merely overfit the train split. +- Update SkillIR or template-maintenance docs with a small model-tier decision table: default routing tier, when to run a fresh sweep, and how to treat all-tier description failures. +- Add regression fixtures that reproduce the source-note pattern: one skill where Sonnet beats Haiku for routing, one where Opus does not improve over Sonnet, and one weak-description skill whose failure persists across tiers until the description changes. ## Address observability and resiliency as critical API NFRs [2604.017] From ede1d2f69672a7e673973b5a4c82f4c451f82515 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Sat, 6 Jun 2026 12:07:24 +0300 Subject: [PATCH 2/4] docs: refresh MAP workflow evaluation plan --- docs/improvement-plan.md | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/docs/improvement-plan.md b/docs/improvement-plan.md index 7c392b9..0e39fe4 100644 --- a/docs/improvement-plan.md +++ b/docs/improvement-plan.md @@ -109,22 +109,44 @@ **Source Idea Reference**: [[model-tier-vs-prompt-for-llm-skill-routing-map-replication]] (source note); [[measuring-the-sensitivity-of-llm-based-structured-extraction-to-prompt-model-and-86e5e4142058]] (paper note) **Implementation Layer**: `src/mapify_cli/skills_eval/`, `src/mapify_cli/templates/skills/*/SKILL.md`, `src/mapify_cli/skill_ir.py`, skill-eval artifacts under `.map/eval-runs//`, and metrics analysis/reporting helpers. -**Missing Capability**: The skill-eval and model-selection loop does not yet separate three decisions that the MAP replication showed behave differently: skill-routing model tier, skill description quality, and execution-agent model tier. -**Architecture Evidence**: `docs/architecture.md` documents `mapify skill-eval run/optimize/view`, durable `.map/eval-runs//` artifacts, optional template patching for optimized skill descriptions, SkillIR/template audits, and per-agent effort/model policy surfaces. -**Benefit Hypothesis**: For skill routing, recording model-tier sweeps and description-quality diagnostics will keep the dispatcher on the cheapest reliable tier for each skill class and prevent wasted model upgrades when the real bottleneck is a weak `description:`. Pass criteria: a held-out skill-eval suite records per-skill Haiku/Sonnet/Opus trigger accuracy and latency, flags descriptions whose accuracy stays below threshold across all tiers, and demonstrates that accepted description edits improve held-out trigger accuracy without increasing negative-trigger false positives. -**Confidence**: 0.68 -**Reasoning**: The MAP replication gives project-specific evidence instead of a generic scaling claim: in skill routing, Haiku underperformed, Sonnet was the best accuracy/latency point overall, Opus did not dominate Sonnet, and weak descriptions such as `map-explain` remained low-accuracy across every tier. Murin's broader extraction result explains the same pattern as "redistribution, not uniform improvement" when model operating points change. This maps directly to MAP because the architecture now owns skill trigger evaluation, optimizer train/test splits, HTML reports, SkillIR checks, and provider template patching. The useful product move is not just "try bigger models"; it is to make skill routing tier, description edits, and execution model choice separately measurable. -**Why Not Already Tried**: The current plan item asked for generic model-family scaling curves. The shipped architecture added skill-eval run/optimize/view, but there is no acceptance contract that stores a tier sweep per skill, detects all-tier description failures before model escalation, or writes model-tier guidance back into skill/template maintenance docs. +**Missing Capability**: The skill-eval and model-selection loop does not yet separate three decisions that the MAP replication showed behave differently: skill-routing model tier, skill description quality, and execution-agent model tier. The newer actor-direct replication adds a second gap: per-agent `model:` assignments in provider skill/agent files are not proven to affect headless execution, so MAP cannot yet guarantee that Actor, Monitor, research, or verifier phases actually run on their intended tier. +**Architecture Evidence**: `docs/architecture.md` documents `mapify skill-eval run/optimize/view`, durable `.map/eval-runs//` artifacts, optional template patching for optimized skill descriptions, SkillIR/template audits, generated provider surfaces, and per-agent effort/model policy surfaces. +**Benefit Hypothesis**: For skill routing, recording model-tier sweeps and description-quality diagnostics will keep the dispatcher on the cheapest reliable tier for each skill class and prevent wasted model upgrades when the real bottleneck is a weak `description:`. For execution, a direct per-phase harness will prove whether configured agent tiers are actually enforced and whether Actor can safely run below Sonnet. Pass criteria: a held-out skill-eval suite records per-skill Haiku/Sonnet/Opus trigger accuracy and latency, flags descriptions whose accuracy stays below threshold across all tiers, and demonstrates that accepted description edits improve held-out trigger accuracy without increasing negative-trigger false positives; a separate actor-direct fixture proves the selected phase model appears in the transcript/artifact and preserves hidden edge-case quality. +**Confidence**: 0.72 +**Reasoning**: The MAP replication gives project-specific evidence instead of a generic scaling claim: in skill routing, Haiku underperformed, Sonnet was the best accuracy/latency point overall, Opus did not dominate Sonnet, and weak descriptions such as `map-explain` remained low-accuracy across every tier. The expanded actor-direct result is even more actionable for MAP: when the agent prompt was invoked directly with `claude -p --append-system-prompt ... --model `, Haiku systematically missed edge cases while Sonnet/Opus were stable, and the source note records that headless skill/subagent dispatch can otherwise mask per-agent model assignments. Murin's broader extraction result explains the same pattern as "redistribution, not uniform improvement" when model operating points change. This maps directly to MAP because the architecture now owns skill trigger evaluation, optimizer train/test splits, HTML reports, SkillIR checks, provider template patching, and generated provider surfaces. The useful product move is not just "try bigger models"; it is to make routing tier, description edits, execution model choice, and enforcement path separately measurable. +**Why Not Already Tried**: The current plan item asked for generic model-family scaling curves. The shipped architecture added skill-eval run/optimize/view, but there is no acceptance contract that stores a tier sweep per skill, detects all-tier description failures before model escalation, writes model-tier guidance back into skill/template maintenance docs, or verifies that generated agent model assignments are honored in the actual headless execution path. ### Proposed Changes - Extend `mapify skill-eval run` with an explicit tier-sweep mode that records `model_tier`, `model_id`, trigger accuracy, negative-trigger false positive rate, timeout count, and per-case latency in the existing eval-run JSONL. - Add a description-diagnostic report that identifies skills whose positive-trigger accuracy remains below threshold across all tested tiers; route those to description optimization instead of recommending Opus or another stronger routing model. - Keep routing-model guidance separate from execution-model guidance: skill dispatch reports should recommend the routing tier, while workflow execution metrics should continue to evaluate Actor/Monitor/Predictor/Evaluator outcome quality. +- Add an actor-direct execution harness that renders the same agent prompt MAP would use, calls the provider runtime with an explicit `--model`/tier override, verifies the selected model in the transcript or run artifact, and evaluates hidden edge-case fixtures before changing default agent tier guidance. +- Add a provider-surface audit that marks per-agent `model:` metadata as advisory unless the corresponding provider path proves enforcement. If enforcement is unavailable, route model-sensitive phases through the direct invocation path instead of relying on dead config. - Extend `mapify skill-eval optimize` acceptance so a description edit must improve held-out trigger accuracy or reduce false positives under the selected routing tier, not merely overfit the train split. -- Update SkillIR or template-maintenance docs with a small model-tier decision table: default routing tier, when to run a fresh sweep, and how to treat all-tier description failures. +- Update SkillIR or template-maintenance docs with a small model-tier decision table: default routing tier, default execution tiers by phase, when to run a fresh sweep, how to verify model enforcement, and how to treat all-tier description failures. - Add regression fixtures that reproduce the source-note pattern: one skill where Sonnet beats Haiku for routing, one where Opus does not improve over Sonnet, and one weak-description skill whose failure persists across tiers until the description changes. +## Protocol-aligned workflow lift benchmark for MAP agent topologies [2606.05670] + +**Source Idea Reference**: [[do-more-agents-help-controlled-and-protocol-aligned-evaluation-of-llm-agent-workflows]] (paper note); [[entropy-based-evaluation-of-ai-agents-a-lightweight-framework-for-measuring-behavioral-patterns]] (paper note) +**Implementation Layer**: `src/mapify_cli/templates/skills/*/SKILL.md`, `.map/scripts/` workflow runners, `.map//` run artifacts, `.map/workflow_logs/`, `token_accounting.json`, and future eval/report helpers next to `src/mapify_cli/skills_eval/`. +**Missing Capability**: MAP can measure skill-trigger routing, token budgets, and per-run artifacts, but it does not yet have a protocol-aligned workflow benchmark that proves when a heavier multi-agent MAP surface beats a cheaper single-agent or lightweight workflow anchor under the same task, tool permissions, answer contract, accounting, and trace schema. +**Architecture Evidence**: `docs/architecture.md` defines MAP as a local workflow/tooling layer for `SPEC -> PLAN -> TEST -> CODE -> REVIEW -> LEARN`, with multiple workflow surfaces (`/map-fast`, `/map-efficient`, `/map-review`, `/map-check`, `/map-skill-eval`), durable `.map//` artifacts, structured workflow logs, token accounting, review/verification gates, and a quality goal of low overhead. It also states that `map-skill-eval` is measurement-only and focused on trigger/cost behavior, not end-to-end workflow lift. +**Benefit Hypothesis**: A MAP workflow-lift benchmark will prevent multi-agent ceremony from becoming default without evidence. Pass criteria: for a fixed repo-task fixture set, the report compares a single-agent/lightweight anchor against selected MAP workflows using the same permissions, validation contract, token accounting, and trace schema; reports success, validation quality, cost, latency, retry count, and trajectory/tool entropy; and keeps or promotes a heavier workflow only when it wins on reviewer-visible quality or failure recovery enough to justify its cost. +**Confidence**: 0.63 +**Reasoning**: BenchAgent's useful mechanism is not its exact benchmarks; it is controlled comparison of agent workflow topologies after normalizing tools, answer contracts, usage accounting, and trajectory logging. EEA adds a low-cost trace layer that can distinguish rigid, over-exploratory, or tool-inefficient behavior when pass/fail is tied. This maps directly to MAP because the architecture already owns workflow prompts, generated hooks/scripts, run artifacts, token accounting, structured logs, and verification gates. The missing product decision is whether `/map-efficient`, `/map-review`, or future multi-agent topologies earn their extra coordination cost against `/map-fast` or a direct provider run on the same task. +**Why Not Already Tried**: Existing MAP evaluation surfaces cover skill triggering, prompt/template validation, token-budget invariants, and selected context experiments. The planning history does not define a same-task workflow benchmark with a matched single-agent anchor, normalized permission/tool profile, shared final-output contract, and trajectory entropy/cost reporting. + +### Proposed Changes + +- Add a small benchmark fixture format for repository tasks with the same input, allowed tools, validation command, expected artifact contract, and hidden quality checks across all workflow variants. +- Implement a workflow runner that can execute at least three variants: direct/single-agent anchor, `/map-fast` or equivalent lightweight MAP path, and one heavier multi-agent workflow such as `/map-efficient` or `/map-review` when applicable. +- Normalize accounting across variants by writing duration, model tier, token/cost estimates, retry count, validation result, and terminal status into one report schema. Reuse `token_accounting.json` and `.map/workflow_logs/` rather than inventing a separate observability path. +- Add trajectory metrics derived from the normalized trace: action/tool entropy, repeated-action rate, validation-loop count, and exploration efficiency. Treat these as diagnostics alongside task success, not as standalone success criteria. +- Add a protocol-alignment checklist to every benchmark run: same task input, same permission profile, same validation command, same final-output contract, same accounting estimator, and no hidden extra tools for the heavier workflow. +- Document a decision rule for workflow defaults: heavier MAP surfaces stay recommended only when the benchmark shows measurable lift in success, review quality, failure recovery, or auditability that outweighs added cost/latency for the target task class. + ## Address observability and resiliency as critical API NFRs [2604.017] From bef6899cb072994e3bc999111f15ce6dafe0b384 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Tue, 9 Jun 2026 22:13:36 +0300 Subject: [PATCH 3/4] fix(token-meter): dedup repeated msg_id within a read window MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Claude Code writes one assistant turn as several JSONL lines (one per content/tool_use block), all sharing the same message.id and the same cumulative usage. _iter_new_usage deduped new turns only against the persisted seen_ids, so every repeated line in a read window was logged as a separate token event — roughly doubling est_cost on real sessions (observed: ai-sre token_log 2822 rows / 1409 distinct msg_id -> $993 reported vs ~$415 deduped; gecko-ristra 1260/613 -> $868 vs ~$422). Dedup new_usages by msg_id within the read window, keeping the copy with the most total tokens (the figure the API bills) when a streaming partial and the final line disagree. seen_ids stays as the cross-call safety net. Adds _usage_token_total helper + 3 regression tests (counted-once, keep-most-complete-copy). Rendered to all generated trees. Co-Authored-By: Claude Opus 4.8 (1M context) --- .map/scripts/map_step_runner.py | 49 +++++++++++-- .../templates/map/scripts/map_step_runner.py | 49 +++++++++++-- .../map/scripts/map_step_runner.py.jinja | 49 +++++++++++-- tests/test_map_token_meter.py | 68 +++++++++++++++++++ 4 files changed, 194 insertions(+), 21 deletions(-) diff --git a/.map/scripts/map_step_runner.py b/.map/scripts/map_step_runner.py index aab74a2..b1d6642 100755 --- a/.map/scripts/map_step_runner.py +++ b/.map/scripts/map_step_runner.py @@ -592,6 +592,27 @@ def _int(key: str) -> int: } +def _usage_token_total(usage: Mapping[str, object]) -> int: + """Sum of the four token fields for one usage record. + + Used to pick the most complete copy of a turn when the transcript repeats a + msg_id with diverging usage (a streaming partial vs the final line). + """ + total = 0 + for field in _TOKEN_FIELDS: + value = usage.get(field, 0) + if isinstance(value, bool): + continue + if isinstance(value, (int, float)): + total += int(value) + elif isinstance(value, str): + try: + total += int(value) + except ValueError: + continue + return total + + def _iter_new_usage( transcript_path: Path, seen_ids: set[str], start_offset: int = 0 ) -> tuple[list[dict[str, object]], int]: @@ -601,10 +622,17 @@ def _iter_new_usage( JSONL) so a repeatedly-firing Stop/SubagentStop hook does not re-parse the whole multi-MB file each turn. Returns ``(usages, new_offset)`` where ``new_offset`` advances only past the last COMPLETE line — a partial line - from a concurrent append is left for the next call. ``msg_id`` dedup against - ``seen_ids`` is kept as a safety net (e.g. if the file is rotated and the - offset resets). Entries with an empty msg_id or malformed JSON are skipped; - a missing/unreadable transcript returns ``([], start_offset)``. + from a concurrent append is left for the next call. + + A single assistant turn is written to the transcript as SEVERAL JSONL lines + (one per content / tool_use block) that all share the same ``message.id`` + and the same cumulative ``usage``. Results are deduped by msg_id WITHIN this + read window — keeping the copy with the most total tokens — so a turn's + usage is logged exactly once; without it est_cost roughly doubles. The + persisted ``seen_ids`` is the cross-call safety net (e.g. if the file is + rotated and the offset resets, or a turn straddles two windows). Entries + with an empty msg_id or malformed JSON are skipped; a missing/unreadable + transcript returns ``([], start_offset)``. """ path = Path(transcript_path) try: @@ -629,7 +657,8 @@ def _iter_new_usage( complete = chunk[: last_newline + 1] new_offset = offset + len(complete) - out: list[dict[str, object]] = [] + by_mid: dict[str, dict[str, object]] = {} + order: list[str] = [] for raw in complete.decode("utf-8", errors="replace").splitlines(): raw = raw.strip() if not raw: @@ -644,8 +673,14 @@ def _iter_new_usage( mid = str(usage["msg_id"]) if not mid or mid in seen_ids: continue - out.append(usage) - return out, new_offset + prev = by_mid.get(mid) + if prev is None: + order.append(mid) + by_mid[mid] = usage + elif _usage_token_total(usage) > _usage_token_total(prev): + # Same turn repeated in this window — keep the most complete copy. + by_mid[mid] = usage + return [by_mid[mid] for mid in order], new_offset def _token_meter_cache_path(branch_name: str) -> Path: diff --git a/src/mapify_cli/templates/map/scripts/map_step_runner.py b/src/mapify_cli/templates/map/scripts/map_step_runner.py index aab74a2..b1d6642 100755 --- a/src/mapify_cli/templates/map/scripts/map_step_runner.py +++ b/src/mapify_cli/templates/map/scripts/map_step_runner.py @@ -592,6 +592,27 @@ def _int(key: str) -> int: } +def _usage_token_total(usage: Mapping[str, object]) -> int: + """Sum of the four token fields for one usage record. + + Used to pick the most complete copy of a turn when the transcript repeats a + msg_id with diverging usage (a streaming partial vs the final line). + """ + total = 0 + for field in _TOKEN_FIELDS: + value = usage.get(field, 0) + if isinstance(value, bool): + continue + if isinstance(value, (int, float)): + total += int(value) + elif isinstance(value, str): + try: + total += int(value) + except ValueError: + continue + return total + + def _iter_new_usage( transcript_path: Path, seen_ids: set[str], start_offset: int = 0 ) -> tuple[list[dict[str, object]], int]: @@ -601,10 +622,17 @@ def _iter_new_usage( JSONL) so a repeatedly-firing Stop/SubagentStop hook does not re-parse the whole multi-MB file each turn. Returns ``(usages, new_offset)`` where ``new_offset`` advances only past the last COMPLETE line — a partial line - from a concurrent append is left for the next call. ``msg_id`` dedup against - ``seen_ids`` is kept as a safety net (e.g. if the file is rotated and the - offset resets). Entries with an empty msg_id or malformed JSON are skipped; - a missing/unreadable transcript returns ``([], start_offset)``. + from a concurrent append is left for the next call. + + A single assistant turn is written to the transcript as SEVERAL JSONL lines + (one per content / tool_use block) that all share the same ``message.id`` + and the same cumulative ``usage``. Results are deduped by msg_id WITHIN this + read window — keeping the copy with the most total tokens — so a turn's + usage is logged exactly once; without it est_cost roughly doubles. The + persisted ``seen_ids`` is the cross-call safety net (e.g. if the file is + rotated and the offset resets, or a turn straddles two windows). Entries + with an empty msg_id or malformed JSON are skipped; a missing/unreadable + transcript returns ``([], start_offset)``. """ path = Path(transcript_path) try: @@ -629,7 +657,8 @@ def _iter_new_usage( complete = chunk[: last_newline + 1] new_offset = offset + len(complete) - out: list[dict[str, object]] = [] + by_mid: dict[str, dict[str, object]] = {} + order: list[str] = [] for raw in complete.decode("utf-8", errors="replace").splitlines(): raw = raw.strip() if not raw: @@ -644,8 +673,14 @@ def _iter_new_usage( mid = str(usage["msg_id"]) if not mid or mid in seen_ids: continue - out.append(usage) - return out, new_offset + prev = by_mid.get(mid) + if prev is None: + order.append(mid) + by_mid[mid] = usage + elif _usage_token_total(usage) > _usage_token_total(prev): + # Same turn repeated in this window — keep the most complete copy. + by_mid[mid] = usage + return [by_mid[mid] for mid in order], new_offset def _token_meter_cache_path(branch_name: str) -> Path: diff --git a/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja b/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja index aab74a2..b1d6642 100755 --- a/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja +++ b/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja @@ -592,6 +592,27 @@ def _extract_turn_usage(entry: object) -> Optional[dict[str, object]]: } +def _usage_token_total(usage: Mapping[str, object]) -> int: + """Sum of the four token fields for one usage record. + + Used to pick the most complete copy of a turn when the transcript repeats a + msg_id with diverging usage (a streaming partial vs the final line). + """ + total = 0 + for field in _TOKEN_FIELDS: + value = usage.get(field, 0) + if isinstance(value, bool): + continue + if isinstance(value, (int, float)): + total += int(value) + elif isinstance(value, str): + try: + total += int(value) + except ValueError: + continue + return total + + def _iter_new_usage( transcript_path: Path, seen_ids: set[str], start_offset: int = 0 ) -> tuple[list[dict[str, object]], int]: @@ -601,10 +622,17 @@ def _iter_new_usage( JSONL) so a repeatedly-firing Stop/SubagentStop hook does not re-parse the whole multi-MB file each turn. Returns ``(usages, new_offset)`` where ``new_offset`` advances only past the last COMPLETE line — a partial line - from a concurrent append is left for the next call. ``msg_id`` dedup against - ``seen_ids`` is kept as a safety net (e.g. if the file is rotated and the - offset resets). Entries with an empty msg_id or malformed JSON are skipped; - a missing/unreadable transcript returns ``([], start_offset)``. + from a concurrent append is left for the next call. + + A single assistant turn is written to the transcript as SEVERAL JSONL lines + (one per content / tool_use block) that all share the same ``message.id`` + and the same cumulative ``usage``. Results are deduped by msg_id WITHIN this + read window — keeping the copy with the most total tokens — so a turn's + usage is logged exactly once; without it est_cost roughly doubles. The + persisted ``seen_ids`` is the cross-call safety net (e.g. if the file is + rotated and the offset resets, or a turn straddles two windows). Entries + with an empty msg_id or malformed JSON are skipped; a missing/unreadable + transcript returns ``([], start_offset)``. """ path = Path(transcript_path) try: @@ -629,7 +657,8 @@ def _iter_new_usage( complete = chunk[: last_newline + 1] new_offset = offset + len(complete) - out: list[dict[str, object]] = [] + by_mid: dict[str, dict[str, object]] = {} + order: list[str] = [] for raw in complete.decode("utf-8", errors="replace").splitlines(): raw = raw.strip() if not raw: @@ -644,8 +673,14 @@ def _iter_new_usage( mid = str(usage["msg_id"]) if not mid or mid in seen_ids: continue - out.append(usage) - return out, new_offset + prev = by_mid.get(mid) + if prev is None: + order.append(mid) + by_mid[mid] = usage + elif _usage_token_total(usage) > _usage_token_total(prev): + # Same turn repeated in this window — keep the most complete copy. + by_mid[mid] = usage + return [by_mid[mid] for mid in order], new_offset def _token_meter_cache_path(branch_name: str) -> Path: diff --git a/tests/test_map_token_meter.py b/tests/test_map_token_meter.py index b445a4d..8768c4f 100644 --- a/tests/test_map_token_meter.py +++ b/tests/test_map_token_meter.py @@ -136,3 +136,71 @@ def test_stop_meters_main_transcript_as_orchestrator(tmp_path): assert payload["aggregate"]["input"] == 1300 assert "ST-005" in payload["by_subtask"] assert "orchestrator" in payload["by_agent"] + + +# Claude Code writes ONE assistant turn as several JSONL lines (one per +# content / tool_use block), all sharing the same message.id and the same +# cumulative usage. The meter must count such a turn exactly once. +_REPEATED_TURN = ( + '{"type":"assistant","uuid":"u1a","message":{"role":"assistant","id":"msg_R",' + '"model":"claude-opus-4-7","usage":{"input_tokens":1000,"output_tokens":200,' + '"cache_creation_input_tokens":500,"cache_read_input_tokens":8000}}}\n' + '{"type":"assistant","uuid":"u1b","message":{"role":"assistant","id":"msg_R",' + '"model":"claude-opus-4-7","usage":{"input_tokens":1000,"output_tokens":200,' + '"cache_creation_input_tokens":500,"cache_read_input_tokens":8000}}}\n' + '{"type":"assistant","uuid":"u1c","message":{"role":"assistant","id":"msg_R",' + '"model":"claude-opus-4-7","usage":{"input_tokens":1000,"output_tokens":200,' + '"cache_creation_input_tokens":500,"cache_read_input_tokens":8000}}}\n' +) + + +@pytest.mark.skipif(not SHIPPED_RUNNER.is_file(), reason="shipped runner missing") +def test_repeated_msgid_in_window_counted_once(tmp_path): + """A turn split across 3 JSONL lines (same msg_id) must be metered ONCE. + + Regression: dedup against the persisted seen_ids only let every repeated + line through, doubling/tripling est_cost on real sessions.""" + branch = "feat-meter" + branch_dir = _setup_project(tmp_path, branch) + transcript = tmp_path / "main.jsonl" + transcript.write_text(_REPEATED_TURN) + + result = _run_hook(json.dumps({"transcript_path": str(transcript)}), tmp_path) + assert result.returncode == 0 + + payload = json.loads((branch_dir / "token_accounting.json").read_text()) + agg = payload["aggregate"] + assert agg["input"] == 1000, "repeated msg_id counted >1x (input)" + assert agg["output"] == 200, "repeated msg_id counted >1x (output)" + assert agg["cache_read"] == 8000, "repeated msg_id counted >1x (cache_read)" + assert payload["event_count"] == 1, "one logical turn must be one event" + # token_log holds exactly one row for the turn. + rows = [ + line for line in (branch_dir / "token_log.jsonl").read_text().splitlines() if line.strip() + ] + assert len(rows) == 1 + + +@pytest.mark.skipif(not SHIPPED_RUNNER.is_file(), reason="shipped runner missing") +def test_repeated_msgid_keeps_most_complete_copy(tmp_path): + """When repeated lines for one msg_id disagree (a streaming partial vs the + final line), the meter keeps the copy with the most total tokens.""" + branch = "feat-meter" + branch_dir = _setup_project(tmp_path, branch) + transcript = tmp_path / "main.jsonl" + transcript.write_text( + # Partial line first (small usage), then the final cumulative line. + '{"type":"assistant","uuid":"p1","message":{"role":"assistant","id":"msg_P",' + '"model":"claude-opus-4-7","usage":{"input_tokens":100,"output_tokens":10,' + '"cache_creation_input_tokens":0,"cache_read_input_tokens":0}}}\n' + '{"type":"assistant","uuid":"p2","message":{"role":"assistant","id":"msg_P",' + '"model":"claude-opus-4-7","usage":{"input_tokens":100,"output_tokens":200,' + '"cache_creation_input_tokens":500,"cache_read_input_tokens":8000}}}\n' + ) + + result = _run_hook(json.dumps({"transcript_path": str(transcript)}), tmp_path) + assert result.returncode == 0 + + agg = json.loads((branch_dir / "token_accounting.json").read_text())["aggregate"] + assert agg["output"] == 200, "must keep the most complete copy, not the partial" + assert agg["cache_read"] == 8000 From 9ba09b9b6042bd6f775febde526f3a3ca8b860bc Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Tue, 9 Jun 2026 22:24:18 +0300 Subject: [PATCH 4/4] fix(token-meter): dedup by msg_id at rollup too; self-heal old logs _rebuild_token_accounting summed every token_log row, so logs already written by the pre-fix runner (one turn split across rows) still rolled up to ~2x cost even after the write-time dedup. Dedup by msg_id at rollup as well (keep the most-complete copy per turn); event_count now reports distinct turns. Extracts _coerce_token_int helper (shared with _usage_token_total). Adds 2 regression tests (write-path repeat, rollup of a dup-containing log). Co-Authored-By: Claude Opus 4.8 (1M context) --- .map/scripts/map_step_runner.py | 66 +++++++++++++------ .../templates/map/scripts/map_step_runner.py | 66 +++++++++++++------ .../map/scripts/map_step_runner.py.jinja | 66 +++++++++++++------ tests/test_map_step_runner.py | 59 +++++++++++++++++ 4 files changed, 197 insertions(+), 60 deletions(-) diff --git a/.map/scripts/map_step_runner.py b/.map/scripts/map_step_runner.py index b1d6642..832a7b4 100755 --- a/.map/scripts/map_step_runner.py +++ b/.map/scripts/map_step_runner.py @@ -592,25 +592,27 @@ def _int(key: str) -> int: } +def _coerce_token_int(value: object) -> int: + """Best-effort int from a token field that may be int / float / str / None.""" + if isinstance(value, bool): + return 0 + if isinstance(value, (int, float)): + return int(value) + if isinstance(value, str): + try: + return int(value) + except ValueError: + return 0 + return 0 + + def _usage_token_total(usage: Mapping[str, object]) -> int: """Sum of the four token fields for one usage record. Used to pick the most complete copy of a turn when the transcript repeats a msg_id with diverging usage (a streaming partial vs the final line). """ - total = 0 - for field in _TOKEN_FIELDS: - value = usage.get(field, 0) - if isinstance(value, bool): - continue - if isinstance(value, (int, float)): - total += int(value) - elif isinstance(value, str): - try: - total += int(value) - except ValueError: - continue - return total + return sum(_coerce_token_int(usage.get(field, 0)) for field in _TOKEN_FIELDS) def _iter_new_usage( @@ -824,7 +826,11 @@ def _rebuild_token_accounting(branch: Optional[str] = None) -> dict[str, object] Groups by subtask, agent, and phase, plus an aggregate carrying ``cache_hit_ratio`` (cache_read / (input + cache_read)) and - ``est_cost_usd``. Returns the written payload. + ``est_cost_usd``. Rows are deduped by msg_id (keeping the most complete + copy) before rollup, so a log written by an older runner — one assistant + turn split across several rows — still produces a correct total instead of + a doubled one. ``event_count`` is therefore the number of distinct turns. + Returns the written payload. """ branch_name = _sanitize_branch(branch) if branch else get_branch_name() log_path = get_branch_dir(branch_name) / TOKEN_LOG_NAME @@ -840,6 +846,14 @@ def _rebuild_token_accounting(branch: Optional[str] = None) -> dict[str, object] lines = log_path.read_text(encoding="utf-8").splitlines() except (OSError, UnicodeDecodeError): lines = [] + # One assistant turn can occupy several token_log rows (Claude Code + # writes one JSONL line per content/tool_use block, all sharing a + # msg_id). Logs written before the write-time dedup landed still hold + # those repeats, so collapse by msg_id here too — keep the row with the + # most total tokens (the figure the API bills) — and stay correct. + deduped: dict[str, dict[str, object]] = {} + order: list[str] = [] + anon = 0 for raw in lines: raw = raw.strip() if not raw: @@ -850,14 +864,26 @@ def _rebuild_token_accounting(branch: Optional[str] = None) -> dict[str, object] continue if not isinstance(row, dict): continue + mid = str(row.get("msg_id") or "") + if not mid: + key = f"__anon_{anon}" + anon += 1 + else: + key = mid + prev = deduped.get(key) + if prev is None: + order.append(key) + deduped[key] = row + elif _usage_token_total(row) > _usage_token_total(prev): + deduped[key] = row + + for key in order: + row = deduped[key] event_count += 1 model = str(row.get("model") or "") - usage: dict[str, int] = {} - for field in _TOKEN_FIELDS: - try: - usage[field] = int(row.get(field, 0) or 0) - except (TypeError, ValueError): - usage[field] = 0 + usage: dict[str, int] = { + field: _coerce_token_int(row.get(field, 0)) for field in _TOKEN_FIELDS + } row_cost = _token_cost(usage, model) total_cost += row_cost for dim_key, dim in ( diff --git a/src/mapify_cli/templates/map/scripts/map_step_runner.py b/src/mapify_cli/templates/map/scripts/map_step_runner.py index b1d6642..832a7b4 100755 --- a/src/mapify_cli/templates/map/scripts/map_step_runner.py +++ b/src/mapify_cli/templates/map/scripts/map_step_runner.py @@ -592,25 +592,27 @@ def _int(key: str) -> int: } +def _coerce_token_int(value: object) -> int: + """Best-effort int from a token field that may be int / float / str / None.""" + if isinstance(value, bool): + return 0 + if isinstance(value, (int, float)): + return int(value) + if isinstance(value, str): + try: + return int(value) + except ValueError: + return 0 + return 0 + + def _usage_token_total(usage: Mapping[str, object]) -> int: """Sum of the four token fields for one usage record. Used to pick the most complete copy of a turn when the transcript repeats a msg_id with diverging usage (a streaming partial vs the final line). """ - total = 0 - for field in _TOKEN_FIELDS: - value = usage.get(field, 0) - if isinstance(value, bool): - continue - if isinstance(value, (int, float)): - total += int(value) - elif isinstance(value, str): - try: - total += int(value) - except ValueError: - continue - return total + return sum(_coerce_token_int(usage.get(field, 0)) for field in _TOKEN_FIELDS) def _iter_new_usage( @@ -824,7 +826,11 @@ def _rebuild_token_accounting(branch: Optional[str] = None) -> dict[str, object] Groups by subtask, agent, and phase, plus an aggregate carrying ``cache_hit_ratio`` (cache_read / (input + cache_read)) and - ``est_cost_usd``. Returns the written payload. + ``est_cost_usd``. Rows are deduped by msg_id (keeping the most complete + copy) before rollup, so a log written by an older runner — one assistant + turn split across several rows — still produces a correct total instead of + a doubled one. ``event_count`` is therefore the number of distinct turns. + Returns the written payload. """ branch_name = _sanitize_branch(branch) if branch else get_branch_name() log_path = get_branch_dir(branch_name) / TOKEN_LOG_NAME @@ -840,6 +846,14 @@ def _rebuild_token_accounting(branch: Optional[str] = None) -> dict[str, object] lines = log_path.read_text(encoding="utf-8").splitlines() except (OSError, UnicodeDecodeError): lines = [] + # One assistant turn can occupy several token_log rows (Claude Code + # writes one JSONL line per content/tool_use block, all sharing a + # msg_id). Logs written before the write-time dedup landed still hold + # those repeats, so collapse by msg_id here too — keep the row with the + # most total tokens (the figure the API bills) — and stay correct. + deduped: dict[str, dict[str, object]] = {} + order: list[str] = [] + anon = 0 for raw in lines: raw = raw.strip() if not raw: @@ -850,14 +864,26 @@ def _rebuild_token_accounting(branch: Optional[str] = None) -> dict[str, object] continue if not isinstance(row, dict): continue + mid = str(row.get("msg_id") or "") + if not mid: + key = f"__anon_{anon}" + anon += 1 + else: + key = mid + prev = deduped.get(key) + if prev is None: + order.append(key) + deduped[key] = row + elif _usage_token_total(row) > _usage_token_total(prev): + deduped[key] = row + + for key in order: + row = deduped[key] event_count += 1 model = str(row.get("model") or "") - usage: dict[str, int] = {} - for field in _TOKEN_FIELDS: - try: - usage[field] = int(row.get(field, 0) or 0) - except (TypeError, ValueError): - usage[field] = 0 + usage: dict[str, int] = { + field: _coerce_token_int(row.get(field, 0)) for field in _TOKEN_FIELDS + } row_cost = _token_cost(usage, model) total_cost += row_cost for dim_key, dim in ( diff --git a/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja b/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja index b1d6642..832a7b4 100755 --- a/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja +++ b/src/mapify_cli/templates_src/map/scripts/map_step_runner.py.jinja @@ -592,25 +592,27 @@ def _extract_turn_usage(entry: object) -> Optional[dict[str, object]]: } +def _coerce_token_int(value: object) -> int: + """Best-effort int from a token field that may be int / float / str / None.""" + if isinstance(value, bool): + return 0 + if isinstance(value, (int, float)): + return int(value) + if isinstance(value, str): + try: + return int(value) + except ValueError: + return 0 + return 0 + + def _usage_token_total(usage: Mapping[str, object]) -> int: """Sum of the four token fields for one usage record. Used to pick the most complete copy of a turn when the transcript repeats a msg_id with diverging usage (a streaming partial vs the final line). """ - total = 0 - for field in _TOKEN_FIELDS: - value = usage.get(field, 0) - if isinstance(value, bool): - continue - if isinstance(value, (int, float)): - total += int(value) - elif isinstance(value, str): - try: - total += int(value) - except ValueError: - continue - return total + return sum(_coerce_token_int(usage.get(field, 0)) for field in _TOKEN_FIELDS) def _iter_new_usage( @@ -824,7 +826,11 @@ def _rebuild_token_accounting(branch: Optional[str] = None) -> dict[str, object] Groups by subtask, agent, and phase, plus an aggregate carrying ``cache_hit_ratio`` (cache_read / (input + cache_read)) and - ``est_cost_usd``. Returns the written payload. + ``est_cost_usd``. Rows are deduped by msg_id (keeping the most complete + copy) before rollup, so a log written by an older runner — one assistant + turn split across several rows — still produces a correct total instead of + a doubled one. ``event_count`` is therefore the number of distinct turns. + Returns the written payload. """ branch_name = _sanitize_branch(branch) if branch else get_branch_name() log_path = get_branch_dir(branch_name) / TOKEN_LOG_NAME @@ -840,6 +846,14 @@ def _rebuild_token_accounting(branch: Optional[str] = None) -> dict[str, object] lines = log_path.read_text(encoding="utf-8").splitlines() except (OSError, UnicodeDecodeError): lines = [] + # One assistant turn can occupy several token_log rows (Claude Code + # writes one JSONL line per content/tool_use block, all sharing a + # msg_id). Logs written before the write-time dedup landed still hold + # those repeats, so collapse by msg_id here too — keep the row with the + # most total tokens (the figure the API bills) — and stay correct. + deduped: dict[str, dict[str, object]] = {} + order: list[str] = [] + anon = 0 for raw in lines: raw = raw.strip() if not raw: @@ -850,14 +864,26 @@ def _rebuild_token_accounting(branch: Optional[str] = None) -> dict[str, object] continue if not isinstance(row, dict): continue + mid = str(row.get("msg_id") or "") + if not mid: + key = f"__anon_{anon}" + anon += 1 + else: + key = mid + prev = deduped.get(key) + if prev is None: + order.append(key) + deduped[key] = row + elif _usage_token_total(row) > _usage_token_total(prev): + deduped[key] = row + + for key in order: + row = deduped[key] event_count += 1 model = str(row.get("model") or "") - usage: dict[str, int] = {} - for field in _TOKEN_FIELDS: - try: - usage[field] = int(row.get(field, 0) or 0) - except (TypeError, ValueError): - usage[field] = 0 + usage: dict[str, int] = { + field: _coerce_token_int(row.get(field, 0)) for field in _TOKEN_FIELDS + } row_cost = _token_cost(usage, model) total_cost += row_cost for dim_key, dim in ( diff --git a/tests/test_map_step_runner.py b/tests/test_map_step_runner.py index c17c096..61e9f28 100644 --- a/tests/test_map_step_runner.py +++ b/tests/test_map_step_runner.py @@ -6713,6 +6713,65 @@ def test_incremental_offset_captures_only_new_turns(self, branch_workspace): ] assert len(log_rows) == 3 + def test_record_dedups_repeated_msgid_in_window(self, branch_workspace): + """One assistant turn written as 3 JSONL lines (same msg_id, as Claude + Code does per content/tool block) is ONE event — not three. Regression + for the ~2x est_cost inflation.""" + repo = branch_workspace.parents[1] + transcript = repo / "tr.jsonl" + line = ( + '{"type":"assistant","uuid":"%s","message":{"role":"assistant","id":"msg_R",' + '"model":"claude-opus-4-7","usage":{"input_tokens":1000,"output_tokens":200,' + '"cache_creation_input_tokens":500,"cache_read_input_tokens":8000}}}\n' + ) + transcript.write_text((line % "a") + (line % "b") + (line % "c")) + self._state(branch_workspace) + + result = map_step_runner.record_token_event( + "test-branch", transcript_path=str(transcript) + ) + assert result["recorded"] == 1 + assert result["input"] == 1000 and result["output"] == 200 + rows = [ + r + for r in (branch_workspace / "token_log.jsonl").read_text().splitlines() + if r.strip() + ] + assert len(rows) == 1, "repeated msg_id must be logged once" + acct = json.loads((branch_workspace / "token_accounting.json").read_text()) + assert acct["aggregate"]["input"] == 1000 + assert acct["event_count"] == 1 + + def test_rebuild_dedups_dup_rows_in_existing_log(self, branch_workspace): + """A token_log written by an older runner (one turn duplicated across + rows) still rolls up to a single correct total — rebuild dedups by + msg_id and keeps the most complete copy of each turn.""" + base = { + "ts": "2026-01-01T00:00:00Z", + "subtask_id": "ST-003", + "phase": "ACTOR", + "agent": "actor", + "model": "claude-opus-4-7", + "msg_id": "msg_dup", + "input": 1000, + "output": 200, + "cache_creation": 500, + "cache_read": 8000, + } + partial = {**base, "output": 10, "cache_creation": 0, "cache_read": 0} + other = {**base, "msg_id": "msg_other", "output": 50} + (branch_workspace / "token_log.jsonl").write_text( + "\n".join(json.dumps(r) for r in (partial, base, base, other)) + "\n" + ) + + payload = map_step_runner._rebuild_token_accounting("test-branch") + + assert payload["event_count"] == 2, "two distinct msg_ids, not four rows" + agg = payload["aggregate"] + assert agg["input"] == 2000 # msg_dup 1000 + msg_other 1000 + assert agg["output"] == 250, "msg_dup kept at output 200 (not the partial 10)" + assert agg["cache_read"] == 16000 + def test_explicit_branch_is_sanitized_against_path_traversal( self, branch_workspace ):