Skip to content
6 changes: 4 additions & 2 deletions agentex/src/api/middleware_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,12 @@ async def verify_auth_gateway(
method = request.method

logger.info(
"[authentication_middleware] Request authenticated successfully for %s %s with principal %s",
"[authentication_middleware] Request authenticated successfully for %s %s "
"(user_id=%s, account_id=%s)",
method,
route_path,
principal_context,
getattr(principal_context, "user_id", None),
getattr(principal_context, "account_id", None),
)
return None # Authentication successful
except Exception as exc:
Expand Down
51 changes: 51 additions & 0 deletions agentex/src/domain/delegation_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Outbound runtime-delegation headers for ACP calls to agent pods (v1).

Forwards the validated user API key on a dedicated header so agents can call
downstream APIs as the user. Agent identity for SGP will eventually be a claim
on a pod-minted delegation token (OBO), not a separate header from agentex.
"""

from typing import Any

HEADER_ACTING_USER_API_KEY = "x-acting-user-api-key"
HEADER_SELECTED_ACCOUNT_ID = "x-selected-account-id"
HEADER_USER_API_KEY = "x-api-key"


def _normalize_headers(headers: dict[str, str] | None) -> dict[str, str]:
if not headers:
return {}
return {k.lower(): v for k, v in headers.items()}


def build_delegation_headers(
principal: Any,
inbound_headers: dict[str, str] | None,
*,
agent_identity: str | None = None,
) -> dict[str, str]:
"""
Outbound ACP headers so the agent can act on behalf of the authenticated user.

Requires a validated user principal from auth; reads x-api-key from the
inbound request (already checked during auth). Skips delegation when the
request is authenticated as the agent itself (agent_identity set).
"""
if agent_identity or principal is None:
return {}

normalized = _normalize_headers(inbound_headers)
api_key = normalized.get(HEADER_USER_API_KEY)
if not api_key:
return {}

result = {
HEADER_ACTING_USER_API_KEY: api_key,
}

account_id = normalized.get(HEADER_SELECTED_ACCOUNT_ID)
if account_id:
result[HEADER_SELECTED_ACCOUNT_ID] = account_id

return result
60 changes: 35 additions & 25 deletions agentex/src/domain/services/agent_acp_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from typing import Annotated, Any
from uuid import uuid4

from fastapi import Depends
from fastapi import Depends, Request
from pydantic import BaseModel

from src.adapters.http.adapter_httpx import DHttpxGateway
from src.domain.delegation_headers import build_delegation_headers
from src.domain.entities.agents import AgentEntity
from src.domain.entities.agents_rpc import (
AgentRPCMethod,
Expand Down Expand Up @@ -72,7 +73,10 @@
{
"authorization",
"cookie",
"x-api-key",
"x-agent-api-key",
"x-acting-user-api-key",
"x-acting-as-agent",
}
)

Expand All @@ -84,7 +88,7 @@ def filter_request_headers(headers: dict[str, str] | None) -> dict[str, str]:
Security filtering rules:
1. Allow only x-* prefixed headers (allowlist approach)
2. Block hop-by-hop headers (connection, keep-alive, etc.)
3. Block sensitive headers (authorization, cookie, x-agent-api-key)
3. Block sensitive headers (credentials, acting delegation, x-agent-api-key)

Args:
headers: Raw request headers from client
Expand Down Expand Up @@ -118,10 +122,12 @@ def __init__(
agent_repository: DAgentRepository,
agent_api_key_repository: DAgentAPIKeyRepository,
http_gateway: DHttpxGateway,
request: Request,
):
self._http_gateway = http_gateway
self._agent_repository = agent_repository
self._agent_api_key_repository = agent_api_key_repository
self._request = request

def _parse_task_message(self, result: dict[str, Any]) -> TaskMessageContentEntity:
"""Parse a result dict into a TaskMessage"""
Expand Down Expand Up @@ -254,26 +260,40 @@ async def _call_jsonrpc_stream(
logger.error(f"Error calling ACP server at {url}: {e}")
raise e

async def get_headers(self, agent: AgentEntity) -> dict[str, str]:
auth_headers = await self.get_agent_auth_headers(agent) or {}

request_id = ctx_var_request_id.get(uuid4().hex)
headers = {**auth_headers, "x-request-id": request_id}
return headers
def get_delegation_headers(self, agent: AgentEntity) -> dict[str, str]:
state = self._request.state
return build_delegation_headers(
getattr(state, "principal_context", None),
dict(self._request.headers),
agent_identity=getattr(state, "agent_identity", None),
)

async def get_agent_auth_headers(
async def get_headers(
self,
agent: AgentEntity,
) -> dict[str, str] | None:
"""
Get the authentication headers for an agent by its ID.
"""
request_headers: dict[str, str] | None = None,
) -> dict[str, str]:
filtered_request_headers = filter_request_headers(request_headers)
delegation_headers = self.get_delegation_headers(agent)
auth_headers = await self.get_agent_auth_headers(agent)
request_id = ctx_var_request_id.get(uuid4().hex)

# Later keys win. Client passthrough and delegation first; agent auth last.
return {
**filtered_request_headers,
**delegation_headers,
**auth_headers,
"x-request-id": request_id,
}

async def get_agent_auth_headers(self, agent: AgentEntity) -> dict[str, str]:
"""Authentication headers the agent pod uses to call back into agentex."""
api_key = await self._agent_api_key_repository.get_internal_api_key_by_agent_id(
agent_id=agent.id
)
if api_key:
return {"x-agent-api-key": api_key.api_key}
return None
return {}

async def create_task(
self,
Expand Down Expand Up @@ -398,24 +418,14 @@ async def send_event(
) -> dict[str, Any]:
"""Send an event to a running task"""

# Filter request headers for security (only safe x-* headers)
filtered_headers = filter_request_headers(request_headers)

# Don't include headers in params body - let SDK extract from HTTP headers
# This ensures single source of truth and avoids duplication
params = SendEventParams(
agent=agent,
task=task,
event=event,
request=None,
)

# Build HTTP headers: start with filtered request headers, then overlay auth headers
# Auth headers are added last to ensure they cannot be overwritten
# SDK will extract these headers and populate params.request at agent side
headers = filtered_headers.copy()
auth_headers = await self.get_headers(agent)
headers.update(auth_headers)
headers = await self.get_headers(agent, request_headers)

return await self._call_jsonrpc(
url=acp_url,
Expand Down
2 changes: 2 additions & 0 deletions agentex/src/utils/request_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

REQUEST_KEY_REGEXP_BLACKLIST = [
r"api_key",
r"api-key",
r"password",
r"secret",
r"token",
r"authorization",
r"acting-user",
]


Expand Down
20 changes: 18 additions & 2 deletions agentex/tests/fixtures/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Provides factory functions and specific fixtures for creating services with test repositories.
"""

from unittest.mock import Mock
from unittest.mock import MagicMock, Mock

import pytest

Expand All @@ -19,14 +19,30 @@ def create_task_message_service(task_message_repository):
return TaskMessageService(task_message_repository=task_message_repository)


def create_agent_acp_service(http_gateway, agent_repository, agent_api_key_repository):
def create_mock_request():
"""Minimal FastAPI Request stand-in for AgentACPService tests."""
request = MagicMock()
request.state = MagicMock()
request.state.principal_context = None
request.state.agent_identity = None
request.headers = {}
return request


def create_agent_acp_service(
http_gateway,
agent_repository,
agent_api_key_repository,
request=None,
):
"""Factory function to create AgentACPService with given HTTP gateway"""
from src.domain.services.agent_acp_service import AgentACPService

return AgentACPService(
http_gateway=http_gateway,
agent_repository=agent_repository,
agent_api_key_repository=agent_api_key_repository,
request=request or create_mock_request(),
)


Expand Down
Loading
Loading