Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions agentflow_cli/cli/commands/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""API server command implementation."""

import ipaddress
import os
import socket
import sys
Expand Down Expand Up @@ -196,8 +197,15 @@ def _build_playground_url(
return f"{playground_base_url}?{urlencode({'backendUrl': backend_url})}"

def _normalize_browser_host(self, host: str) -> str:
if host in {"0.0.0.0", "::", "[::]", ""}:
if not host:
return "127.0.0.1"
if host.startswith("[") and host.endswith("]"):
return host[1:-1]
return host

normalized_host = host[1:-1] if host.startswith("[") and host.endswith("]") else host

try:
if ipaddress.ip_address(normalized_host).is_unspecified:
return "127.0.0.1"
except ValueError:
pass

return normalized_host
19 changes: 19 additions & 0 deletions agentflow_cli/media/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Media processing module for pyagenity-api.

Document extraction lives here (not in PyAgenity core) because
the core library stays lightweight — SDK users extract text themselves.
The API platform auto-extracts using textxtract.
"""

from ._compat import ensure_document_handling_aliases
from .extractor import DocumentExtractor
from .pipeline import DocumentPipeline


# Patch missing DocumentHandling aliases as soon as the package is
# imported, before any consumer compares against PASS_RAW / FORWARD_RAW.
ensure_document_handling_aliases()


# Public API of the media package; _compat helpers stay internal.
__all__ = [
    "DocumentExtractor",
    "DocumentPipeline",
]
23 changes: 23 additions & 0 deletions agentflow_cli/media/_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Compatibility helpers for upstream AgentFlow media enums."""

from __future__ import annotations

from agentflow.media.config import DocumentHandling


def ensure_document_handling_aliases() -> None:
    """Expose stable aliases across AgentFlow enum renames.

    Different AgentFlow versions name the raw-passthrough mode either
    ``PASS_RAW`` or ``FORWARD_RAW``; ensure both names resolve to the
    same member regardless of which one the installed version defines.

    Raises:
        AttributeError: If ``DocumentHandling`` defines neither name.
    """
    pass_raw = getattr(DocumentHandling, "PASS_RAW", None)
    forward_raw = getattr(DocumentHandling, "FORWARD_RAW", None)

    if pass_raw is None and forward_raw is None:
        # Fail with a clear message instead of the confusingly-chained
        # AttributeError a naive alias assignment inside an except
        # handler would produce.
        raise AttributeError(
            "DocumentHandling defines neither PASS_RAW nor FORWARD_RAW"
        )

    # Setting a NEW attribute on an Enum class is allowed (only member
    # reassignment is blocked), so the missing spelling becomes a plain
    # class attribute pointing at the existing member.
    if pass_raw is None:
        DocumentHandling.PASS_RAW = forward_raw
    if forward_raw is None:
        DocumentHandling.FORWARD_RAW = pass_raw


ensure_document_handling_aliases()

# Canonical alias for comparisons elsewhere in this package.
DOCUMENT_PASS_RAW = DocumentHandling.PASS_RAW
79 changes: 79 additions & 0 deletions agentflow_cli/media/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Document extraction service using textxtract.

Wraps ``AsyncTextExtractor`` from the textxtract library.
This module lives in pyagenity-api, NOT in the core PyAgenity library,
because extraction is an API-platform concern.
"""

from __future__ import annotations

import logging
from typing import Any


logger = logging.getLogger("agentflow_cli.media.extractor")

try:
from textxtract import AsyncTextExtractor
from textxtract.core.exceptions import ExtractionError, FileTypeNotSupportedError
except ImportError: # pragma: no cover
AsyncTextExtractor = None # type: ignore[assignment]

class FileTypeNotSupportedError(Exception): # type: ignore[no-redef]
"""Fallback exception when textxtract is unavailable."""

class ExtractionError(Exception): # type: ignore[no-redef]
"""Fallback exception when textxtract is unavailable."""


class DocumentExtractor:
"""Wraps textxtract AsyncTextExtractor for API-side document extraction.

Examples::

extractor = DocumentExtractor()
text = await extractor.extract(pdf_bytes, "report.pdf")
"""

def __init__(self, extractor: Any | None = None):
if extractor is not None:
self.extractor = extractor
return

if AsyncTextExtractor is None:
raise ImportError(
"textxtract is required for document extraction. "
"Install with `pip install textxtract[pdf,docx,html,xml,md]`"
)

self.extractor = AsyncTextExtractor()

async def extract(self, data: bytes | str, filename: str | None = None) -> str | None:
"""Extract text from bytes or a local path.

Args:
data: Raw bytes or local path.
filename: Required when passing bytes so type can be detected.

Returns:
Extracted text, or ``None`` when file type is unsupported.

Raises:
ValueError: For missing filename or failed extraction.
"""
if isinstance(data, bytes) and not filename:
raise ValueError("filename must be provided when extracting from bytes")

try:
if isinstance(data, bytes):
if filename is None:
raise ValueError("filename must be provided when extracting from bytes")
return await self.extractor.extract(data, filename)
return await self.extractor.extract(data)

