From 869fade4a81457249cc88ae2e223cb773ee1a813 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 7 Jan 2026 01:13:06 -0800 Subject: [PATCH] redis update --- eval_protocol/proxy/proxy_core/app.py | 6 +++++- eval_protocol/proxy/proxy_core/redis_utils.py | 11 +++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/eval_protocol/proxy/proxy_core/app.py b/eval_protocol/proxy/proxy_core/app.py index 528d467e..751d5dc1 100644 --- a/eval_protocol/proxy/proxy_core/app.py +++ b/eval_protocol/proxy/proxy_core/app.py @@ -4,7 +4,7 @@ """ from fastapi import FastAPI, Depends, Request, Query -from typing import Optional, List +from typing import Optional, List, Callable import os import redis import logging @@ -105,6 +105,7 @@ def create_app( auth_provider: AuthProvider = NoAuthProvider(), preprocess_chat_request: Optional[ChatRequestHook] = None, preprocess_traces_request: Optional[TracesRequestHook] = None, + extra_routes: Optional[Callable[[FastAPI], None]] = None, ) -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): @@ -288,6 +289,9 @@ async def pointwise_get_langfuse_trace( params=params, ) + if extra_routes is not None: + extra_routes(app) + # Health @app.get("/health") async def health(): diff --git a/eval_protocol/proxy/proxy_core/redis_utils.py b/eval_protocol/proxy/proxy_core/redis_utils.py index 2ebd6245..71edb5a0 100644 --- a/eval_protocol/proxy/proxy_core/redis_utils.py +++ b/eval_protocol/proxy/proxy_core/redis_utils.py @@ -8,8 +8,12 @@ logger = logging.getLogger(__name__) +DEFAULT_ROLLOUT_TTL_SECONDS = 60 * 60 * 24 -def register_insertion_id(redis_client: redis.Redis, rollout_id: str, insertion_id: str) -> bool: + +def register_insertion_id( + redis_client: redis.Redis, rollout_id: str, insertion_id: str, ttl_seconds: int = DEFAULT_ROLLOUT_TTL_SECONDS +) -> bool: """Register an insertion_id for a rollout_id in Redis. Tracks all expected completion insertion_ids for this rollout. @@ -22,7 +26,10 @@ def register_insertion_id(redis_client: redis.Redis, rollout_id: str, insertion_ True if successful, False otherwise """ try: - redis_client.sadd(rollout_id, insertion_id) + pipe = redis_client.pipeline() + pipe.sadd(rollout_id, insertion_id) + pipe.expire(rollout_id, int(ttl_seconds)) + pipe.execute() logger.info(f"Registered insertion_id {insertion_id} for rollout {rollout_id}") return True except Exception as e: