From e8c409f4a8b0c911286c4667e78366958e65dc82 Mon Sep 17 00:00:00 2001
From: Arshad Ansari
Date: Tue, 26 May 2026 18:53:15 +0530
Subject: [PATCH] fix(extraction): cap concurrent LLM calls to stop aegis-burst timeout cascade
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When aegis intelligence (worker/.../intelligence.py) pushes its daily
arxiv batch (~30–50 summaries in seconds), every ingestion job hits
qwen3:14b at once. Ollama on asif serves ~2–4 in parallel, so the tail
queues inside Ollama past the 600s read timeout, retries (KS ×2,
LiteLLM ×2) snowball back into the same queue, and the whole class
falls to 0 triples. Prod signature: 11+ ReadTimeouts clustered in ~2s
about 10 min after a burst.

Fix: ExtractionClient now holds an asyncio.Semaphore around the LLM
POST, sized via settings.extraction_max_concurrent (default 4, env
EXTRACTION_MAX_CONCURRENT). The semaphore wraps just the request, not
the retry backoff, so a failing call doesn't hog a slot. This moves the
queue from Ollama-side (where read_timeout fires) to KS-side (where it
doesn't).

Repro mirrored prod: 30 concurrent realistic prompts via KS's exact
httpx config → median 313s, max 593s, 2 ReadTimeouts at 600.1s, zero
PoolTimeouts.

Co-Authored-By: Claude Opus 4.7 (1M context) --- src/knowledge_service/clients/llm.py | 20 +++++++---- src/knowledge_service/config.py | 6 ++++ src/knowledge_service/main.py | 1 + tests/test_extraction_client.py | 53 +++++++++++++++++++++++++++- uv.lock | 2 +- 5 files changed, 73 insertions(+), 9 deletions(-) diff --git a/src/knowledge_service/clients/llm.py b/src/knowledge_service/clients/llm.py index fd36f6e..30cfcd5 100644 --- a/src/knowledge_service/clients/llm.py +++ b/src/knowledge_service/clients/llm.py @@ -87,9 +87,14 @@ def __init__( model: str, api_key: str, registry: DomainRegistry | None = None, + max_concurrent: int = 4, ) -> None: super().__init__(base_url, model, api_key, read_timeout=600.0) self._registry = registry + # Cap concurrent LLM calls so an aegis ingestion burst doesn't queue + # past the read timeout inside Ollama. Acquired around the POST itself + # (not the whole retry loop) so backoff sleep doesn't hold a slot. + self._sem = asyncio.Semaphore(max_concurrent) self._prompt_builder = None if registry is not None: from knowledge_service.clients.prompt_builder import PromptBuilder # noqa: PLC0415 @@ -111,13 +116,14 @@ async def _post_chat(self, prompt: str) -> str | None: for attempt in range(_EXTRACT_MAX_RETRIES + 1): try: - response = await self._client.post( - "/v1/chat/completions", - json={ - "model": self._model, - "messages": [{"role": "user", "content": prompt}], - }, - ) + async with self._sem: + response = await self._client.post( + "/v1/chat/completions", + json={ + "model": self._model, + "messages": [{"role": "user", "content": prompt}], + }, + ) response.raise_for_status() return response.json()["choices"][0]["message"]["content"] except httpx.HTTPStatusError as exc: diff --git a/src/knowledge_service/config.py b/src/knowledge_service/config.py index 0361063..b8be67f 100644 --- a/src/knowledge_service/config.py +++ b/src/knowledge_service/config.py @@ -20,6 +20,12 @@ class Settings(BaseSettings): max_chunks: int = 50 
embed_batch_size: int = 20 entity_cache_max_size: int = 1000 + # Cap concurrent extraction LLM calls. Without this, a burst of N ingestion + # jobs (typical aegis daily arxiv pull is 30–50) fires N parallel requests + # at qwen3, which serves ~2–4 at a time on asif; the tail queues inside + # Ollama past the 600s read timeout and the whole batch falls into a + # retry-cascade. Pick 4 to match observed parallelism. + extraction_max_concurrent: int = 4 # Ingestion pipeline spacy_data_dir: str = "/app/data/spacy" diff --git a/src/knowledge_service/main.py b/src/knowledge_service/main.py index 9f8182a..fe5649b 100644 --- a/src/knowledge_service/main.py +++ b/src/knowledge_service/main.py @@ -150,6 +150,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: model=settings.llm_chat_model, api_key=settings.llm_api_key, registry=domain_registry, + max_concurrent=settings.extraction_max_concurrent, ) app.state.extraction_client = extraction_client app.state.embedding_client = embedding_client diff --git a/tests/test_extraction_client.py b/tests/test_extraction_client.py index 5e993e8..03f1257 100644 --- a/tests/test_extraction_client.py +++ b/tests/test_extraction_client.py @@ -1,3 +1,4 @@ +import asyncio import json import httpx @@ -6,11 +7,14 @@ from knowledge_service.clients.llm import ExtractionClient from knowledge_service.models import EntityInput, TripleInput +# Capture the real asyncio.sleep before any autouse fixture rebinds it; concurrency +# tests need a real yield to observe overlap. 
+_REAL_SLEEP = asyncio.sleep + @pytest.fixture(autouse=True) def _skip_retry_backoff(monkeypatch): """Monkeypatch asyncio.sleep to a no-op so retry tests don't actually wait.""" - import asyncio async def _nosleep(_seconds): return None @@ -420,3 +424,50 @@ async def test_no_auth_header_when_key_empty(self, httpx_mock): headers = httpx_mock.get_requests()[0].headers assert "authorization" not in headers await client.close() + + +class TestConcurrencyCap: + """ExtractionClient must bound concurrent LLM calls. + + Regression for the prod cascade: an aegis batch of 30+ ingestion jobs + all hit qwen3 simultaneously, queued inside Ollama, and timed out at + the 600s read boundary together. The semaphore moves the queue from + qwen3 back into KS, where the read timeout doesn't apply. + """ + + async def test_inflight_never_exceeds_cap(self, monkeypatch): + cap = 4 + burst = 12 # ≥ 3× the cap so violations are obvious if uncapped + + inflight = 0 + max_inflight = 0 + + async def trace_post(*_args, **_kwargs): + nonlocal inflight, max_inflight + inflight += 1 + max_inflight = max(max_inflight, inflight) + try: + # Real sleep to keep slots occupied long enough for overlap. 
+ await _REAL_SLEEP(0.02) + finally: + inflight -= 1 + return httpx.Response( + 200, + json=_make_combined_response(entities=[], relations=[]), + request=httpx.Request("POST", _CHAT_URL), + ) + + client = ExtractionClient( + base_url=_BASE, model="qwen3:14b", api_key=_KEY, max_concurrent=cap + ) + monkeypatch.setattr(client._client, "post", trace_post) + + await asyncio.gather(*[client._post_chat("prompt") for _ in range(burst)]) + + assert max_inflight <= cap, f"max_inflight={max_inflight} exceeded cap={cap}" + await client.close() + + async def test_default_cap_is_set(self): + client = ExtractionClient(base_url=_BASE, model="qwen3:14b", api_key=_KEY) + assert client._sem._value >= 1 + await client.close() diff --git a/uv.lock b/uv.lock index a28779d..3831124 100644 --- a/uv.lock +++ b/uv.lock @@ -501,7 +501,7 @@ wheels = [ [[package]] name = "knowledge-service" -version = "0.1.111" +version = "0.1.113" source = { editable = "." } dependencies = [ { name = "asyncpg" },