except FileTypeNotSupportedError:
logger.warning("Document type not supported for extraction: %s", filename)
return None
except ExtractionError as exc:
logger.exception("Document extraction failed for %s", filename)
raise ValueError("Failed to extract text from document") from exc
84 changes: 84 additions & 0 deletions agentflow_cli/media/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Document processing pipeline for the API layer.

Orchestrates: receive uploaded file -> extract text via DocumentExtractor
-> return TextBlock or DocumentBlock depending on config.
"""

from __future__ import annotations

import logging
from base64 import b64decode
from typing import Any

from agentflow.media.config import DocumentHandling
from agentflow.state.message_block import DocumentBlock, TextBlock

from ._compat import DOCUMENT_PASS_RAW
from .extractor import DocumentExtractor


logger = logging.getLogger("agentflow_cli.media.pipeline")


class DocumentPipeline:
    """Apply the configured document-handling policy to uploaded documents.

    Modes:
        - ``EXTRACT_TEXT``: Extract text via textxtract, return TextBlock.
        - ``PASS_RAW``/``FORWARD_RAW``: Return the original DocumentBlock untouched.
        - ``SKIP``: Drop the document entirely (returns None).
    """

    def __init__(
        self,
        document_extractor: DocumentExtractor | None = None,
        handling: DocumentHandling = DocumentHandling.EXTRACT_TEXT,
    ):
        self.handling = handling
        # Defer extractor construction until first use so that non-extract
        # modes never require textxtract to be installed.
        self._extractor = document_extractor

    @property
    def extractor(self) -> DocumentExtractor:
        """Lazily-built extractor, created on first access."""
        if self._extractor is None:
            self._extractor = DocumentExtractor()
        return self._extractor

    async def process_document(self, document_block: Any) -> Any | None:
        """Process a DocumentBlock according to the handling policy.

        Args:
            document_block: A ``DocumentBlock`` instance.

        Returns:
            A ``TextBlock`` (if extracted), the original ``DocumentBlock``
            (if pass-raw or extraction unsupported), or ``None`` (if skip).
        """
        policy = self.handling
        if policy == DocumentHandling.SKIP:
            return None
        if policy == DOCUMENT_PASS_RAW:
            return document_block

        # Extraction mode from here on.
        if not isinstance(document_block, DocumentBlock):
            raise ValueError("Expected DocumentBlock in pipeline")

        # A usable excerpt is already present — no extraction needed.
        existing_excerpt = document_block.excerpt
        if existing_excerpt and existing_excerpt.strip():
            return TextBlock(text=existing_excerpt)

        source = document_block.media
        if source.kind != "data" or not source.data_base64:
            # External URL or provider file_id: nothing to decode locally.
            # Keep as-is so the API layer may resolve it later.
            return document_block

        raw_bytes = b64decode(source.data_base64)
        name_hint = source.filename or "document.pdf"
        extracted_text = await self.extractor.extract(raw_bytes, name_hint)
        # None/empty result means the type was unsupported — keep raw.
        return TextBlock(text=extracted_text) if extracted_text else document_block
50 changes: 50 additions & 0 deletions agentflow_cli/src/app/core/config/media_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Media / multimodal configuration loaded from environment variables."""

from __future__ import annotations

import logging
from enum import Enum
from functools import lru_cache

from pydantic_settings import BaseSettings


logger = logging.getLogger("agentflow-cli.media")


class MediaStorageType(str, Enum):
    """Storage backend identifiers for uploaded media.

    str-valued so pydantic can parse the enum directly from the
    MEDIA_STORAGE_TYPE environment variable.
    """

    MEMORY = "memory"
    LOCAL = "local"
    CLOUD = "cloud"
    PG = "pg"  # presumably Postgres-backed storage — confirm with the storage layer


class MediaSettings(BaseSettings):
    """Settings loaded from env vars (prefix-free, matches plan)."""

    # Backend used to persist uploads; defaults to local-disk storage.
    MEDIA_STORAGE_TYPE: MediaStorageType = MediaStorageType.LOCAL
    # Root directory for MEDIA_STORAGE_TYPE=local.
    MEDIA_STORAGE_PATH: str = "./uploads"
    # Upload size cap, in megabytes.
    MEDIA_MAX_SIZE_MB: float = 25.0
    DOCUMENT_HANDLING: str = "extract_text"  # extract_text | pass_raw | skip

    # Cloud storage (S3/GCS) — only used when MEDIA_STORAGE_TYPE=cloud
    MEDIA_CLOUD_PROVIDER: str = "aws"  # aws | gcp
    MEDIA_CLOUD_BUCKET: str = ""
    MEDIA_CLOUD_REGION: str = "us-east-1"
    # Key prefix prepended to every object stored in the bucket.
    MEDIA_CLOUD_PREFIX: str = "agentflow-media"
    # AWS-style credentials; None presumably falls back to the provider's
    # default credential chain — TODO confirm against the storage layer.
    MEDIA_CLOUD_ACCESS_KEY_ID: str | None = None
    MEDIA_CLOUD_SECRET_ACCESS_KEY: str | None = None
    MEDIA_CLOUD_SESSION_TOKEN: str | None = None
    # GCP-specific settings (MEDIA_CLOUD_PROVIDER=gcp).
    MEDIA_CLOUD_PROJECT_ID: str | None = None
    MEDIA_CLOUD_CREDENTIALS_JSON: str | None = None

    # Lifetime of signed download URLs, and how long before expiry a
    # URL should be refreshed.
    MEDIA_SIGNED_URL_TTL_SECONDS: int = 3600
    MEDIA_SIGNED_URL_REFRESH_BUFFER_SECONDS: int = 60

    class Config:
        # Tolerate unrelated environment variables instead of failing
        # validation on them.
        extra = "allow"


