Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions src/interfaces/chat_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1647,8 +1647,18 @@ def stream_ab_comparison(
# PipelineEventFormatter yielding *accumulated* content (not deltas);
# the last write per arm is therefore the complete response text.
arm_results = {
"a": {"final_text": "", "error": None, "final_emitted": False},
"b": {"final_text": "", "error": None, "final_emitted": False},
"a": {
"final_text": "",
"error": None,
"final_emitted": False,
"duration_ms": None,
},
"b": {
"final_text": "",
"error": None,
"final_emitted": False,
"duration_ms": None,
},
}
arm_model_used = {
"a": f"{arm_a_variant.provider or ''}/{arm_a_variant.model or ''}".strip("/"),
Expand Down Expand Up @@ -1695,14 +1705,16 @@ def _stream_arm(arm_archi, arm_label):
final_text = getattr(output, "answer", "") or formatter.last_text or arm_results[arm_label]["final_text"]
arm_results[arm_label]["final_text"] = final_text
arm_results[arm_label]["final_emitted"] = True
duration_ms = int((_time.monotonic() - t0) * 1000)
arm_results[arm_label]["duration_ms"] = duration_ms
event_queue.put({
"type": "final",
"arm": arm_label,
"response": final_text,
"usage": output_meta.get("usage"),
"model": output_meta.get("model"),
"model_used": arm_model_used[arm_label],
"duration_ms": int((_time.monotonic() - t0) * 1000),
"duration_ms": duration_ms,
})
except Exception as exc:
arm_results[arm_label]["error"] = str(exc)
Expand Down Expand Up @@ -1745,6 +1757,8 @@ def _stream_arm(arm_archi, arm_label):
arm_b_error = arm_results["b"]["error"]
arm_a_final_text = arm_results["a"]["final_text"]
arm_b_final_text = arm_results["b"]["final_text"]
arm_a_duration_ms = arm_results["a"]["duration_ms"]
arm_b_duration_ms = arm_results["b"]["duration_ms"]

if arm_a_error and arm_b_error:
yield {"type": "error", "message": "Both A/B arms failed",
Expand Down Expand Up @@ -1790,6 +1804,10 @@ def _stream_arm(arm_archi, arm_label):
pipeline_used=self.current_pipeline_used,
)

# Persist per-arm latency for analysis by reusing the timing table keyed by message_id.
self._persist_ab_arm_timing(arm_a_mid, arm_a_duration_ms)
self._persist_ab_arm_timing(arm_b_mid, arm_b_duration_ms)

# Get user prompt message ID if not already stored above
if not user_prompt_mid:
user_prompt_mid = self._get_last_user_message_id(context.conversation_id)
Expand Down Expand Up @@ -1829,6 +1847,8 @@ def _stream_arm(arm_archi, arm_label):
"is_champion_first": is_champion_first,
"arm_a_message_id": arm_a_mid,
"arm_b_message_id": arm_b_mid,
"arm_a_duration_ms": arm_a_duration_ms,
"arm_b_duration_ms": arm_b_duration_ms,
"variant_label_mode": self.ab_pool.variant_label_mode,
}

Expand All @@ -1851,6 +1871,33 @@ def _store_assistant_message(self, conversation_id, content, model_used=None, pi
logger.error("Failed to store assistant message: %s", exc)
return None

def _persist_ab_arm_timing(self, message_id: Optional[int], duration_ms: Optional[int]) -> None:
    """Store one A/B arm's latency as a synthetic row in the timing table.

    Only the arm's total duration is available here, so the row is
    fabricated from it: the "start-side" columns all receive
    ``now - duration`` and the "end-side" columns all receive ``now``
    (timezone-aware UTC). Negative durations are clamped to zero.

    Nothing is written when ``message_id`` is falsy or ``duration_ms`` is
    ``None``. Any exception raised by ``self.insert_timing`` is logged as a
    warning and swallowed.
    """
    if not message_id or duration_ms is None:
        return

    # Clamp first so a negative duration can never place the start after the end.
    elapsed = timedelta(milliseconds=max(0, int(duration_ms)))
    finished_at = datetime.now(timezone.utc)
    started_at = finished_at - elapsed

    # Columns that collapse onto the synthetic start instant.
    start_columns = (
        "client_sent_msg_ts",
        "server_received_msg_ts",
        "lock_acquisition_ts",
        "vectorstore_update_ts",
        "query_convo_history_ts",
    )
    # Columns that collapse onto the synthetic end instant.
    end_columns = (
        "chain_finished_ts",
        "archi_message_ts",
        "insert_convo_ts",
        "finish_call_ts",
        "server_response_msg_ts",
    )

    timing_row = dict.fromkeys(start_columns, started_at)
    timing_row.update(dict.fromkeys(end_columns, finished_at))

    try:
        self.insert_timing(message_id, timing_row)
    except Exception as exc:
        logger.warning("Failed to persist A/B timing for message %s: %s", message_id, exc)

def _get_last_user_message_id(self, conversation_id):
"""Get the most recent user message_id for a conversation."""
try:
Expand Down
Loading