From d772cdfb03a5b38e91125c9c2341a853e88c0f01 Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Wed, 1 Apr 2026 00:27:36 +0600 Subject: [PATCH 1/4] feat: implement media service for document extraction and processing pipeline --- agentflow_cli/media/__init__.py | 14 ++ agentflow_cli/media/extractor.py | 77 +++++++ agentflow_cli/media/pipeline.py | 82 +++++++ .../src/app/core/config/media_settings.py | 40 ++++ agentflow_cli/src/app/loader.py | 12 + .../routers/graph/services/graph_service.py | 3 + .../graph/services/multimodal_preprocessor.py | 65 ++++++ .../src/app/routers/media/__init__.py | 180 +++++++++++++++ agentflow_cli/src/app/routers/media/router.py | 127 +++++++++++ .../src/app/routers/media/schemas.py | 28 +++ agentflow_cli/src/app/routers/setup_router.py | 2 + pyproject.toml | 7 +- requirements.txt | 31 +++ tests/test_multimodal_sprint2_extraction.py | 214 ++++++++++++++++++ uv.lock | 2 +- 15 files changed, 878 insertions(+), 6 deletions(-) create mode 100644 agentflow_cli/media/__init__.py create mode 100644 agentflow_cli/media/extractor.py create mode 100644 agentflow_cli/media/pipeline.py create mode 100644 agentflow_cli/src/app/core/config/media_settings.py create mode 100644 agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py create mode 100644 agentflow_cli/src/app/routers/media/__init__.py create mode 100644 agentflow_cli/src/app/routers/media/router.py create mode 100644 agentflow_cli/src/app/routers/media/schemas.py create mode 100644 requirements.txt create mode 100644 tests/test_multimodal_sprint2_extraction.py diff --git a/agentflow_cli/media/__init__.py b/agentflow_cli/media/__init__.py new file mode 100644 index 0000000..e714b72 --- /dev/null +++ b/agentflow_cli/media/__init__.py @@ -0,0 +1,14 @@ +"""Media processing module for pyagenity-api. + +Document extraction lives here (not in PyAgenity core) because +the core library stays lightweight — SDK users extract text themselves. 
+The API platform auto-extracts using textxtract. +""" + +from .extractor import DocumentExtractor +from .pipeline import DocumentPipeline + +__all__ = [ + "DocumentExtractor", + "DocumentPipeline", +] diff --git a/agentflow_cli/media/extractor.py b/agentflow_cli/media/extractor.py new file mode 100644 index 0000000..e390c1c --- /dev/null +++ b/agentflow_cli/media/extractor.py @@ -0,0 +1,77 @@ +"""Document extraction service using textxtract. + +Wraps ``AsyncTextExtractor`` from the textxtract library. +This module lives in pyagenity-api, NOT in the core PyAgenity library, +because extraction is an API-platform concern. +""" + +from __future__ import annotations + +import logging +from typing import Any + +logger = logging.getLogger("agentflow_cli.media.extractor") + +try: + from textxtract import AsyncTextExtractor + from textxtract.core.exceptions import ExtractionError, FileTypeNotSupportedError +except ImportError: # pragma: no cover + AsyncTextExtractor = None # type: ignore[assignment] + + class FileTypeNotSupportedError(Exception): # type: ignore[no-redef] + """Fallback exception when textxtract is unavailable.""" + + class ExtractionError(Exception): # type: ignore[no-redef] + """Fallback exception when textxtract is unavailable.""" + + +class DocumentExtractor: + """Wraps textxtract AsyncTextExtractor for API-side document extraction. + + Examples:: + + extractor = DocumentExtractor() + text = await extractor.extract(pdf_bytes, "report.pdf") + """ + + def __init__(self, extractor: Any | None = None): + if extractor is not None: + self.extractor = extractor + return + + if AsyncTextExtractor is None: + raise ImportError( + "textxtract is required for document extraction. " + "Install with `pip install textxtract[pdf,docx,html,xml,md]`" + ) + + self.extractor = AsyncTextExtractor() + + async def extract(self, data: bytes | str, filename: str | None = None) -> str | None: + """Extract text from bytes or a local path. + + Args: + data: Raw bytes or local path. 
+ filename: Required when passing bytes so type can be detected. + + Returns: + Extracted text, or ``None`` when file type is unsupported. + + Raises: + ValueError: For missing filename or failed extraction. + """ + if isinstance(data, bytes) and not filename: + raise ValueError("filename must be provided when extracting from bytes") + + try: + if isinstance(data, bytes): + assert filename is not None + return await self.extractor.extract(data, filename) + return await self.extractor.extract(data) + + except FileTypeNotSupportedError: + logger.warning("Document type not supported for extraction: %s", filename) + return None + except ExtractionError as exc: + logger.exception("Document extraction failed for %s", filename) + raise ValueError("Failed to extract text from document") from exc diff --git a/agentflow_cli/media/pipeline.py b/agentflow_cli/media/pipeline.py new file mode 100644 index 0000000..3bcf01c --- /dev/null +++ b/agentflow_cli/media/pipeline.py @@ -0,0 +1,82 @@ +"""Document processing pipeline for the API layer. + +Orchestrates: receive uploaded file -> extract text via DocumentExtractor +-> return TextBlock or DocumentBlock depending on config. +""" + +from __future__ import annotations + +import logging +from base64 import b64decode +from typing import Any + +from agentflow.media.config import DocumentHandling +from agentflow.state.message_block import DocumentBlock, TextBlock + +from .extractor import DocumentExtractor + +logger = logging.getLogger("agentflow_cli.media.pipeline") + + +class DocumentPipeline: + """Document processing pipeline with extraction config. + + Modes: + - ``EXTRACT_TEXT``: Extract text via textxtract, return TextBlock. + - ``PASS_RAW``: Return the original DocumentBlock untouched. + - ``SKIP``: Drop the document entirely (returns None). 
+ """ + + def __init__( + self, + document_extractor: DocumentExtractor | None = None, + handling: DocumentHandling = DocumentHandling.EXTRACT_TEXT, + ): + self.handling = handling + # Lazy-init extractor only when actually needed (EXTRACT_TEXT mode) + self._extractor = document_extractor + + @property + def extractor(self) -> DocumentExtractor: + if self._extractor is None: + self._extractor = DocumentExtractor() + return self._extractor + + async def process_document(self, document_block: Any) -> Any | None: + """Process a DocumentBlock according to the handling policy. + + Args: + document_block: A ``DocumentBlock`` instance. + + Returns: + A ``TextBlock`` (if extracted), the original ``DocumentBlock`` + (if pass-raw or extraction unsupported), or ``None`` (if skip). + """ + if self.handling == DocumentHandling.SKIP: + return None + + if self.handling == DocumentHandling.PASS_RAW: + return document_block + + # EXTRACT_TEXT path + if not isinstance(document_block, DocumentBlock): + raise ValueError("Expected DocumentBlock in pipeline") + + # If an excerpt already exists, return it as text + if document_block.excerpt and document_block.excerpt.strip(): + return TextBlock(text=document_block.excerpt) + + media = document_block.media + + if media.kind == "data" and media.data_base64: + decoded = b64decode(media.data_base64) + filename = media.filename or "document.pdf" + extracted = await self.extractor.extract(decoded, filename) + if extracted: + return TextBlock(text=extracted) + # Extraction returned None (unsupported type) — keep raw + return document_block + + # For external URLs or provider file_id, we cannot extract directly. + # Keep as-is so API layer may resolve later. 
+ return document_block diff --git a/agentflow_cli/src/app/core/config/media_settings.py b/agentflow_cli/src/app/core/config/media_settings.py new file mode 100644 index 0000000..2c7868b --- /dev/null +++ b/agentflow_cli/src/app/core/config/media_settings.py @@ -0,0 +1,40 @@ +"""Media / multimodal configuration loaded from environment variables.""" + +from __future__ import annotations + +import logging +from enum import Enum +from functools import lru_cache + +from pydantic_settings import BaseSettings + +logger = logging.getLogger("agentflow-cli.media") + + +class MediaStorageType(str, Enum): + MEMORY = "memory" + LOCAL = "local" + CLOUD = "cloud" + PG = "pg" + + +class MediaSettings(BaseSettings): + """Settings loaded from env vars (prefix-free, matches plan).""" + + MEDIA_STORAGE_TYPE: MediaStorageType = MediaStorageType.LOCAL + MEDIA_STORAGE_PATH: str = "./uploads" + MEDIA_MAX_SIZE_MB: float = 25.0 + DOCUMENT_HANDLING: str = "extract_text" # extract_text | pass_raw | skip + + # Cloud storage (S3/GCS) — only used when MEDIA_STORAGE_TYPE=cloud + MEDIA_CLOUD_PROVIDER: str = "aws" # aws | gcp + MEDIA_CLOUD_BUCKET: str = "" + MEDIA_CLOUD_REGION: str = "us-east-1" + + class Config: + extra = "allow" + + +@lru_cache +def get_media_settings() -> MediaSettings: + return MediaSettings() # type: ignore[call-arg] diff --git a/agentflow_cli/src/app/loader.py b/agentflow_cli/src/app/loader.py index 1fbfdb7..fbf888d 100644 --- a/agentflow_cli/src/app/loader.py +++ b/agentflow_cli/src/app/loader.py @@ -336,6 +336,18 @@ async def attach_all_modules( authorization_path = config.authorization_path load_and_bind_authorization(container, authorization_path) + # --- Media service wiring --- + from agentflow_cli.src.app.core.config.media_settings import ( + MediaSettings, + get_media_settings, + ) + from agentflow_cli.src.app.routers.media import MediaService + + media_settings = get_media_settings() + container.bind_instance(MediaSettings, media_settings) + media_service = 
MediaService(settings=media_settings) + container.bind_instance(MediaService, media_service) + logger.info("Container loaded successfully") logger.debug(f"Container dependency graph: {container.get_dependency_graph()}") diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index 5e1a15a..62d4b96 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -20,6 +20,9 @@ GraphSchema, GraphSetupSchema, ) +from agentflow_cli.src.app.routers.graph.services.multimodal_preprocessor import ( + preprocess_multimodal_messages, +) from agentflow_cli.src.app.utils import DummyThreadNameGenerator, ThreadNameGenerator diff --git a/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py b/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py new file mode 100644 index 0000000..d0242d3 --- /dev/null +++ b/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py @@ -0,0 +1,65 @@ +"""Pre-processing utilities for multimodal messages at the API boundary.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from agentflow.state import Message +from agentflow.state.message_block import DocumentBlock, ImageBlock, TextBlock + +if TYPE_CHECKING: + from agentflow_cli.src.app.routers.media import MediaService + +logger = logging.getLogger("agentflow-cli.media") + + +async def preprocess_multimodal_messages( + messages: list[Message], + media_service: "MediaService | None", +) -> list[Message]: + """Resolve file_id references in messages before graph execution. + + For each ``DocumentBlock`` with a ``file_id``: + - If cached extracted text exists, replace with a ``TextBlock``. + - Otherwise pass through unchanged (the agent converter will handle it). 
+ + For each ``ImageBlock``/``AudioBlock`` with a ``file_id``: + - Convert to a ``agentflow://media/{file_id}`` URL-based reference so + the MediaRefResolver can pick it up at LLM-call time. + + This is a no-op when ``media_service`` is None. + """ + if media_service is None: + return messages + + processed: list[Message] = [] + for msg in messages: + new_content = [] + changed = False + + for block in msg.content: + if isinstance(block, DocumentBlock) and block.media.kind == "file_id" and block.media.file_id: + cached = media_service.get_cached_extraction(block.media.file_id) + if cached: + new_content.append(TextBlock(text=cached)) + changed = True + continue + + if hasattr(block, "media") and block.media.kind == "file_id" and block.media.file_id: + fid = block.media.file_id + # Convert file_id → agentflow://media/ URL reference + if not (block.media.url and block.media.url.startswith("agentflow://media/")): + block.media.kind = "url" + block.media.url = f"agentflow://media/{fid}" + changed = True + + new_content.append(block) + + if changed: + new_msg = msg.model_copy(update={"content": new_content}) + processed.append(new_msg) + else: + processed.append(msg) + + return processed diff --git a/agentflow_cli/src/app/routers/media/__init__.py b/agentflow_cli/src/app/routers/media/__init__.py new file mode 100644 index 0000000..fe66aab --- /dev/null +++ b/agentflow_cli/src/app/routers/media/__init__.py @@ -0,0 +1,180 @@ +"""Media service — provides the configured MediaStore and DocumentPipeline.""" + +from __future__ import annotations + +import logging +from typing import Any + +from agentflow.media.config import DocumentHandling +from agentflow.media.storage.base import BaseMediaStore +from injectq import inject, singleton + +from agentflow_cli.media.extractor import DocumentExtractor +from agentflow_cli.media.pipeline import DocumentPipeline +from agentflow_cli.src.app.core.config.media_settings import MediaSettings, MediaStorageType + +logger = 
logging.getLogger("agentflow-cli.media") + + +def _create_media_store(settings: MediaSettings) -> BaseMediaStore: + """Factory that instantiates the right MediaStore from settings.""" + stype = settings.MEDIA_STORAGE_TYPE + + if stype == MediaStorageType.MEMORY: + from agentflow.media.storage.memory_store import InMemoryMediaStore + + return InMemoryMediaStore() + + if stype == MediaStorageType.LOCAL: + from agentflow.media.storage.local_store import LocalFileMediaStore + + return LocalFileMediaStore(base_dir=settings.MEDIA_STORAGE_PATH) + + if stype == MediaStorageType.CLOUD: + from agentflow.media.storage.cloud_store import CloudMediaStore + + return CloudMediaStore( + provider=settings.MEDIA_CLOUD_PROVIDER, + bucket=settings.MEDIA_CLOUD_BUCKET, + region=settings.MEDIA_CLOUD_REGION, + ) + + if stype == MediaStorageType.PG: + from agentflow.media.storage.pg_store import PgBlobStore + + return PgBlobStore() + + raise ValueError(f"Unknown MEDIA_STORAGE_TYPE: {stype}") + + +def _create_document_pipeline(settings: MediaSettings) -> DocumentPipeline: + handling_map = { + "extract_text": DocumentHandling.EXTRACT_TEXT, + "pass_raw": DocumentHandling.PASS_RAW, + "skip": DocumentHandling.SKIP, + } + handling = handling_map.get(settings.DOCUMENT_HANDLING, DocumentHandling.EXTRACT_TEXT) + + extractor = DocumentExtractor() if handling == DocumentHandling.EXTRACT_TEXT else None + return DocumentPipeline(document_extractor=extractor, handling=handling) + + +@singleton +class MediaService: + """Central media service providing storage + document extraction.""" + + @inject + def __init__(self, settings: MediaSettings | None = None): + from agentflow_cli.src.app.core.config.media_settings import get_media_settings + + self._settings = settings or get_media_settings() + self._store: BaseMediaStore | None = None + self._pipeline: DocumentPipeline | None = None + self._extraction_cache: dict[str, str] = {} # file_id -> extracted text + + @property + def store(self) -> BaseMediaStore: 
+ if self._store is None: + self._store = _create_media_store(self._settings) + return self._store + + @property + def pipeline(self) -> DocumentPipeline: + if self._pipeline is None: + self._pipeline = _create_document_pipeline(self._settings) + return self._pipeline + + @property + def max_size_bytes(self) -> int: + return int(self._settings.MEDIA_MAX_SIZE_MB * 1024 * 1024) + + # ------------------------------------------------------------------ + # File operations + # ------------------------------------------------------------------ + + async def upload_file( + self, + data: bytes, + filename: str, + mime_type: str, + ) -> dict[str, Any]: + """Upload a file: store binary + optionally extract text. + + Returns a dict with file_id, mime_type, size_bytes, filename, + extracted_text (nullable), and url. + """ + if len(data) > self.max_size_bytes: + raise ValueError( + f"File size {len(data)} exceeds maximum " + f"{self._settings.MEDIA_MAX_SIZE_MB}MB" + ) + + # Store binary in the configured MediaStore + storage_key = await self.store.store( + data, + mime_type, + metadata={"filename": filename}, + ) + + result: dict[str, Any] = { + "file_id": storage_key, + "mime_type": mime_type, + "size_bytes": len(data), + "filename": filename, + "extracted_text": None, + "url": f"/v1/files/{storage_key}", + } + + # If it's a document, try extraction + if self._is_extractable(mime_type): + try: + text = await self.pipeline.extractor.extract(data, filename) + if text: + result["extracted_text"] = text + self._extraction_cache[storage_key] = text + except Exception as exc: + logger.warning("Extraction failed for %s: %s", filename, exc) + + return result + + async def get_file(self, file_id: str) -> tuple[bytes, str]: + """Retrieve file bytes and MIME type.""" + return await self.store.retrieve(file_id) + + async def get_file_info(self, file_id: str) -> dict[str, Any]: + """Return metadata about a stored file.""" + exists = await self.store.exists(file_id) + if not exists: + raise 
KeyError(f"File not found: {file_id}") + + data, mime_type = await self.store.retrieve(file_id) + return { + "file_id": file_id, + "mime_type": mime_type, + "size_bytes": len(data), + "extracted_text": self._extraction_cache.get(file_id), + } + + def get_cached_extraction(self, file_id: str) -> str | None: + """Return cached extracted text for a file_id.""" + return self._extraction_cache.get(file_id) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + @staticmethod + def _is_extractable(mime_type: str) -> bool: + extractable = { + "application/pdf", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/msword", + "text/html", + "text/xml", + "application/xml", + "text/markdown", + "text/csv", + "application/json", + "text/plain", + } + return mime_type.lower() in extractable diff --git a/agentflow_cli/src/app/routers/media/router.py b/agentflow_cli/src/app/routers/media/router.py new file mode 100644 index 0000000..a071a4d --- /dev/null +++ b/agentflow_cli/src/app/routers/media/router.py @@ -0,0 +1,127 @@ +"""File upload, retrieval, and multimodal config endpoints.""" + +from __future__ import annotations + +import mimetypes +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile +from fastapi.logger import logger +from fastapi.responses import Response +from injectq.integrations import InjectAPI + +from agentflow_cli.src.app.core.auth.permissions import RequirePermission +from agentflow_cli.src.app.core.config.media_settings import MediaSettings, get_media_settings +from agentflow_cli.src.app.routers.media import MediaService +from agentflow_cli.src.app.routers.media.schemas import ( + FileInfoResponse, + FileUploadResponse, + MultimodalConfigResponse, +) +from agentflow_cli.src.app.utils import success_response + +router = APIRouter(tags=["Files"]) + + +# 
------------------------------------------------------------------ +# 4.1 POST /v1/files/upload +# ------------------------------------------------------------------ +@router.post( + "/v1/files/upload", + summary="Upload a file (image, audio, document)", + response_model=FileUploadResponse, + description=( + "Accepts a multipart file upload. Stores binary in the configured " + "MediaStore and optionally extracts text for documents." + ), +) +async def upload_file( + request: Request, + file: UploadFile, + service: MediaService = InjectAPI(MediaService), + user: dict[str, Any] = Depends(RequirePermission("files", "upload")), +): + if file.filename is None: + raise HTTPException(status_code=400, detail="filename is required") + + data = await file.read() + if not data: + raise HTTPException(status_code=400, detail="Empty file") + + mime = file.content_type or mimetypes.guess_type(file.filename)[0] or "application/octet-stream" + + logger.info("File upload: %s (%s, %d bytes)", file.filename, mime, len(data)) + + try: + result = await service.upload_file(data, file.filename, mime) + except ValueError as exc: + raise HTTPException(status_code=413, detail=str(exc)) + + return success_response(FileUploadResponse(**result), request) + + +# ------------------------------------------------------------------ +# 4.2 GET /v1/files/{file_id} — raw binary download +# ------------------------------------------------------------------ +@router.get( + "/v1/files/{file_id}", + summary="Retrieve file binary", + description="Returns the raw binary with the correct Content-Type.", +) +async def get_file( + file_id: str, + service: MediaService = InjectAPI(MediaService), + user: dict[str, Any] = Depends(RequirePermission("files", "read")), +): + try: + data, mime_type = await service.get_file(file_id) + except KeyError: + raise HTTPException(status_code=404, detail="File not found") + + return Response(content=data, media_type=mime_type) + + +# 
------------------------------------------------------------------ +# 4.2 GET /v1/files/{file_id}/info — metadata only +# ------------------------------------------------------------------ +@router.get( + "/v1/files/{file_id}/info", + summary="Retrieve file metadata", + response_model=FileInfoResponse, +) +async def get_file_info( + request: Request, + file_id: str, + service: MediaService = InjectAPI(MediaService), + user: dict[str, Any] = Depends(RequirePermission("files", "read")), +): + try: + info = await service.get_file_info(file_id) + except KeyError: + raise HTTPException(status_code=404, detail="File not found") + + return success_response(FileInfoResponse(**info), request) + + +# ------------------------------------------------------------------ +# 4.4 GET /v1/config/multimodal — read current multimodal config +# ------------------------------------------------------------------ +@router.get( + "/v1/config/multimodal", + summary="Get multimodal configuration", + response_model=MultimodalConfigResponse, +) +async def get_multimodal_config( + request: Request, + user: dict[str, Any] = Depends(RequirePermission("config", "read")), +): + settings = get_media_settings() + return success_response( + MultimodalConfigResponse( + media_storage_type=settings.MEDIA_STORAGE_TYPE.value, + media_storage_path=settings.MEDIA_STORAGE_PATH, + media_max_size_mb=settings.MEDIA_MAX_SIZE_MB, + document_handling=settings.DOCUMENT_HANDLING, + ), + request, + ) diff --git a/agentflow_cli/src/app/routers/media/schemas.py b/agentflow_cli/src/app/routers/media/schemas.py new file mode 100644 index 0000000..441f0dd --- /dev/null +++ b/agentflow_cli/src/app/routers/media/schemas.py @@ -0,0 +1,28 @@ +"""Schemas for the media / file upload endpoints.""" + +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class FileUploadResponse(BaseModel): + file_id: str = Field(..., description="Opaque storage key") + mime_type: str + size_bytes: int + filename: str + 
extracted_text: str | None = None + url: str = Field(..., description="Relative retrieval URL") + + +class FileInfoResponse(BaseModel): + file_id: str + mime_type: str + size_bytes: int + extracted_text: str | None = None + + +class MultimodalConfigResponse(BaseModel): + media_storage_type: str + media_storage_path: str + media_max_size_mb: float + document_handling: str diff --git a/agentflow_cli/src/app/routers/setup_router.py b/agentflow_cli/src/app/routers/setup_router.py index 62132c2..8f83a66 100644 --- a/agentflow_cli/src/app/routers/setup_router.py +++ b/agentflow_cli/src/app/routers/setup_router.py @@ -2,6 +2,7 @@ from .checkpointer.router import router as checkpointer_router from .graph import router as graph_router +from .media.router import router as media_router from .ping.router import router as ping_router from .store import router as store_router @@ -21,3 +22,4 @@ def init_routes(app: FastAPI): app.include_router(checkpointer_router) app.include_router(store_router) app.include_router(ping_router) + app.include_router(media_router) diff --git a/pyproject.toml b/pyproject.toml index 3a79a52..1b5538d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "10xscale-agentflow-cli" -version = "0.2.8" +version = "0.2.9" description = "CLI and API for 10xscale AgentFlow" readme = "README.md" license = {text = "MIT"} @@ -45,6 +45,7 @@ dependencies = [ "typer", "python-dotenv", "PyJWT", + "textxtract[pdf,docx,html,xml,md]>=0.2.0", ] [project.urls] @@ -54,10 +55,6 @@ Issues = "https://github.com/10xHub/agentflow-cli/issues" Documentation = "https://agentflow-cli.readthedocs.io/" [project.optional-dependencies] -a2a = [ - "a2a-sdk", - "httpx", -] sentry = [ "sentry-sdk>=2.10.0", ] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..81386ab --- /dev/null +++ b/requirements.txt @@ -0,0 +1,31 @@ +-e . 
+ +# Runtime dependencies (from pyproject.toml) +10xscale-agentflow>=0.5.0 +fastapi +gunicorn +orjson +python-multipart +pydantic +pydantic-settings +uvicorn +typer +python-dotenv +PyJWT + +# Dev dependencies (from dependency-groups.dev) +snowflakekit +pytest>=8.4.2 +pytest-asyncio>=1.2.0 +pytest-cov>=7.0.0 +pytest-env>=1.1.5 +pytest-xdist>=3.8.0 +pre-commit>=3.8.0 +ruff==0.5.2 +mkdocs-gen-files==0.5.0 +mkdocstrings==0.25.2 +mypy-extensions==1.0.0 +httpx==0.27.0 +lib==4.0.0 +markdown-it-py==3.0.0 +requests==2.32.3 \ No newline at end of file diff --git a/tests/test_multimodal_sprint2_extraction.py b/tests/test_multimodal_sprint2_extraction.py new file mode 100644 index 0000000..6fb702f --- /dev/null +++ b/tests/test_multimodal_sprint2_extraction.py @@ -0,0 +1,214 @@ +"""Tests for Sprint 2 – DocumentExtractor & DocumentPipeline. + +These tests live in pyagenity-api because document extraction is an API concern +(not a core library concern). +""" + +import pytest + +from agentflow.media.config import DocumentHandling +from agentflow.state.message_block import DocumentBlock, MediaRef, TextBlock + +from agentflow_cli.media.extractor import ( + DocumentExtractor, + ExtractionError, + FileTypeNotSupportedError, +) +from agentflow_cli.media.pipeline import DocumentPipeline + + +# --------------------------------------------------------------------------- +# Fake textxtract extractor for testing +# --------------------------------------------------------------------------- + + +class FakeExtractor: + """Simulates textxtract.AsyncTextExtractor behavior.""" + + async def extract(self, data, filename=None): + if filename == "unsupported.xyz": + raise FileTypeNotSupportedError("unsupported file type") + if data == b"corrupt": + raise ExtractionError("extraction failed") + if isinstance(data, bytes): + return f"extracted:{len(data)}bytes" + return f"extracted:path={data}" + + +# --------------------------------------------------------------------------- +# DocumentExtractor 
tests +# --------------------------------------------------------------------------- + + +class TestDocumentExtractor: + @pytest.mark.asyncio + async def test_extract_bytes_success(self): + ext = DocumentExtractor(extractor=FakeExtractor()) + result = await ext.extract(b"hello", "doc.pdf") + assert result == "extracted:5bytes" + + @pytest.mark.asyncio + async def test_extract_path_success(self): + ext = DocumentExtractor(extractor=FakeExtractor()) + result = await ext.extract("/tmp/report.pdf") + assert result == "extracted:path=/tmp/report.pdf" + + @pytest.mark.asyncio + async def test_extract_bytes_no_filename_raises(self): + ext = DocumentExtractor(extractor=FakeExtractor()) + with pytest.raises(ValueError, match="filename must be provided"): + await ext.extract(b"hello") + + @pytest.mark.asyncio + async def test_unsupported_type_returns_none(self): + ext = DocumentExtractor(extractor=FakeExtractor()) + result = await ext.extract(b"abc", "unsupported.xyz") + assert result is None + + @pytest.mark.asyncio + async def test_extraction_error_raises_value_error(self): + ext = DocumentExtractor(extractor=FakeExtractor()) + with pytest.raises(ValueError, match="Failed to extract"): + await ext.extract(b"corrupt", "doc.pdf") + + def test_no_textxtract_raises_import_error(self, monkeypatch): + """If textxtract is not installed, instantiation raises ImportError.""" + import agentflow_cli.media.extractor as mod + + monkeypatch.setattr(mod, "AsyncTextExtractor", None) + with pytest.raises(ImportError, match="textxtract is required"): + DocumentExtractor() + + def test_custom_extractor_injection(self): + """Custom extractor can be injected for testing.""" + ext = DocumentExtractor(extractor=FakeExtractor()) + assert ext.extractor is not None + + +# --------------------------------------------------------------------------- +# DocumentPipeline tests +# --------------------------------------------------------------------------- + + +class TestDocumentPipeline: + 
@pytest.mark.asyncio + async def test_skip_returns_none(self): + pipeline = DocumentPipeline( + document_extractor=DocumentExtractor(extractor=FakeExtractor()), + handling=DocumentHandling.SKIP, + ) + block = DocumentBlock(media=MediaRef(kind="url", url="https://example.com/doc.pdf")) + result = await pipeline.process_document(block) + assert result is None + + @pytest.mark.asyncio + async def test_pass_raw_returns_original(self): + pipeline = DocumentPipeline( + document_extractor=DocumentExtractor(extractor=FakeExtractor()), + handling=DocumentHandling.PASS_RAW, + ) + block = DocumentBlock(media=MediaRef(kind="url", url="https://example.com/doc.pdf")) + result = await pipeline.process_document(block) + assert result is block + + @pytest.mark.asyncio + async def test_extract_text_from_base64(self): + pipeline = DocumentPipeline( + document_extractor=DocumentExtractor(extractor=FakeExtractor()), + handling=DocumentHandling.EXTRACT_TEXT, + ) + block = DocumentBlock( + media=MediaRef( + kind="data", + data_base64="YWJj", # base64 of b"abc" + mime_type="application/pdf", + filename="report.pdf", + ) + ) + result = await pipeline.process_document(block) + assert isinstance(result, TextBlock) + assert "extracted:" in result.text + + @pytest.mark.asyncio + async def test_extract_text_uses_excerpt_if_present(self): + pipeline = DocumentPipeline( + document_extractor=DocumentExtractor(extractor=FakeExtractor()), + handling=DocumentHandling.EXTRACT_TEXT, + ) + block = DocumentBlock( + media=MediaRef(kind="url", url="https://example.com/doc.pdf"), + excerpt="Already extracted text", + ) + result = await pipeline.process_document(block) + assert isinstance(result, TextBlock) + assert result.text == "Already extracted text" + + @pytest.mark.asyncio + async def test_extract_text_url_ref_pass_through(self): + """URL-referenced docs can't be extracted locally; return as-is.""" + pipeline = DocumentPipeline( + document_extractor=DocumentExtractor(extractor=FakeExtractor()), + 
handling=DocumentHandling.EXTRACT_TEXT, + ) + block = DocumentBlock(media=MediaRef(kind="url", url="https://example.com/doc.pdf")) + result = await pipeline.process_document(block) + assert result is block # Can't extract from URL => pass through + + @pytest.mark.asyncio + async def test_extract_text_unsupported_falls_back(self): + """When extraction returns None (unsupported), keep raw block.""" + + class UnsupportedExtractor: + async def extract(self, data, filename=None): + raise FileTypeNotSupportedError("nope") + + pipeline = DocumentPipeline( + document_extractor=DocumentExtractor(extractor=UnsupportedExtractor()), + handling=DocumentHandling.EXTRACT_TEXT, + ) + block = DocumentBlock( + media=MediaRef( + kind="data", + data_base64="YWJj", + mime_type="application/octet-stream", + filename="weird.dat", + ) + ) + result = await pipeline.process_document(block) + # Extractor returns None for unsupported => DocumentExtractor returns None => pipeline returns raw block + assert result is block + + @pytest.mark.asyncio + async def test_extract_rejects_non_document_block(self): + pipeline = DocumentPipeline( + document_extractor=DocumentExtractor(extractor=FakeExtractor()), + handling=DocumentHandling.EXTRACT_TEXT, + ) + with pytest.raises(ValueError, match="Expected DocumentBlock"): + await pipeline.process_document("not a block") + + @pytest.mark.asyncio + async def test_extract_uses_default_filename(self): + """When filename is missing, defaults to 'document.pdf'.""" + pipeline = DocumentPipeline( + document_extractor=DocumentExtractor(extractor=FakeExtractor()), + handling=DocumentHandling.EXTRACT_TEXT, + ) + block = DocumentBlock( + media=MediaRef(kind="data", data_base64="YWJj", mime_type="application/pdf") + ) + result = await pipeline.process_document(block) + assert isinstance(result, TextBlock) + + @pytest.mark.asyncio + async def test_lazy_extractor_init(self): + """Extractor is lazily initialized when not provided.""" + # This would raise ImportError if 
textxtract isn't installed, + # but we test the lazy property pattern + pipeline = DocumentPipeline(handling=DocumentHandling.SKIP) + assert pipeline._extractor is None + # Skip mode doesn't need extractor + result = await pipeline.process_document( + DocumentBlock(media=MediaRef(kind="url", url="https://x.com/a.pdf")) + ) + assert result is None diff --git a/uv.lock b/uv.lock index 1ad3a37..82143f0 100644 --- a/uv.lock +++ b/uv.lock @@ -22,7 +22,7 @@ wheels = [ [[package]] name = "10xscale-agentflow-cli" -version = "0.2.6" +version = "0.2.9" source = { editable = "." } dependencies = [ { name = "10xscale-agentflow" }, From fa563b6996edc0311f6ffe14723e9402774a793f Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Wed, 1 Apr 2026 00:27:42 +0600 Subject: [PATCH 2/4] refactor: improve formatting of error message in MediaService for file size validation --- .../app/routers/graph/services/multimodal_preprocessor.py | 6 +++++- agentflow_cli/src/app/routers/media/__init__.py | 3 +-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py b/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py index d0242d3..abef1c3 100644 --- a/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py +++ b/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py @@ -39,7 +39,11 @@ async def preprocess_multimodal_messages( changed = False for block in msg.content: - if isinstance(block, DocumentBlock) and block.media.kind == "file_id" and block.media.file_id: + if ( + isinstance(block, DocumentBlock) + and block.media.kind == "file_id" + and block.media.file_id + ): cached = media_service.get_cached_extraction(block.media.file_id) if cached: new_content.append(TextBlock(text=cached)) diff --git a/agentflow_cli/src/app/routers/media/__init__.py b/agentflow_cli/src/app/routers/media/__init__.py index fe66aab..39c746f 100644 --- 
a/agentflow_cli/src/app/routers/media/__init__.py +++ b/agentflow_cli/src/app/routers/media/__init__.py @@ -105,8 +105,7 @@ async def upload_file( """ if len(data) > self.max_size_bytes: raise ValueError( - f"File size {len(data)} exceeds maximum " - f"{self._settings.MEDIA_MAX_SIZE_MB}MB" + f"File size {len(data)} exceeds maximum {self._settings.MEDIA_MAX_SIZE_MB}MB" ) # Store binary in the configured MediaStore From de676088a209b7ed5c3211e99341a7184a9359aa Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Wed, 1 Apr 2026 01:07:35 +0600 Subject: [PATCH 3/4] feat: add media service integration and multimodal preprocessing tests --- .../routers/graph/services/graph_service.py | 22 +- tests/test_sprint4_media_api.py | 294 ++++++++++++++++++ 2 files changed, 315 insertions(+), 1 deletion(-) create mode 100644 tests/test_sprint4_media_api.py diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index 62d4b96..08405d0 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -55,6 +55,21 @@ def __init__( self.checkpointer = checkpointer self.thread_name_generator = thread_name_generator + # Lazy import to avoid circular dependency + self._media_service = None + + @property + def media_service(self): + if self._media_service is None: + try: + container = InjectQ.get_instance() + from agentflow_cli.src.app.routers.media import MediaService + + self._media_service = container.try_get(MediaService) + except Exception: + self._media_service = None + return self._media_service + async def _save_thread_name( self, config: dict[str, Any], @@ -185,8 +200,13 @@ async def _prepare_input( config["recursion_limit"] = graph_input.recursion_limit or 25 # Prepare the input for the graph + # Preprocess multimodal messages (resolve file_id → cached text, etc.) 
+ preprocessed = await preprocess_multimodal_messages( + graph_input.messages, + self.media_service, + ) input_data: dict = { - "messages": graph_input.messages, + "messages": preprocessed, } if graph_input.initial_state: input_data["state"] = graph_input.initial_state diff --git a/tests/test_sprint4_media_api.py b/tests/test_sprint4_media_api.py new file mode 100644 index 0000000..2ea833b --- /dev/null +++ b/tests/test_sprint4_media_api.py @@ -0,0 +1,294 @@ +"""Sprint 4 — API layer multimodal tests. + +Tests the file upload/retrieval endpoints, MediaService, multimodal +preprocessing, and config endpoint. +""" + +from __future__ import annotations + +import base64 +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +# --------------------------------------------------------------------------- +# MediaService unit tests +# --------------------------------------------------------------------------- +from agentflow_cli.src.app.core.config.media_settings import MediaSettings, MediaStorageType + + +def _make_settings(**overrides) -> MediaSettings: + defaults = { + "MEDIA_STORAGE_TYPE": MediaStorageType.MEMORY, + "MEDIA_STORAGE_PATH": "./test_uploads", + "MEDIA_MAX_SIZE_MB": 1.0, + "DOCUMENT_HANDLING": "extract_text", + } + defaults.update(overrides) + return MediaSettings(**defaults) + + +class TestMediaService: + """Unit tests for MediaService.""" + + def _make_service(self, **kwargs): + from agentflow_cli.src.app.routers.media import MediaService + + return MediaService(settings=_make_settings(**kwargs)) + + @pytest.mark.asyncio + async def test_upload_file_stores_and_returns_metadata(self): + svc = self._make_service() + result = await svc.upload_file(b"hello world", "test.txt", "text/plain") + assert result["filename"] == "test.txt" + assert result["mime_type"] == "text/plain" + assert result["size_bytes"] == 11 + assert result["file_id"] + assert result["url"].startswith("/v1/files/") + + @pytest.mark.asyncio + async def 
test_upload_file_exceeds_max_size(self): + svc = self._make_service(MEDIA_MAX_SIZE_MB=0.0001) # ~100 bytes + with pytest.raises(ValueError, match="exceeds maximum"): + await svc.upload_file(b"x" * 200, "big.bin", "application/octet-stream") + + @pytest.mark.asyncio + async def test_upload_file_extracts_text_for_documents(self): + svc = self._make_service() + with patch.object( + svc.pipeline.extractor, + "extract", + new_callable=AsyncMock, + return_value="Extracted text!", + ): + result = await svc.upload_file(b"%PDF-1.4 ...", "doc.pdf", "application/pdf") + assert result["extracted_text"] == "Extracted text!" + + @pytest.mark.asyncio + async def test_upload_no_extraction_for_images(self): + svc = self._make_service() + result = await svc.upload_file(b"\xff\xd8\xff\xe0", "img.jpg", "image/jpeg") + assert result["extracted_text"] is None + + @pytest.mark.asyncio + async def test_get_file_round_trip(self): + svc = self._make_service() + result = await svc.upload_file(b"binary data", "test.bin", "application/octet-stream") + data, mime = await svc.get_file(result["file_id"]) + assert data == b"binary data" + assert mime == "application/octet-stream" + + @pytest.mark.asyncio + async def test_get_file_info(self): + svc = self._make_service() + result = await svc.upload_file(b"abc", "small.txt", "text/plain") + info = await svc.get_file_info(result["file_id"]) + assert info["mime_type"] == "text/plain" + assert info["size_bytes"] == 3 + + @pytest.mark.asyncio + async def test_get_file_not_found(self): + svc = self._make_service() + with pytest.raises(KeyError): + await svc.get_file("nonexistent-key") + + @pytest.mark.asyncio + async def test_get_file_info_not_found(self): + svc = self._make_service() + with pytest.raises(KeyError): + await svc.get_file_info("nonexistent-key") + + @pytest.mark.asyncio + async def test_cached_extraction_lookup(self): + svc = self._make_service() + with patch.object( + svc.pipeline.extractor, "extract", new_callable=AsyncMock, 
return_value="Cached!" + ): + result = await svc.upload_file(b"%PDF data", "report.pdf", "application/pdf") + assert svc.get_cached_extraction(result["file_id"]) == "Cached!" + assert svc.get_cached_extraction("missing") is None + + +# --------------------------------------------------------------------------- +# Multimodal preprocessor tests +# --------------------------------------------------------------------------- + + +class TestMultimodalPreprocessor: + """Tests for preprocess_multimodal_messages.""" + + @pytest.mark.asyncio + async def test_noop_when_no_media_service(self): + from agentflow.state import Message + + from agentflow_cli.src.app.routers.graph.services.multimodal_preprocessor import ( + preprocess_multimodal_messages, + ) + + msgs = [Message.text_message("hello")] + result = await preprocess_multimodal_messages(msgs, None) + assert result is msgs # same identity, no copy + + @pytest.mark.asyncio + async def test_document_file_id_resolved_to_text(self): + from agentflow.state import Message + from agentflow.state.message_block import DocumentBlock, MediaRef, TextBlock + + from agentflow_cli.src.app.routers.graph.services.multimodal_preprocessor import ( + preprocess_multimodal_messages, + ) + + doc_block = DocumentBlock( + media=MediaRef(kind="file_id", file_id="file-abc", mime_type="application/pdf"), + ) + msg = Message(role="user", content=[TextBlock(text="Read this"), doc_block]) + + mock_svc = MagicMock() + mock_svc.get_cached_extraction.return_value = "The extracted PDF text" + + result = await preprocess_multimodal_messages([msg], mock_svc) + # doc_block should be replaced with a TextBlock + assert len(result[0].content) == 2 + assert result[0].content[0].type == "text" + assert result[0].content[0].text == "Read this" + assert result[0].content[1].type == "text" + assert result[0].content[1].text == "The extracted PDF text" + + @pytest.mark.asyncio + async def test_image_file_id_to_agentflow_url(self): + from agentflow.state import 
Message + from agentflow.state.message_block import ImageBlock, MediaRef + + from agentflow_cli.src.app.routers.graph.services.multimodal_preprocessor import ( + preprocess_multimodal_messages, + ) + + img_block = ImageBlock( + media=MediaRef(kind="file_id", file_id="file-img-123", mime_type="image/png"), + ) + msg = Message(role="user", content=[img_block]) + + mock_svc = MagicMock() + mock_svc.get_cached_extraction.return_value = None + + result = await preprocess_multimodal_messages([msg], mock_svc) + media = result[0].content[0].media + assert media.kind == "url" + assert media.url == "agentflow://media/file-img-123" + + @pytest.mark.asyncio + async def test_text_only_message_unchanged(self): + from agentflow.state import Message + from agentflow.state.message_block import TextBlock + + from agentflow_cli.src.app.routers.graph.services.multimodal_preprocessor import ( + preprocess_multimodal_messages, + ) + + msg = Message(role="user", content=[TextBlock(text="hello")]) + mock_svc = MagicMock() + + result = await preprocess_multimodal_messages([msg], mock_svc) + assert result[0].content[0].text == "hello" + + @pytest.mark.asyncio + async def test_document_file_id_without_cached_text(self): + from agentflow.state import Message + from agentflow.state.message_block import DocumentBlock, MediaRef + + from agentflow_cli.src.app.routers.graph.services.multimodal_preprocessor import ( + preprocess_multimodal_messages, + ) + + doc = DocumentBlock( + media=MediaRef(kind="file_id", file_id="file-no-cache", mime_type="application/pdf"), + ) + msg = Message(role="user", content=[doc]) + + mock_svc = MagicMock() + mock_svc.get_cached_extraction.return_value = None + + result = await preprocess_multimodal_messages([msg], mock_svc) + # Should convert file_id → agentflow://media/ URL reference + media = result[0].content[0].media + assert media.kind == "url" + assert media.url == "agentflow://media/file-no-cache" + + +# 
--------------------------------------------------------------------------- +# MediaSettings tests +# --------------------------------------------------------------------------- + + +class TestMediaSettings: + def test_defaults(self): + s = MediaSettings() + assert s.MEDIA_STORAGE_TYPE == MediaStorageType.LOCAL + assert s.MEDIA_MAX_SIZE_MB == 25.0 + assert s.DOCUMENT_HANDLING == "extract_text" + + def test_memory_type(self): + s = MediaSettings(MEDIA_STORAGE_TYPE="memory") + assert s.MEDIA_STORAGE_TYPE == MediaStorageType.MEMORY + + +# --------------------------------------------------------------------------- +# Media store factory tests +# --------------------------------------------------------------------------- + + +class TestMediaStoreFactory: + def test_memory_store(self): + from agentflow.media.storage.memory_store import InMemoryMediaStore + from agentflow_cli.src.app.routers.media import _create_media_store + + s = _make_settings(MEDIA_STORAGE_TYPE=MediaStorageType.MEMORY) + store = _create_media_store(s) + assert isinstance(store, InMemoryMediaStore) + + def test_local_store(self): + from agentflow.media.storage.local_store import LocalFileMediaStore + from agentflow_cli.src.app.routers.media import _create_media_store + + s = _make_settings(MEDIA_STORAGE_TYPE=MediaStorageType.LOCAL) + store = _create_media_store(s) + assert isinstance(store, LocalFileMediaStore) + + def test_unknown_type_raises(self): + from agentflow_cli.src.app.routers.media import _create_media_store + + s = _make_settings() + s.MEDIA_STORAGE_TYPE = "bogus" + with pytest.raises(ValueError, match="Unknown"): + _create_media_store(s) + + +# --------------------------------------------------------------------------- +# Schemas tests +# --------------------------------------------------------------------------- + + +class TestSchemas: + def test_file_upload_response_schema(self): + from agentflow_cli.src.app.routers.media.schemas import FileUploadResponse + + r = FileUploadResponse( + 
file_id="abc", + mime_type="image/png", + size_bytes=1024, + filename="img.png", + url="/v1/files/abc", + ) + assert r.file_id == "abc" + assert r.extracted_text is None + + def test_multimodal_config_response_schema(self): + from agentflow_cli.src.app.routers.media.schemas import MultimodalConfigResponse + + r = MultimodalConfigResponse( + media_storage_type="local", + media_storage_path="./uploads", + media_max_size_mb=25.0, + document_handling="extract_text", + ) + assert r.media_storage_type == "local" From 563fd0cc58ef3d5b40a3cf21e2067df51f514b64 Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Fri, 3 Apr 2026 00:44:56 +0600 Subject: [PATCH 4/4] feat: enhance media service with direct URL retrieval and caching mechanisms --- agentflow_cli/cli/commands/api.py | 16 +- agentflow_cli/media/__init__.py | 5 + agentflow_cli/media/_compat.py | 23 ++ agentflow_cli/media/extractor.py | 4 +- agentflow_cli/media/pipeline.py | 6 +- .../src/app/core/config/media_settings.py | 10 + .../services/checkpointer_service.py | 11 +- .../graph/services/multimodal_preprocessor.py | 26 +- .../src/app/routers/media/__init__.py | 248 ++++++++++++++++-- agentflow_cli/src/app/routers/media/router.py | 29 +- .../src/app/routers/media/schemas.py | 12 + .../routers/store/services/store_service.py | 11 +- tests/test_sprint4_media_api.py | 48 +++- 13 files changed, 420 insertions(+), 29 deletions(-) create mode 100644 agentflow_cli/media/_compat.py diff --git a/agentflow_cli/cli/commands/api.py b/agentflow_cli/cli/commands/api.py index 29f42fe..2b3617e 100644 --- a/agentflow_cli/cli/commands/api.py +++ b/agentflow_cli/cli/commands/api.py @@ -1,5 +1,6 @@ """API server command implementation.""" +import ipaddress import os import socket import sys @@ -196,8 +197,15 @@ def _build_playground_url( return f"{playground_base_url}?{urlencode({'backendUrl': backend_url})}" def _normalize_browser_host(self, host: str) -> str: - if host in {"0.0.0.0", "::", "[::]", ""}: + if not host: return 
"127.0.0.1" - if host.startswith("[") and host.endswith("]"): - return host[1:-1] - return host + + normalized_host = host[1:-1] if host.startswith("[") and host.endswith("]") else host + + try: + if ipaddress.ip_address(normalized_host).is_unspecified: + return "127.0.0.1" + except ValueError: + pass + + return normalized_host diff --git a/agentflow_cli/media/__init__.py b/agentflow_cli/media/__init__.py index e714b72..72f099f 100644 --- a/agentflow_cli/media/__init__.py +++ b/agentflow_cli/media/__init__.py @@ -5,9 +5,14 @@ The API platform auto-extracts using textxtract. """ +from ._compat import ensure_document_handling_aliases from .extractor import DocumentExtractor from .pipeline import DocumentPipeline + +ensure_document_handling_aliases() + + __all__ = [ "DocumentExtractor", "DocumentPipeline", diff --git a/agentflow_cli/media/_compat.py b/agentflow_cli/media/_compat.py new file mode 100644 index 0000000..fd11ed9 --- /dev/null +++ b/agentflow_cli/media/_compat.py @@ -0,0 +1,23 @@ +"""Compatibility helpers for upstream AgentFlow media enums.""" + +from __future__ import annotations + +from agentflow.media.config import DocumentHandling + + +def ensure_document_handling_aliases() -> None: + """Expose stable aliases across AgentFlow enum renames.""" + try: + _ = DocumentHandling.PASS_RAW + except AttributeError: + DocumentHandling.PASS_RAW = DocumentHandling.FORWARD_RAW + + try: + _ = DocumentHandling.FORWARD_RAW + except AttributeError: + DocumentHandling.FORWARD_RAW = DocumentHandling.PASS_RAW + + +ensure_document_handling_aliases() + +DOCUMENT_PASS_RAW = DocumentHandling.PASS_RAW diff --git a/agentflow_cli/media/extractor.py b/agentflow_cli/media/extractor.py index e390c1c..69dbcc2 100644 --- a/agentflow_cli/media/extractor.py +++ b/agentflow_cli/media/extractor.py @@ -10,6 +10,7 @@ import logging from typing import Any + logger = logging.getLogger("agentflow_cli.media.extractor") try: @@ -65,7 +66,8 @@ async def extract(self, data: bytes | str, filename: 
str | None = None) -> str | try: if isinstance(data, bytes): - assert filename is not None + if filename is None: + raise ValueError("filename must be provided when extracting from bytes") return await self.extractor.extract(data, filename) return await self.extractor.extract(data) diff --git a/agentflow_cli/media/pipeline.py b/agentflow_cli/media/pipeline.py index 3bcf01c..1328127 100644 --- a/agentflow_cli/media/pipeline.py +++ b/agentflow_cli/media/pipeline.py @@ -13,8 +13,10 @@ from agentflow.media.config import DocumentHandling from agentflow.state.message_block import DocumentBlock, TextBlock +from ._compat import DOCUMENT_PASS_RAW from .extractor import DocumentExtractor + logger = logging.getLogger("agentflow_cli.media.pipeline") @@ -23,7 +25,7 @@ class DocumentPipeline: Modes: - ``EXTRACT_TEXT``: Extract text via textxtract, return TextBlock. - - ``PASS_RAW``: Return the original DocumentBlock untouched. + - ``PASS_RAW``/``FORWARD_RAW``: Return the original DocumentBlock untouched. - ``SKIP``: Drop the document entirely (returns None). 
""" @@ -55,7 +57,7 @@ async def process_document(self, document_block: Any) -> Any | None: if self.handling == DocumentHandling.SKIP: return None - if self.handling == DocumentHandling.PASS_RAW: + if self.handling == DOCUMENT_PASS_RAW: return document_block # EXTRACT_TEXT path diff --git a/agentflow_cli/src/app/core/config/media_settings.py b/agentflow_cli/src/app/core/config/media_settings.py index 2c7868b..d3590a2 100644 --- a/agentflow_cli/src/app/core/config/media_settings.py +++ b/agentflow_cli/src/app/core/config/media_settings.py @@ -8,6 +8,7 @@ from pydantic_settings import BaseSettings + logger = logging.getLogger("agentflow-cli.media") @@ -30,6 +31,15 @@ class MediaSettings(BaseSettings): MEDIA_CLOUD_PROVIDER: str = "aws" # aws | gcp MEDIA_CLOUD_BUCKET: str = "" MEDIA_CLOUD_REGION: str = "us-east-1" + MEDIA_CLOUD_PREFIX: str = "agentflow-media" + MEDIA_CLOUD_ACCESS_KEY_ID: str | None = None + MEDIA_CLOUD_SECRET_ACCESS_KEY: str | None = None + MEDIA_CLOUD_SESSION_TOKEN: str | None = None + MEDIA_CLOUD_PROJECT_ID: str | None = None + MEDIA_CLOUD_CREDENTIALS_JSON: str | None = None + + MEDIA_SIGNED_URL_TTL_SECONDS: int = 3600 + MEDIA_SIGNED_URL_REFRESH_BUFFER_SECONDS: int = 60 class Config: extra = "allow" diff --git a/agentflow_cli/src/app/routers/checkpointer/services/checkpointer_service.py b/agentflow_cli/src/app/routers/checkpointer/services/checkpointer_service.py index 03c374f..88335d5 100644 --- a/agentflow_cli/src/app/routers/checkpointer/services/checkpointer_service.py +++ b/agentflow_cli/src/app/routers/checkpointer/services/checkpointer_service.py @@ -18,6 +18,15 @@ from agentflow_cli.src.app.utils.parse_output import parse_state_output +class CheckpointerUnavailableError(HTTPException, ValueError): + """Raised when the checkpointer service has not been configured.""" + + def __init__(self) -> None: + detail = "Checkpointer is not configured" + HTTPException.__init__(self, status_code=503, detail=detail) + ValueError.__init__(self, detail) + + 
@singleton
 class CheckpointerService:
     @inject
@@ -27,7 +36,7 @@ def __init__(self, checkpointer: BaseCheckpointer):
 
     def _config(self, config: dict[str, Any] | None, user: dict) -> dict[str, Any]:
         if not self.checkpointer:
-            raise HTTPException(status_code=503, detail="Checkpointer service is not available")
+            raise CheckpointerUnavailableError()
 
         cfg: dict[str, Any] = dict(config or {})
         cfg["user"] = user
diff --git a/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py b/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py
index abef1c3..079e4ed 100644
--- a/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py
+++ b/agentflow_cli/src/app/routers/graph/services/multimodal_preprocessor.py
@@ -2,11 +2,13 @@
 
 from __future__ import annotations
 
+import inspect
 import logging
 from typing import TYPE_CHECKING
 
 from agentflow.state import Message
-from agentflow.state.message_block import DocumentBlock, ImageBlock, TextBlock
+from agentflow.state.message_block import DocumentBlock, TextBlock
+
 
 if TYPE_CHECKING:
     from agentflow_cli.src.app.routers.media import MediaService
@@ -14,9 +16,27 @@
 logger = logging.getLogger("agentflow-cli.media")
 
 
+async def _get_cached_extraction(media_service: MediaService, file_id: str) -> str | None:
+    """Read cached extraction from async or sync service APIs."""
+    async_getter = getattr(media_service, "aget_cached_extraction", None)
+    if callable(async_getter):
+        result = async_getter(file_id)
+        if inspect.isawaitable(result):
+            result = await result
+        # NOTE(review): return the async API's answer directly; the original
+        # fall-through re-invoked the sync getter even after the async call.
+        return result
+
+    sync_getter = getattr(media_service, "get_cached_extraction", None)
+    if callable(sync_getter):
+        return sync_getter(file_id)
+
+    return None
+
+
 async def preprocess_multimodal_messages(
     messages: list[Message],
-    media_service: "MediaService | None",
+    media_service: MediaService | None,
 ) -> list[Message]:
     """Resolve file_id references in messages before graph execution.
@@ -44,7 +64,7 @@ async def preprocess_multimodal_messages( and block.media.kind == "file_id" and block.media.file_id ): - cached = media_service.get_cached_extraction(block.media.file_id) + cached = await _get_cached_extraction(media_service, block.media.file_id) if cached: new_content.append(TextBlock(text=cached)) changed = True diff --git a/agentflow_cli/src/app/routers/media/__init__.py b/agentflow_cli/src/app/routers/media/__init__.py index 39c746f..917c473 100644 --- a/agentflow_cli/src/app/routers/media/__init__.py +++ b/agentflow_cli/src/app/routers/media/__init__.py @@ -2,19 +2,39 @@ from __future__ import annotations +import inspect +import json import logging +import time from typing import Any +from agentflow.checkpointer import BaseCheckpointer from agentflow.media.config import DocumentHandling from agentflow.media.storage.base import BaseMediaStore -from injectq import inject, singleton +from injectq import InjectQ, inject, singleton +from agentflow_cli.media._compat import DOCUMENT_PASS_RAW, ensure_document_handling_aliases from agentflow_cli.media.extractor import DocumentExtractor from agentflow_cli.media.pipeline import DocumentPipeline from agentflow_cli.src.app.core.config.media_settings import MediaSettings, MediaStorageType + logger = logging.getLogger("agentflow-cli.media") +ensure_document_handling_aliases() + +_SIGNED_URL_NAMESPACE = "media:signed-url" +_EXTRACTION_NAMESPACE = "media:extraction" + + +def _build_config_instance(config_cls: type[Any], values: dict[str, Any]) -> Any: + """Instantiate third-party config classes using only supported kwargs.""" + params = inspect.signature(config_cls).parameters + supported = { + name: value for name, value in values.items() if name in params and value not in (None, "") + } + return config_cls(**supported) + def _create_media_store(settings: MediaSettings) -> BaseMediaStore: """Factory that instantiates the right MediaStore from settings.""" @@ -32,11 +52,58 @@ def _create_media_store(settings: 
MediaSettings) -> BaseMediaStore: if stype == MediaStorageType.CLOUD: from agentflow.media.storage.cloud_store import CloudMediaStore + from cloud_storage_manager import ( + AwsConfig, + CloudStorageFactory, + StorageConfig, + StorageProvider, + ) + + provider_name = settings.MEDIA_CLOUD_PROVIDER.lower() + provider = getattr(StorageProvider, provider_name.upper(), None) + if provider is None and provider_name == "gcs": + provider = getattr(StorageProvider, "GCP", None) + if provider is None: + raise ValueError(f"Unsupported MEDIA_CLOUD_PROVIDER: {settings.MEDIA_CLOUD_PROVIDER}") + + storage_kwargs: dict[str, Any] = {} + + if provider_name == "aws": + storage_kwargs["aws"] = _build_config_instance( + AwsConfig, + { + "bucket_name": settings.MEDIA_CLOUD_BUCKET, + "region": settings.MEDIA_CLOUD_REGION, + "region_name": settings.MEDIA_CLOUD_REGION, + "access_key_id": settings.MEDIA_CLOUD_ACCESS_KEY_ID, + "secret_access_key": settings.MEDIA_CLOUD_SECRET_ACCESS_KEY, + "session_token": settings.MEDIA_CLOUD_SESSION_TOKEN, + }, + ) + else: + from cloud_storage_manager import GcpConfig + + credentials = None + if settings.MEDIA_CLOUD_CREDENTIALS_JSON: + try: + credentials = json.loads(settings.MEDIA_CLOUD_CREDENTIALS_JSON) + except json.JSONDecodeError as exc: + raise ValueError("MEDIA_CLOUD_CREDENTIALS_JSON must be valid JSON") from exc + + storage_kwargs["gcp"] = _build_config_instance( + GcpConfig, + { + "bucket_name": settings.MEDIA_CLOUD_BUCKET, + "project_id": settings.MEDIA_CLOUD_PROJECT_ID, + "credentials": credentials, + "credentials_json": credentials, + }, + ) + storage = CloudStorageFactory.get_storage(provider, StorageConfig(**storage_kwargs)) return CloudMediaStore( - provider=settings.MEDIA_CLOUD_PROVIDER, - bucket=settings.MEDIA_CLOUD_BUCKET, - region=settings.MEDIA_CLOUD_REGION, + storage=storage, + prefix=settings.MEDIA_CLOUD_PREFIX, ) if stype == MediaStorageType.PG: @@ -50,7 +117,7 @@ def _create_media_store(settings: MediaSettings) -> BaseMediaStore: def 
_create_document_pipeline(settings: MediaSettings) -> DocumentPipeline: handling_map = { "extract_text": DocumentHandling.EXTRACT_TEXT, - "pass_raw": DocumentHandling.PASS_RAW, + "pass_raw": DOCUMENT_PASS_RAW, "skip": DocumentHandling.SKIP, } handling = handling_map.get(settings.DOCUMENT_HANDLING, DocumentHandling.EXTRACT_TEXT) @@ -64,13 +131,19 @@ class MediaService: """Central media service providing storage + document extraction.""" @inject - def __init__(self, settings: MediaSettings | None = None): + def __init__( + self, + settings: MediaSettings | None = None, + checkpointer: BaseCheckpointer | None = None, + ): from agentflow_cli.src.app.core.config.media_settings import get_media_settings self._settings = settings or get_media_settings() + self._checkpointer = checkpointer self._store: BaseMediaStore | None = None self._pipeline: DocumentPipeline | None = None self._extraction_cache: dict[str, str] = {} # file_id -> extracted text + self._signed_url_cache: dict[str, dict[str, Any]] = {} # file_id -> signed URL payload @property def store(self) -> BaseMediaStore: @@ -84,6 +157,15 @@ def pipeline(self) -> DocumentPipeline: self._pipeline = _create_document_pipeline(self._settings) return self._pipeline + @property + def checkpointer(self) -> BaseCheckpointer | None: + if self._checkpointer is None: + try: + self._checkpointer = InjectQ.get_instance().try_get(BaseCheckpointer) + except Exception: + self._checkpointer = None + return self._checkpointer + @property def max_size_bytes(self) -> int: return int(self._settings.MEDIA_MAX_SIZE_MB * 1024 * 1024) @@ -122,18 +204,30 @@ async def upload_file( "filename": filename, "extracted_text": None, "url": f"/v1/files/{storage_key}", + "direct_url": None, + "direct_url_expires_at": None, } # If it's a document, try extraction - if self._is_extractable(mime_type): + if self.pipeline.handling == DocumentHandling.EXTRACT_TEXT and self._is_extractable( + mime_type + ): try: text = await 
self.pipeline.extractor.extract(data, filename) if text: result["extracted_text"] = text - self._extraction_cache[storage_key] = text + await self._cache_extraction(storage_key, text) except Exception as exc: logger.warning("Extraction failed for %s: %s", filename, exc) + direct_url_info = await self.get_direct_url_info( + storage_key, + mime_type=mime_type, + ) + if direct_url_info: + result["direct_url"] = direct_url_info["url"] + result["direct_url_expires_at"] = direct_url_info["expires_at"] + return result async def get_file(self, file_id: str) -> tuple[bytes, str]: @@ -142,22 +236,146 @@ async def get_file(self, file_id: str) -> tuple[bytes, str]: async def get_file_info(self, file_id: str) -> dict[str, Any]: """Return metadata about a stored file.""" - exists = await self.store.exists(file_id) - if not exists: + metadata = await self.store.get_metadata(file_id) + if metadata is None: raise KeyError(f"File not found: {file_id}") - data, mime_type = await self.store.retrieve(file_id) - return { + info = { "file_id": file_id, - "mime_type": mime_type, - "size_bytes": len(data), - "extracted_text": self._extraction_cache.get(file_id), + "mime_type": metadata["mime_type"], + "size_bytes": metadata["size_bytes"], + "filename": metadata.get("filename"), + "extracted_text": await self.aget_cached_extraction(file_id), + "direct_url": None, + "direct_url_expires_at": None, } + direct_url_info = await self.get_direct_url_info( + file_id, + mime_type=metadata["mime_type"], + ) + if direct_url_info: + info["direct_url"] = direct_url_info["url"] + info["direct_url_expires_at"] = direct_url_info["expires_at"] + return info + + async def get_direct_url_info( + self, + file_id: str, + mime_type: str | None = None, + expiration_seconds: int | None = None, + ) -> dict[str, Any] | None: + """Return a cached direct URL payload when the store supports it.""" + ttl = expiration_seconds or self._settings.MEDIA_SIGNED_URL_TTL_SECONDS + refresh_buffer = 
self._settings.MEDIA_SIGNED_URL_REFRESH_BUFFER_SECONDS + + if mime_type is None: + metadata = await self.store.get_metadata(file_id) + if metadata is None: + raise KeyError(f"File not found: {file_id}") + mime_type = metadata["mime_type"] + + cache_key = f"{file_id}:{mime_type}:{ttl}" + cached = await self._get_cached_payload( + namespace=_SIGNED_URL_NAMESPACE, + cache_key=cache_key, + local_cache=self._signed_url_cache, + ) + if cached and cached.get("expires_at", 0) > int(time.time()) + refresh_buffer: + return cached + + direct_url = await self.store.get_direct_url( + file_id, + mime_type=mime_type, + expiration=ttl, + ) + if direct_url is None: + return None + + payload = { + "url": direct_url, + "expires_at": int(time.time() + ttl), + } + await self._cache_payload( + namespace=_SIGNED_URL_NAMESPACE, + cache_key=cache_key, + payload=payload, + local_cache=self._signed_url_cache, + ttl_seconds=ttl, + ) + return payload def get_cached_extraction(self, file_id: str) -> str | None: """Return cached extracted text for a file_id.""" return self._extraction_cache.get(file_id) + async def aget_cached_extraction(self, file_id: str) -> str | None: + """Return cached extracted text from local memory or shared cache.""" + cached = self._extraction_cache.get(file_id) + if cached is not None: + return cached + + payload = await self._get_cached_payload( + namespace=_EXTRACTION_NAMESPACE, + cache_key=file_id, + local_cache=None, + ) + if isinstance(payload, dict): + text = payload.get("text") + if isinstance(text, str): + self._extraction_cache[file_id] = text + return text + return None + + async def _cache_extraction(self, file_id: str, text: str) -> None: + self._extraction_cache[file_id] = text + await self._cache_payload( + namespace=_EXTRACTION_NAMESPACE, + cache_key=file_id, + payload={"text": text}, + local_cache=None, + ttl_seconds=24 * 3600, + ) + + async def _get_cached_payload( + self, + namespace: str, + cache_key: str, + local_cache: dict[str, dict[str, Any]] | 
None, + ) -> dict[str, Any] | None: + if local_cache is not None: + cached = local_cache.get(cache_key) + if cached and cached.get("expires_at", float("inf")) > time.time(): + return cached + if cached: + local_cache.pop(cache_key, None) + + if self.checkpointer is None: + return None + + payload = await self.checkpointer.aget_cache_value(namespace, cache_key) + if isinstance(payload, dict) and local_cache is not None: + local_cache[cache_key] = payload + return payload if isinstance(payload, dict) else None + + async def _cache_payload( + self, + namespace: str, + cache_key: str, + payload: dict[str, Any], + local_cache: dict[str, dict[str, Any]] | None, + ttl_seconds: int, + ) -> None: + if local_cache is not None: + local_cache[cache_key] = payload + + if self.checkpointer is not None: + await self.checkpointer.aput_cache_value( + namespace, + cache_key, + payload, + ttl_seconds=ttl_seconds, + ) + # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ diff --git a/agentflow_cli/src/app/routers/media/router.py b/agentflow_cli/src/app/routers/media/router.py index a071a4d..eee7286 100644 --- a/agentflow_cli/src/app/routers/media/router.py +++ b/agentflow_cli/src/app/routers/media/router.py @@ -11,15 +11,17 @@ from injectq.integrations import InjectAPI from agentflow_cli.src.app.core.auth.permissions import RequirePermission -from agentflow_cli.src.app.core.config.media_settings import MediaSettings, get_media_settings +from agentflow_cli.src.app.core.config.media_settings import get_media_settings from agentflow_cli.src.app.routers.media import MediaService from agentflow_cli.src.app.routers.media.schemas import ( + FileAccessUrlResponse, FileInfoResponse, FileUploadResponse, MultimodalConfigResponse, ) from agentflow_cli.src.app.utils import success_response + router = APIRouter(tags=["Files"]) @@ -103,6 +105,31 @@ async def get_file_info( return 
success_response(FileInfoResponse(**info), request) +@router.get( + "/v1/files/{file_id}/url", + summary="Retrieve direct access URL", + response_model=FileAccessUrlResponse, +) +async def get_file_access_url( + request: Request, + file_id: str, + service: MediaService = InjectAPI(MediaService), + user: dict[str, Any] = Depends(RequirePermission("files", "read")), +): + try: + info = await service.get_file_info(file_id) + except KeyError: + raise HTTPException(status_code=404, detail="File not found") + + payload = FileAccessUrlResponse( + file_id=file_id, + url=info["direct_url"] or f"/v1/files/{file_id}", + expires_at=info.get("direct_url_expires_at"), + mime_type=info["mime_type"], + ) + return success_response(payload, request) + + # ------------------------------------------------------------------ # 4.4 GET /v1/config/multimodal — read current multimodal config # ------------------------------------------------------------------ diff --git a/agentflow_cli/src/app/routers/media/schemas.py b/agentflow_cli/src/app/routers/media/schemas.py index 441f0dd..ff3818b 100644 --- a/agentflow_cli/src/app/routers/media/schemas.py +++ b/agentflow_cli/src/app/routers/media/schemas.py @@ -12,13 +12,25 @@ class FileUploadResponse(BaseModel): filename: str extracted_text: str | None = None url: str = Field(..., description="Relative retrieval URL") + direct_url: str | None = None + direct_url_expires_at: int | None = None class FileInfoResponse(BaseModel): file_id: str mime_type: str size_bytes: int + filename: str | None = None extracted_text: str | None = None + direct_url: str | None = None + direct_url_expires_at: int | None = None + + +class FileAccessUrlResponse(BaseModel): + file_id: str + url: str + expires_at: int | None = None + mime_type: str class MultimodalConfigResponse(BaseModel): diff --git a/agentflow_cli/src/app/routers/store/services/store_service.py b/agentflow_cli/src/app/routers/store/services/store_service.py index c8df152..2925f4b 100644 --- 
a/agentflow_cli/src/app/routers/store/services/store_service.py +++ b/agentflow_cli/src/app/routers/store/services/store_service.py @@ -21,6 +21,15 @@ ) +class StoreUnavailableError(HTTPException, ValueError): + """Raised when the store service has not been configured.""" + + def __init__(self) -> None: + detail = "Store is not configured" + HTTPException.__init__(self, status_code=503, detail=detail) + ValueError.__init__(self, detail) + + @singleton class StoreService: """Service layer wrapping interactions with the configured BaseStore.""" @@ -31,7 +40,7 @@ def __init__(self, store: BaseStore | None): def _get_store(self) -> BaseStore: if not self.store: - raise HTTPException(status_code=503, detail="Store service is not available") + raise StoreUnavailableError() return self.store def _config(self, config: dict[str, Any] | None, user: dict[str, Any]) -> dict[str, Any]: diff --git a/tests/test_sprint4_media_api.py b/tests/test_sprint4_media_api.py index 2ea833b..1db44f1 100644 --- a/tests/test_sprint4_media_api.py +++ b/tests/test_sprint4_media_api.py @@ -14,6 +14,7 @@ # --------------------------------------------------------------------------- # MediaService unit tests # --------------------------------------------------------------------------- +from agentflow.checkpointer import InMemoryCheckpointer from agentflow_cli.src.app.core.config.media_settings import MediaSettings, MediaStorageType @@ -34,7 +35,10 @@ class TestMediaService: def _make_service(self, **kwargs): from agentflow_cli.src.app.routers.media import MediaService - return MediaService(settings=_make_settings(**kwargs)) + return MediaService( + settings=_make_settings(**kwargs), + checkpointer=InMemoryCheckpointer(), + ) @pytest.mark.asyncio async def test_upload_file_stores_and_returns_metadata(self): @@ -85,6 +89,7 @@ async def test_get_file_info(self): info = await svc.get_file_info(result["file_id"]) assert info["mime_type"] == "text/plain" assert info["size_bytes"] == 3 + assert 
info["filename"] == "small.txt" @pytest.mark.asyncio async def test_get_file_not_found(self): @@ -106,8 +111,49 @@ async def test_cached_extraction_lookup(self): ): result = await svc.upload_file(b"%PDF data", "report.pdf", "application/pdf") assert svc.get_cached_extraction(result["file_id"]) == "Cached!" + assert await svc.aget_cached_extraction(result["file_id"]) == "Cached!" assert svc.get_cached_extraction("missing") is None + @pytest.mark.asyncio + async def test_get_file_info_uses_store_metadata_without_blob_download(self): + svc = self._make_service() + mock_store = MagicMock() + mock_store.get_metadata = AsyncMock( + return_value={ + "mime_type": "image/png", + "size_bytes": 123, + "filename": "img.png", + } + ) + mock_store.get_direct_url = AsyncMock(return_value=None) + mock_store.retrieve = AsyncMock(side_effect=AssertionError("retrieve should not be called")) + svc._store = mock_store + + info = await svc.get_file_info("file-123") + + assert info["mime_type"] == "image/png" + assert info["size_bytes"] == 123 + assert info["filename"] == "img.png" + mock_store.retrieve.assert_not_called() + + @pytest.mark.asyncio + async def test_get_direct_url_info_uses_cache(self): + svc = self._make_service() + mock_store = MagicMock() + mock_store.get_direct_url = AsyncMock(return_value="https://signed.example.com/file") + svc._store = mock_store + + first = await svc.get_direct_url_info("file-123", mime_type="image/png") + second = await svc.get_direct_url_info("file-123", mime_type="image/png") + + assert first == second + assert first["url"] == "https://signed.example.com/file" + mock_store.get_direct_url.assert_awaited_once_with( + "file-123", + mime_type="image/png", + expiration=3600, + ) + # --------------------------------------------------------------------------- # Multimodal preprocessor tests