@lru_cache
def get_media_settings() -> MediaSettings:
    """Return the process-wide MediaSettings, constructed once and cached.

    lru_cache makes this an effective singleton: env vars are read only
    on the first call.
    """
    return MediaSettings()  # type: ignore[call-arg]
12 changes: 12 additions & 0 deletions agentflow_cli/src/app/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,18 @@ async def attach_all_modules(
authorization_path = config.authorization_path
load_and_bind_authorization(container, authorization_path)

# --- Media service wiring ---
from agentflow_cli.src.app.core.config.media_settings import (
MediaSettings,
get_media_settings,
)
from agentflow_cli.src.app.routers.media import MediaService

media_settings = get_media_settings()
container.bind_instance(MediaSettings, media_settings)
media_service = MediaService(settings=media_settings)
container.bind_instance(MediaService, media_service)

logger.info("Container loaded successfully")
logger.debug(f"Container dependency graph: {container.get_dependency_graph()}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
from agentflow_cli.src.app.utils.parse_output import parse_state_output


class CheckpointerUnavailableError(HTTPException, ValueError):
    """Raised when the checkpointer service has not been configured.

    Dual inheritance keeps both audiences working: the HTTP layer sees an
    HTTPException carrying status 503, while existing callers that catch
    ValueError continue to do so — NOTE(review): confirm callers actually
    rely on the ValueError base.
    """

    def __init__(self) -> None:
        detail = "Checkpointer is not configured"
        # Initialize both bases explicitly rather than via super():
        # each base takes different constructor arguments, so a single
        # cooperative call could not serve both.
        HTTPException.__init__(self, status_code=503, detail=detail)
        ValueError.__init__(self, detail)


@singleton
class CheckpointerService:
@inject
Expand All @@ -27,7 +36,7 @@ def __init__(self, checkpointer: BaseCheckpointer):

def _config(self, config: dict[str, Any] | None, user: dict) -> dict[str, Any]:
if not self.checkpointer:
raise HTTPException(status_code=503, detail="Checkpointer service is not available")
raise CheckpointerUnavailableError()

cfg: dict[str, Any] = dict(config or {})
cfg["user"] = user
Expand Down
25 changes: 24 additions & 1 deletion agentflow_cli/src/app/routers/graph/services/graph_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
GraphSchema,
GraphSetupSchema,
)
from agentflow_cli.src.app.routers.graph.services.multimodal_preprocessor import (
preprocess_multimodal_messages,
)
from agentflow_cli.src.app.utils import DummyThreadNameGenerator, ThreadNameGenerator


Expand Down Expand Up @@ -52,6 +55,21 @@ def __init__(
self.checkpointer = checkpointer
self.thread_name_generator = thread_name_generator

# Lazy import to avoid circular dependency
self._media_service = None

@property
def media_service(self):
    """Best-effort lookup of the MediaService from the DI container.

    The import and container resolution happen lazily to avoid a
    circular dependency with the media router. Returns None when the
    service is not registered or the lookup fails for any reason.

    NOTE(review): a failed or absent lookup leaves ``_media_service``
    as None, so the container is re-queried on every access — confirm
    this retry-on-each-access behavior is intended.
    """
    if self._media_service is None:
        try:
            container = InjectQ.get_instance()
            # Local import breaks the module-level circular dependency.
            from agentflow_cli.src.app.routers.media import MediaService

            self._media_service = container.try_get(MediaService)
        except Exception:
            # Media support is optional: swallow resolution errors and
            # fall back to None (callers then skip media preprocessing).
            self._media_service = None
    return self._media_service

async def _save_thread_name(
self,
config: dict[str, Any],
Expand Down Expand Up @@ -182,8 +200,13 @@ async def _prepare_input(
config["recursion_limit"] = graph_input.recursion_limit or 25

# Prepare the input for the graph
# Preprocess multimodal messages (resolve file_id → cached text, etc.)
preprocessed = await preprocess_multimodal_messages(
graph_input.messages,
self.media_service,
)
input_data: dict = {
"messages": graph_input.messages,
"messages": preprocessed,
}
if graph_input.initial_state:
input_data["state"] = graph_input.initial_state
Expand Down
Loading
Loading