diff --git a/src/interfaces/chat_app/app.py b/src/interfaces/chat_app/app.py index 555eac96..14c9d924 100644 --- a/src/interfaces/chat_app/app.py +++ b/src/interfaces/chat_app/app.py @@ -1647,8 +1647,18 @@ def stream_ab_comparison( # PipelineEventFormatter yielding *accumulated* content (not deltas); # the last write per arm is therefore the complete response text. arm_results = { - "a": {"final_text": "", "error": None, "final_emitted": False}, - "b": {"final_text": "", "error": None, "final_emitted": False}, + "a": { + "final_text": "", + "error": None, + "final_emitted": False, + "duration_ms": None, + }, + "b": { + "final_text": "", + "error": None, + "final_emitted": False, + "duration_ms": None, + }, } arm_model_used = { "a": f"{arm_a_variant.provider or ''}/{arm_a_variant.model or ''}".strip("/"), @@ -1695,6 +1705,8 @@ def _stream_arm(arm_archi, arm_label): final_text = getattr(output, "answer", "") or formatter.last_text or arm_results[arm_label]["final_text"] arm_results[arm_label]["final_text"] = final_text arm_results[arm_label]["final_emitted"] = True + duration_ms = int((_time.monotonic() - t0) * 1000) + arm_results[arm_label]["duration_ms"] = duration_ms event_queue.put({ "type": "final", "arm": arm_label, @@ -1702,7 +1714,7 @@ def _stream_arm(arm_archi, arm_label): "usage": output_meta.get("usage"), "model": output_meta.get("model"), "model_used": arm_model_used[arm_label], - "duration_ms": int((_time.monotonic() - t0) * 1000), + "duration_ms": duration_ms, }) except Exception as exc: arm_results[arm_label]["error"] = str(exc) @@ -1745,6 +1757,8 @@ def _stream_arm(arm_archi, arm_label): arm_b_error = arm_results["b"]["error"] arm_a_final_text = arm_results["a"]["final_text"] arm_b_final_text = arm_results["b"]["final_text"] + arm_a_duration_ms = arm_results["a"]["duration_ms"] + arm_b_duration_ms = arm_results["b"]["duration_ms"] if arm_a_error and arm_b_error: yield {"type": "error", "message": "Both A/B arms failed", @@ -1790,6 +1804,10 @@ def _stream_arm(arm_archi, arm_label): pipeline_used=self.current_pipeline_used, ) + # Persist per-arm latency for analysis by reusing the timing table keyed by message_id. + self._persist_ab_arm_timing(arm_a_mid, arm_a_duration_ms) + self._persist_ab_arm_timing(arm_b_mid, arm_b_duration_ms) + # Get user prompt message ID if not already stored above if not user_prompt_mid: user_prompt_mid = self._get_last_user_message_id(context.conversation_id) @@ -1829,6 +1847,8 @@ def _stream_arm(arm_archi, arm_label): "is_champion_first": is_champion_first, "arm_a_message_id": arm_a_mid, "arm_b_message_id": arm_b_mid, + "arm_a_duration_ms": arm_a_duration_ms, + "arm_b_duration_ms": arm_b_duration_ms, "variant_label_mode": self.ab_pool.variant_label_mode, } @@ -1851,6 +1871,33 @@ def _store_assistant_message(self, conversation_id, content, model_used=None, pi logger.error("Failed to store assistant message: %s", exc) return None + def _persist_ab_arm_timing(self, message_id: Optional[int], duration_ms: Optional[int]) -> None: + """Persist A/B arm latency into the timing table for post-hoc analysis.""" + if not message_id or duration_ms is None: + return + + safe_duration_ms = max(int(duration_ms), 0) + end_ts = datetime.now(timezone.utc) + start_ts = end_ts - timedelta(milliseconds=safe_duration_ms) + + synthetic_timestamps = { + "client_sent_msg_ts": start_ts, + "server_received_msg_ts": start_ts, + "lock_acquisition_ts": start_ts, + "vectorstore_update_ts": start_ts, + "query_convo_history_ts": start_ts, + "chain_finished_ts": end_ts, + "archi_message_ts": end_ts, + "insert_convo_ts": end_ts, + "finish_call_ts": end_ts, + "server_response_msg_ts": end_ts, + } + + try: + self.insert_timing(message_id, synthetic_timestamps) + except Exception as exc: + logger.warning("Failed to persist A/B timing for message %s: %s", message_id, exc) + def _get_last_user_message_id(self, conversation_id): """Get the most recent user message_id for a conversation.""" try: