From 0ad8cd30f0b64ac6c602d6811bfadbb7e5f65e97 Mon Sep 17 00:00:00 2001 From: Jaime Cernuda Date: Sat, 23 May 2026 23:47:41 -0500 Subject: [PATCH] feat(agent): record hierarchical handoff evidence --- TASK.md | 3 + docs/HIERARCHICAL_STRESS_BENCHMARK_PLAN.md | 21 +- src/clio_agent/agent.py | 180 +++++++++++++++++- src/clio_agent/gact/app.py | 6 + src/clio_agent/harness.py | 65 +++++++ tests/test_core/test_agent_dispatch.py | 51 +++++ tests/test_gact/test_tools_called.py | 36 ++++ .../test_local_scientific_workflows.py | 17 +- 8 files changed, 365 insertions(+), 14 deletions(-) diff --git a/TASK.md b/TASK.md index 5dc5bbb..bee03e4 100644 --- a/TASK.md +++ b/TASK.md @@ -10,6 +10,9 @@ - [x] Nested expert framework slice: registry capabilities now support `parent_id`, `source`, and planner visibility; NDP catalog tools are owned by a tier-3 `ndp_catalog` child under `data`; SAC archive/statistics/plotting tools are owned by a tier-3 `sac_format` child under `analysis`; GACT `/v1/agents?tier=3` exposes these rows so the TUI can render hierarchy/details. - [x] Nested expert execution slice: extracted executable `NDPExpert` and `SACFormatExpert` classes with their own tool filters/result metadata, registered them as real child agents, and made Data/Analysis delegate to those child experts instead of directly owning NDP/SAC tool lists. Caveat: this is still one-process/shared-model execution; the long-term target remains independently configurable/finetunable child models and a more generic multi-level traversal policy. - [ ] Architecture warning for upcoming hidden user tests: do not mark benchmark-specific prompt/tool tuning as success. The framework must pass unseen scientific tasks by using generic registered experts, tool tags, artifact contracts, and visible handoffs. Avoid flat namespaces and route strings that only work for one named benchmark. The desired traversal is hierarchical: orchestrator -> data manager -> provider-specific discovery experts such as NDP/EarthScope/local search -> staging/download utility -> format-specific experts such as SAC -> analysis -> visualization. SAC tools should be a narrowly named SAC MCP/tool surface, not a generic seismic catch-all; NDP should become a nested expert with its own prompt/context and future model boundary. Benchmark prompts should be inspired by real scientific workflows, including clio-kit/NDP/EarthScope style tasks, but implementation must remain scalable and reusable. + - [ ] Compaction anchor for the next benchmark/hierarchy pass: the user will test hidden scientific tasks, so success requires a generic hierarchy, not a demo-tuned path. The target pattern is `orchestrator -> data -> ndp_catalog/EarthScope/local_search -> utility staging/download -> analysis -> sac_format or other format experts -> visualization`, with each expert receiving scoped context and returning explicit evidence/artifacts/errors. `data` should coordinate discovery and staging rather than directly owning every provider's domain semantics; `ndp_catalog` should live under `data` with NDP-specific prompt/context/tools and eventually an independent model boundary. A seismic workflow should be represented through format/provider specialists such as a SAC MCP/expert, not a broad hardcoded `seismic_server` that exists only for one benchmark. + - [ ] Benchmark design rule: complex prompts should be natural scientific requests, not instructions that spell out CLIO internals such as "spawn nanoagents" or "use NDP then SAC". Good benchmark cases should force the system to infer delegation, issue multiple tool calls, use nested experts, stage or reject real data, pass discoveries across expert boundaries, produce plots when appropriate, and surface failures honestly. Include stressors that mocks miss: parallel calls, many tool events, large/bad files, context pressure/compaction, provider/model swaps during active work, unavailable resources, permission boundaries, and hidden tool-ownership mistakes. + - [ ] NDP/EarthScope direction: prioritize real scientific data workflows, especially EarthScope/NDP-style seismic discovery. A representative target is: find a relevant seismic dataset through NDP/EarthScope, inspect candidate resources, stage a bounded waveform file, analyze three-component or SAC traces through a format-specific expert, and visualize the result. `clio-kit` should be treated as a reference for provider semantics and MCP integration, but CLIO should expose reusable expert/tool boundaries instead of cloning benchmark-specific prompts or routes. - [x] No-guard cross-file routing could still fail after a correct planner attempt. ALCF/gpt-oss selected/attempted `analysis` for the four-file triage, but `_expert_file_compatibility_error()` rejected the analysis coordinator because the current-file context held only the first HDF5 path and ignored `coordinated_file_suffixes`. The compatibility check now evaluates question + file context together and allows registered coordinator experts for natural multi-file bundles. Evidence: first 14-case demo run failed `reasoning_cross_file_triage_nanoagents` with repeated compatibility errors; focused unit coverage added; rerun passed with `route_source=dspy`, selected `analysis`, six tool calls, and four tier-3 child sessions. - [x] Natural HDF5 dataset prompts required tool-shaped wording. A prompt like "Focus on plasma/electron_temperature..." should call `hdf5_analyze_dataset` without the user naming the tool. DataExpert now treats named dataset paths plus natural focus/chunk/statistics language as dataset-level analysis. Evidence: `test_natural_dataset_focus_uses_dataset_tool` and ALCF demo case `hdf5_dataset_focus` passed with `hdf5_analyze_dataset`. - [x] Memory demo prompt was too weak and allowed a chat answer with no fresh evidence. The demo runner now asks CLIO to compute schema/statistics while relying on the prior Parquet path, producing a real memory + tool-use case. Evidence: first run failed `workflow_memory_followup` as `chat` with no tools; rerun passed as `analysis` with `parquet_analyze_schema` plus five `parquet_compute_statistics` calls. diff --git a/docs/HIERARCHICAL_STRESS_BENCHMARK_PLAN.md b/docs/HIERARCHICAL_STRESS_BENCHMARK_PLAN.md index 5b87ae0..a2aa366 100644 --- a/docs/HIERARCHICAL_STRESS_BENCHMARK_PLAN.md +++ b/docs/HIERARCHICAL_STRESS_BENCHMARK_PLAN.md @@ -94,17 +94,21 @@ Current implementation evidence: `sac_compute_trace_statistics`, and `sac_plot_traces`. - The format tool surface is deliberately SAC-specific. It is exposed as a `sac` FastMCP server with `sac_*` tools, not as a generic seismic namespace. +- CLIO now emits `expert_handoffs` metadata on GACT assistant messages and the + stress audit log. This is required evidence for staged workflows whose final + public route is `visualization` but whose work actually traversed `data`, + `ndp_catalog`, `analysis`, `sac_format`, and `visualization`. - Caveat: the completed staged waveform demo is SAC archive based. The original Salton Sea three-component MiniSEED path remains a future target because the discovered OSDF resource is large and requires a bounded Pelican/object selection path. -- Architecture caveat: this implementation proves data-owned NDP discovery, but - NDP semantics still live inside the top-level DataExpert. The intended CLIO - hierarchy is `data -> ndp_catalog` or `data -> ndp_access`, where the nested - NDP expert owns NDP-specific prompt context, tools, dataset/resource ranking, - and eventually its own tuned model. Future benchmarks should include - EarthScope-oriented prompts and verify that NDP work is delegated to that - nested expert rather than handled directly by DataExpert. +- Architecture caveat: `ndp_catalog` and `sac_format` are now executable nested + experts under `data` and `analysis`, respectively, but they still run in the + same process and usually share the same provider/model. The intended long-term + CLIO hierarchy keeps their prompt/context/tool surfaces separate and eventually + lets each nested expert use its own tuned model. Future benchmarks should + include EarthScope-oriented prompts and verify that NDP work is delegated to + `ndp_catalog` rather than handled directly by DataExpert. ### 2. Mixed Scientific Run Audit @@ -233,7 +237,8 @@ Every benchmark run should save: - Prompt and scenario ID. - Provider/model/context settings. - Route decision and route source. -- Expert handoff graph. +- Expert handoff graph from `metadata.expert_handoffs`; final selected route is + not sufficient evidence for hierarchy. - Per-expert context summary. - Tool calls with arguments, results, errors, and duration. - Child/nanoagent sessions and their status. diff --git a/src/clio_agent/agent.py b/src/clio_agent/agent.py index 5e947cb..1d024cc 100644 --- a/src/clio_agent/agent.py +++ b/src/clio_agent/agent.py @@ -29,7 +29,7 @@ from collections.abc import Mapping from contextlib import contextmanager from pathlib import Path -from typing import Any, Callable, Dict, Iterator, List +from typing import Any, Callable, Dict, Iterator, List, Literal import dspy @@ -556,7 +556,7 @@ def forward( ) trace.route = route success = True - if selected in ("data", "analysis", "visualization") and error_info is None: + if selected not in SPECIAL_ROUTE_TARGETS and error_info is None: error_info = self._tool_error_info_from_trace(selected, trace) if error_info and not error_info.get("details", {}).get("partial", False): success = False @@ -587,7 +587,7 @@ def forward( # Step 4b: Store tier-2 expert invocation for optimizer training data expert_duration_ms = (time.time() - start_time) * 1000 nanoagents_spawned = self._extract_nanoagents_spawned(expert_result) - if selected in ("data", "analysis", "visualization"): + if selected not in SPECIAL_ROUTE_TARGETS: self._store_expert_invocation( question=self._question_with_session_file(question, active_file), file_context=file_context, @@ -621,6 +621,7 @@ def forward( answer=answer, selected_expert=selected, tools_called=[tool.to_arc_tool_call() for tool in trace.tools], + expert_handoffs=[handoff.to_dict() for handoff in trace.expert_handoffs], file_diffs=self._file_diffs_from_trace( trace, edit_mode=session_edit_mode, @@ -1411,9 +1412,19 @@ def _dispatch_expert_action( try: with dspy.context(lm=self._main_lm, adapter=self._dspy_adapter): if dispatch_id == "data": + started = time.time() expert_result = self.data_expert( question=expert_question, file_context=file_context ) + self._record_expert_handoff( + trace, + expert_id=expert_id, + dispatch_target=dispatch_id, + stage="planner_dispatch", + input_summary=expert_question, + result=expert_result, + duration_ms=(time.time() - started) * 1000, + ) self._merge_expert_provenance(trace, expert_result) answer = ( f"{expert_result.analysis}\n\n" @@ -1431,10 +1442,20 @@ def _dispatch_expert_action( return expert_id, answer, expert_result, None if dispatch_id == "analysis": + started = time.time() expert_result = self.analysis_expert( question=expert_question, file_context=file_context, ) + self._record_expert_handoff( + trace, + expert_id=expert_id, + dispatch_target=dispatch_id, + stage="planner_dispatch", + input_summary=expert_question, + result=expert_result, + duration_ms=(time.time() - started) * 1000, + ) self._merge_expert_provenance(trace, expert_result) answer = ( f"{expert_result.analysis}\n\n" @@ -1443,10 +1464,20 @@ def _dispatch_expert_action( return expert_id, answer, expert_result, None if dispatch_id == "ndp_catalog": + started = time.time() expert_result = self.ndp_catalog_expert( question=expert_question, file_context=file_context, ) + self._record_expert_handoff( + trace, + expert_id=expert_id, + dispatch_target=dispatch_id, + stage="planner_dispatch", + input_summary=expert_question, + result=expert_result, + duration_ms=(time.time() - started) * 1000, + ) self._merge_expert_provenance(trace, expert_result) answer = ( f"{expert_result.analysis}\n\n" @@ -1464,10 +1495,20 @@ def _dispatch_expert_action( return expert_id, answer, expert_result, None if dispatch_id == "sac_format": + started = time.time() expert_result = self.sac_format_expert( question=expert_question, file_context=file_context, ) + self._record_expert_handoff( + trace, + expert_id=expert_id, + dispatch_target=dispatch_id, + stage="planner_dispatch", + input_summary=expert_question, + result=expert_result, + duration_ms=(time.time() - started) * 1000, + ) self._merge_expert_provenance(trace, expert_result) answer = ( f"{expert_result.analysis}\n\n" @@ -1489,10 +1530,20 @@ def _dispatch_expert_action( ).to_dict(), ) + started = time.time() expert_result = self.visualization_expert( question=expert_question, file_context=file_context, ) + self._record_expert_handoff( + trace, + expert_id=expert_id, + dispatch_target=dispatch_id, + stage="planner_dispatch", + input_summary=expert_question, + result=expert_result, + duration_ms=(time.time() - started) * 1000, + ) self._merge_expert_provenance(trace, expert_result) description = self._coerce_text( getattr(expert_result, "visualization_description", "") @@ -1508,6 +1559,15 @@ def _dispatch_expert_action( except CancellationError: raise except Exception as exc: + self._record_expert_handoff( + trace, + expert_id=expert_id, + dispatch_target=dispatch_id, + stage="planner_dispatch", + input_summary=expert_question, + status="failure", + error=str(exc), + ) error = ExpertError( f"The {expert_id} expert encountered an issue processing your request.", details=self._recovery_details( @@ -1568,10 +1628,20 @@ def _continue_data_handoffs( f"Analyze the staged waveform/data file {staged_path}. " "Compute grounded statistics and do not invent unsupported format details." ) + started = time.time() analysis_result = self.analysis_expert( question=analysis_question, file_context=downstream_context, ) + self._record_expert_handoff( + trace, + expert_id="analysis", + dispatch_target="analysis", + stage="data_handoff_analysis", + input_summary=analysis_question, + result=analysis_result, + duration_ms=(time.time() - started) * 1000, + ) self._merge_expert_provenance(trace, analysis_result) analysis_text = self._coerce_text(getattr(analysis_result, "analysis", "")).strip() recommendations = self._coerce_text( @@ -1590,10 +1660,20 @@ def _continue_data_handoffs( f"Plot representative traces or numeric series from {staged_path}. " "Return the output artifact path and surface any plotting failure." ) + started = time.time() visualization_result = self.visualization_expert( question=visualization_question, file_context=downstream_context, ) + self._record_expert_handoff( + trace, + expert_id="visualization", + dispatch_target="visualization", + stage="data_handoff_visualization", + input_summary=visualization_question, + result=visualization_result, + duration_ms=(time.time() - started) * 1000, + ) self._merge_expert_provenance(trace, visualization_result) description = self._coerce_text( getattr(visualization_result, "visualization_description", "") @@ -2879,6 +2959,100 @@ def _merge_expert_provenance(trace: RunTrace, expert_result: Any) -> None: if hasattr(observation, "to_arc_tool_call"): trace.tools.append(observation) + def _record_expert_handoff( + self, + trace: RunTrace, + *, + expert_id: str, + dispatch_target: str, + stage: str, + input_summary: str, + result: Any | None = None, + status: Literal["success", "failure"] = "success", + duration_ms: float = 0.0, + error: str | None = None, + ) -> None: + """Record an expert-stage handoff without relying on final route labels.""" + metadata = self._expert_result_metadata(result) + output_summary = self._expert_result_summary(result) + parent_id = self._registered_parent_id(expert_id) + trace.record_expert_handoff( + agent_id=expert_id, + parent_id=parent_id, + dispatch_target=dispatch_target, + stage=stage, + status=status, + input_summary=self._compact_handoff_text(input_summary), + output_summary=output_summary, + duration_ms=duration_ms, + error=error, + metadata=metadata, + ) + + reported_expert = self._coerce_text(metadata.get("expert")).strip().lower() + if not reported_expert or reported_expert == expert_id: + return + if reported_expert in {row.agent_id for row in trace.expert_handoffs}: + return + reported_parent = self._coerce_text(metadata.get("parent_expert")).strip().lower() + trace.record_expert_handoff( + agent_id=reported_expert, + parent_id=reported_parent or expert_id, + dispatch_target=reported_expert, + stage=f"{stage}_child", + status=status, + input_summary=self._compact_handoff_text(input_summary), + output_summary=output_summary, + duration_ms=duration_ms, + error=error, + metadata={ + **metadata, + "observed_through": expert_id, + }, + ) + + def _registered_parent_id(self, expert_id: str) -> str | None: + """Return a registered parent ID for an expert, if one exists.""" + caps = self.registry.get_capabilities(expert_id) + if caps is None: + return None + return caps.parent_id + + @staticmethod + def _expert_result_metadata(result: Any | None) -> dict[str, Any]: + """Return JSON-like metadata from a native expert result.""" + metadata = getattr(result, "metadata", None) + if isinstance(metadata, Mapping): + return dict(metadata) + return {} + + @classmethod + def _expert_result_summary(cls, result: Any | None) -> str: + """Return a compact human-readable expert output summary.""" + if result is None: + return "" + candidates = ( + getattr(result, "analysis", ""), + getattr(result, "visualization_description", ""), + getattr(result, "answer", ""), + ) + for candidate in candidates: + text = cls._coerce_text(candidate).strip() + if text: + return cls._compact_handoff_text(text) + file_path = cls._coerce_text(getattr(result, "file_path", "")).strip() + if file_path: + return cls._compact_handoff_text(f"Artifact: {file_path}") + return "" + + @staticmethod + def _compact_handoff_text(text: str, *, limit: int = 500) -> str: + """Compact one handoff field for durable metadata.""" + normalized = " ".join(str(text).split()) + if len(normalized) <= limit: + return normalized + return normalized[: limit - 15].rstrip() + "...[truncated]" + def _run_chat_agent(self, question: str, session_context: str) -> str: """Generate a conversational reply through DSPy/LiteLLM.""" self._raise_if_cancelled("chat_before") diff --git a/src/clio_agent/gact/app.py b/src/clio_agent/gact/app.py index 41c96da..fded787 100644 --- a/src/clio_agent/gact/app.py +++ b/src/clio_agent/gact/app.py @@ -927,6 +927,7 @@ async def _run_turn_in_background( route_reason = "" execution_path = "" tools_called: list[dict[str, Any]] = [] + expert_handoffs: list[dict[str, Any]] = [] proposed_diffs: list[Any] = [] nanoagents: list[Any] = [] thinking_text = "" @@ -1197,6 +1198,9 @@ async def _await_turn_work(awaitable: Any) -> Any: # branches not yet migrated). execution_path = getattr(pred, "execution_path", "") or "" tools_called = _extract_tools_called(pred) + raw_handoffs = getattr(pred, "expert_handoffs", None) or [] + if isinstance(raw_handoffs, list): + expert_handoffs = [dict(row) for row in raw_handoffs if isinstance(row, dict)] # Drain the per-session observer ledger so direct-tool short- # circuits (HDF5/Parquet/fs experts that bypass ReAct) still # report tools_called on the assistant message metadata. @@ -1529,6 +1533,8 @@ def getf(k, default=None, _r=row): part.metadata["stream_fallback"] = stream_fallback if tools_called: assistant_metadata["tools_called"] = tools_called + if expert_handoffs: + assistant_metadata["expert_handoffs"] = expert_handoffs # iowarp/clio-agent#6: when streaming actually emitted chunks, # reuse its message_id + part_id so the deltas + final # message line up. Otherwise mint a fresh id (existing path). diff --git a/src/clio_agent/harness.py b/src/clio_agent/harness.py index da4ada8..3334acf 100644 --- a/src/clio_agent/harness.py +++ b/src/clio_agent/harness.py @@ -102,6 +102,40 @@ def to_arc_tool_call(self) -> ToolCall: ) +@dataclass(frozen=True) +class ExpertHandoff: + """A concrete expert or child-expert invocation observed during one CLIO run.""" + + agent_id: str + parent_id: str | None + dispatch_target: str + stage: str + status: Literal["success", "failure"] + input_summary: str + output_summary: str = "" + duration_ms: float = 0.0 + error: str | None = None + metadata: Mapping[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + """Return a JSON-safe benchmark/API representation.""" + row: dict[str, Any] = { + "agent_id": self.agent_id, + "dispatch_target": self.dispatch_target, + "stage": self.stage, + "status": self.status, + "input_summary": self.input_summary, + "output_summary": self.output_summary, + "duration_ms": self.duration_ms, + "metadata": _json_safe(self.metadata), + } + if self.parent_id: + row["parent_id"] = self.parent_id + if self.error: + row["error"] = self.error + return row + + @dataclass(frozen=True) class ExpertRequest: """Typed expert input contract used by native CLIO expert modules.""" @@ -145,6 +179,7 @@ class RunTrace: trace_id: str = field(default_factory=lambda: str(uuid.uuid4())) started_at: float = field(default_factory=time.time) tools: list[ToolObservation] = field(default_factory=list) + expert_handoffs: list[ExpertHandoff] = field(default_factory=list) def record_tool( self, @@ -166,6 +201,36 @@ def record_tool( ) ) + def record_expert_handoff( + self, + *, + agent_id: str, + parent_id: str | None, + dispatch_target: str, + stage: str, + status: Literal["success", "failure"], + input_summary: str, + output_summary: str = "", + duration_ms: float = 0.0, + error: str | None = None, + metadata: Mapping[str, Any] | None = None, + ) -> None: + """Append an expert handoff observation to this run.""" + self.expert_handoffs.append( + ExpertHandoff( + agent_id=agent_id, + parent_id=parent_id, + dispatch_target=dispatch_target, + stage=stage, + status=status, + input_summary=input_summary, + output_summary=output_summary, + duration_ms=duration_ms, + error=error, + metadata=dict(metadata or {}), + ) + ) + @property def duration_ms(self) -> float: """Elapsed wall-clock duration for this trace.""" diff --git a/tests/test_core/test_agent_dispatch.py b/tests/test_core/test_agent_dispatch.py index 76190e7..57df375 100644 --- a/tests/test_core/test_agent_dispatch.py +++ b/tests/test_core/test_agent_dispatch.py @@ -116,6 +116,7 @@ def test_dispatch_ndp_catalog_child_expert(self, agent): expert_result = dspy.Prediction( analysis="NDP catalog results", recommendations="stage a bounded resource", + metadata={"expert": "ndp_catalog", "parent_expert": "data"}, ) agent.ndp_catalog_expert = MagicMock(return_value=expert_result) agent.data_expert = MagicMock() @@ -124,9 +125,45 @@ def test_dispatch_ndp_catalog_child_expert(self, agent): assert result.selected_expert == "ndp_catalog" assert "NDP catalog results" in result.answer + assert result.expert_handoffs[0]["agent_id"] == "ndp_catalog" + assert result.expert_handoffs[0]["parent_id"] == "data" agent.ndp_catalog_expert.assert_called_once() agent.data_expert.assert_not_called() + def test_nested_expert_tool_errors_surface(self, agent): + """Nested expert routes must not bypass normal tool-error surfacing.""" + self._set_planner( + agent, + { + "action": "expert", + "expert": "ndp_catalog", + "question": "Find unavailable NDP data", + }, + ) + expert_result = dspy.Prediction( + analysis="Could not query NDP", + recommendations="retry later", + metadata={"expert": "ndp_catalog", "parent_expert": "data"}, + tool_provenance=[ + ToolObservation( + tool="ndp_search_datasets", + params={"search_terms": ["earthscope"]}, + result={"error": {"message": "catalog unavailable"}}, + duration_ms=1.0, + ok=False, + ) + ], + ) + agent.ndp_catalog_expert = MagicMock(return_value=expert_result) + + result = agent.forward(question="Find unavailable NDP data", session_id="ndp-error") + + assert result.selected_expert == "ndp_catalog" + assert result.answer == "" + assert result.error_info is not None + assert result.error_info["error"] == "tool_error" + assert result.error_info["details"]["tool"] == "ndp_search_datasets" + def test_dispatch_sac_format_child_expert(self, agent): """SAC routes should execute the nested format expert.""" self._set_planner( @@ -140,6 +177,7 @@ def test_dispatch_sac_format_child_expert(self, agent): expert_result = dspy.Prediction( analysis="SAC trace statistics", recommendations="plot representative traces", + metadata={"expert": "sac_format", "parent_expert": "analysis"}, ) agent.sac_format_expert = MagicMock(return_value=expert_result) agent.analysis_expert = MagicMock() @@ -148,6 +186,8 @@ def test_dispatch_sac_format_child_expert(self, agent): assert result.selected_expert == "sac_format" assert "SAC trace statistics" in result.answer + assert result.expert_handoffs[0]["agent_id"] == "sac_format" + assert result.expert_handoffs[0]["parent_id"] == "analysis" agent.sac_format_expert.assert_called_once() agent.analysis_expert.assert_not_called() @@ -166,6 +206,7 @@ def test_data_handoff_continues_to_analysis_and_visualization(self, agent, tmp_p data_result = dspy.Prediction( analysis="staged waveform archive", recommendations="pass downstream", + metadata={"expert": "ndp_catalog", "parent_expert": "data"}, tool_provenance=[ ToolObservation( tool="ndp_stage_resource", @@ -179,6 +220,7 @@ def test_data_handoff_continues_to_analysis_and_visualization(self, agent, tmp_p analysis_result = dspy.Prediction( analysis="computed SAC statistics", recommendations="plot representative traces", + metadata={"expert": "sac_format", "parent_expert": "analysis"}, tool_provenance=[ ToolObservation( tool="sac_compute_trace_statistics", @@ -220,6 +262,15 @@ def test_data_handoff_continues_to_analysis_and_visualization(self, agent, tmp_p "sac_compute_trace_statistics", "sac_plot_traces", ] + assert [handoff["agent_id"] for handoff in result.expert_handoffs] == [ + "data", + "ndp_catalog", + "analysis", + "sac_format", + "visualization", + ] + assert result.expert_handoffs[1]["parent_id"] == "data" + assert result.expert_handoffs[3]["parent_id"] == "analysis" agent.analysis_expert.assert_called_once() agent.visualization_expert.assert_called_once() diff --git a/tests/test_gact/test_tools_called.py b/tests/test_gact/test_tools_called.py index 51a1bd2..8f59902 100644 --- a/tests/test_gact/test_tools_called.py +++ b/tests/test_gact/test_tools_called.py @@ -30,6 +30,7 @@ class _PredWithTools: selected_expert: str = "data_expert" routing_rationale: str = "keyword match" tools_called: object = None + expert_handoffs: object = None class _Agent: @@ -113,3 +114,38 @@ def test_tools_called_propagates_to_message_and_completion(tmp_path: Path) -> No assert rows[2]["ok"] is True assert rows[2]["duration_ms"] == 8.0 assert rows[2]["telemetry_source"] == "agent_trace" + + +def test_expert_handoffs_propagate_to_message_metadata(tmp_path: Path) -> None: + from .conftest import complete_turn + + handoffs = [ + { + "agent_id": "data", + "dispatch_target": "data", + "stage": "planner_dispatch", + "status": "success", + "input_summary": "find data", + "output_summary": "staged waveform archive", + "duration_ms": 12.0, + "metadata": {"expert": "ndp_catalog"}, + }, + { + "agent_id": "ndp_catalog", + "parent_id": "data", + "dispatch_target": "ndp_catalog", + "stage": "planner_dispatch_child", + "status": "success", + "input_summary": "find data", + "output_summary": "staged waveform archive", + "duration_ms": 12.0, + "metadata": {"observed_through": "data"}, + }, + ] + pred = _PredWithTools(expert_handoffs=handoffs) + client = _client(tmp_path, pred) + + sid = client.post("/v1/sessions", json={"title": "t"}).json()["id"] + assistant = complete_turn(client, sid, "find seismic data") + + assert assistant["metadata"]["expert_handoffs"] == handoffs diff --git a/tests/test_stress_benchmark/test_local_scientific_workflows.py b/tests/test_stress_benchmark/test_local_scientific_workflows.py index 7f010af..af1d03b 100644 --- a/tests/test_stress_benchmark/test_local_scientific_workflows.py +++ b/tests/test_stress_benchmark/test_local_scientific_workflows.py @@ -100,6 +100,12 @@ def _tool_names(message: dict[str, Any]) -> list[str]: return [_tool_name(row) for row in _tools(message)] +def _expert_handoffs(message: dict[str, Any]) -> list[dict[str, Any]]: + metadata = message.get("metadata") or {} + rows = metadata.get("expert_handoffs") or [] + return rows if isinstance(rows, list) else [] + + def _normalize_scientific_text(text: str) -> str: for original, normalized in { "⁻⁰": "^-0", @@ -274,6 +280,7 @@ def _record_case( "selected_agent": _routing_agent(message), "routing_decision": _routing_decision(message), "tools_called": _tools(message), + "expert_handoffs": _expert_handoffs(message), "artifacts": _artifact_paths(message), "nanoagents_spawned": child_sessions or [], "error_info": message.get("error_info"), @@ -290,12 +297,14 @@ def _record_case( def _assert_tool_answer( message: dict[str, Any], *, - expected_agent: str, + expected_agent: str | tuple[str, ...], expected_tool_prefix: str, expected_terms: tuple[str, ...], ) -> None: assert _blocking_error(message) is None, message.get("error_info") - assert expected_agent in _routing_agent(message), _routing_agent(message) + expected_agents = (expected_agent,) if isinstance(expected_agent, str) else expected_agent + routing_agent = _routing_agent(message) + assert any(agent in routing_agent for agent in expected_agents), routing_agent names = _tool_names(message) assert any(name.startswith(expected_tool_prefix) for name in names), names text = _normalize_scientific_text(_text(message)).lower() @@ -595,10 +604,12 @@ def test_local_ndp_catalog_discovery_is_visible_to_core_expert_path( ) _assert_tool_answer( answer, - expected_agent="data", + expected_agent=("data", "ndp_catalog"), expected_tool_prefix="ndp_", expected_terms=("National Data Platform", "noaa", "dataset"), ) + handoff_agents = {str(row.get("agent_id")) for row in _expert_handoffs(answer)} + assert "ndp_catalog" in handoff_agents, _expert_handoffs(answer) assert any(name in _tool_names(answer) for name in ("ndp_search_datasets",)), _tool_names( answer )