Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion agentex/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ RUN uv sync --frozen --group dev --package agentex-backend
COPY ${SOURCE_DIR}/src/ ./src/
EXPOSE 5003
ENV PYTHONPATH=/app
# Dev / single-worker stage. `--reload` doesn't work with `--workers >1`.
# Production multi-worker config is set in the final stage at the bottom of
# this file.
CMD ["ddtrace-run", "uvicorn", "src.api.app:app", "--host", "0.0.0.0", "--port", "5003", "--reload"]

# Docs builder stage
Expand Down Expand Up @@ -90,4 +93,20 @@ USER nonroot

EXPOSE 5003
ENV PYTHONPATH=/app
CMD ["ddtrace-run", "uvicorn", "src.api.app:app", "--host", "0.0.0.0", "--port", "5003"]
# Run uvicorn with multiple workers so a single pod isn't bottlenecked on one
# in-flight request at a time. Default of 4 is a safe match for the typical
# 1 CPU / 2Gi pod limits; bump UVICORN_WORKERS up to ~cpu_count when those
# limits are increased.
#
# IMPORTANT: each worker is a separate Python process with its OWN DB pools.
# Per-pod aggregates with the current defaults:
# - Postgres: POSTGRES_POOL_SIZE (10) × workers + max_overflow (20) × workers
# = 40 base + 80 overflow per pod at workers=4 (vs 10 + 20 at workers=1).
# - MongoDB: MONGODB_MAX_POOL_SIZE per worker; 4× the aggregate at workers=4.
# When changing UVICORN_WORKERS, review POSTGRES_POOL_SIZE /
# POSTGRES_MIDDLEWARE_POOL_SIZE / MONGODB_MAX_POOL_SIZE against the upstream
# DB's max_connections so you don't exhaust the pool. A rough rule of thumb is
# to keep `workers × POSTGRES_POOL_SIZE` below ~half of the upstream cap so
# overflow has headroom.
ENV UVICORN_WORKERS=4
CMD ["sh", "-c", "exec ddtrace-run uvicorn src.api.app:app --host 0.0.0.0 --port 5003 --workers ${UVICORN_WORKERS}"]
2 changes: 1 addition & 1 deletion agentex/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dependencies = [
"kubernetes-asyncio>=31.1.0,<32",
"aiohttp>=3.10.9,<4",
"websockets~=14.2",
"pymongo>=4.11.2,<5",
"pymongo>=4.13.0,<5",
"httpx[http2]>=0.27.2",
"ddtrace>=3.13.0",
"json_log_formatter>=1.1.1",
Expand Down
66 changes: 32 additions & 34 deletions agentex/src/adapters/crud_store/adapter_mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import pymongo
from bson import ObjectId
from pymongo.collection import Collection
from pymongo.asynchronous.collection import AsyncCollection

from src.adapters.crud_store.exceptions import DuplicateItemError, ItemDoesNotExist
from src.adapters.crud_store.port import CRUDRepository
Expand Down Expand Up @@ -98,7 +98,7 @@ def __init__(
model_class: type[T],
):
self.db = db
self.collection: Collection = db[collection_name]
self.collection: AsyncCollection = db[collection_name]
self.model_class = model_class

# MongoDB already enforces uniqueness on _id
Expand Down Expand Up @@ -216,7 +216,7 @@ async def create(self, item: T) -> T:
if data.get("updated_at") is None:
data["updated_at"] = now

result = self.collection.insert_one(data)
result = await self.collection.insert_one(data)

# Update item with generated ID (as string)
# Set the .id field with the string representation of _id
Expand Down Expand Up @@ -274,7 +274,7 @@ async def batch_create(self, items: list[T]) -> list[T]:

data_list.append(data)

result = self.collection.insert_many(data_list)
result = await self.collection.insert_many(data_list)

# Update items with generated IDs (as strings)
for idx, inserted_id in enumerate(result.inserted_ids):
Expand Down Expand Up @@ -320,7 +320,7 @@ async def get(self, id: str | None = None, name: str | None = None) -> T | None:
else:
query = {"name": name}

document = self.collection.find_one(query)
document = await self.collection.find_one(query)
if document is None:
msg = (
f"Item with {'id' if id else 'name'} '{id or name}' does not exist."
Expand Down Expand Up @@ -362,7 +362,9 @@ async def get_by_field(self, field_name: str, field_value: Any) -> T | None:
except Exception:
pass

document = self.collection.find_one({mongo_field_name: mongo_field_value})
document = await self.collection.find_one(
{mongo_field_name: mongo_field_value}
)
if document is None:
raise ItemDoesNotExist(
f"Item with {field_name} '{field_value}' does not exist."
Expand Down Expand Up @@ -392,8 +394,7 @@ async def batch_get(
elif names:
query["name"] = {"$in": names}

cursor = self.collection.find(query)
results = list(cursor)
results = await self.collection.find(query).to_list(length=None)
if not results:
key = "ids" if ids else "names"
msg = f"No items found with {key} '{ids or names}'."
Expand Down Expand Up @@ -429,14 +430,14 @@ async def update(self, item: T) -> T:
# Add updated_at timestamp
update_data["updated_at"] = datetime.now(UTC)

result = self.collection.update_one(
result = await self.collection.update_one(
{"_id": id_value}, {"$set": update_data}
)

if result.matched_count == 0:
raise ItemDoesNotExist(f"Item with id '{id_value}' does not exist.")

updated_doc = self.collection.find_one({"_id": id_value})
updated_doc = await self.collection.find_one({"_id": id_value})
return self._deserialize(updated_doc)
except ItemDoesNotExist:
raise
Expand Down Expand Up @@ -476,7 +477,7 @@ async def delete(self, id: str | None = None, name: str | None = None) -> None:
else:
query = {"name": name}

result = self.collection.delete_one(query)
result = await self.collection.delete_one(query)
if result.deleted_count == 0:
msg = (
f"Item with {'id' if id else 'name'} '{id or name}' does not exist."
Expand Down Expand Up @@ -507,7 +508,7 @@ async def batch_delete(
elif names:
query["name"] = {"$in": names}

result = self.collection.delete_many(query)
result = await self.collection.delete_many(query)
if result.deleted_count == 0:
key = "ids" if ids else "names"
msg = f"No items found with {key} '{ids or names}'."
Expand Down Expand Up @@ -536,11 +537,6 @@ async def list(
raise ClientError("Page number must be greater than 0")
page_number = page_number or 1
skip = (page_number - 1) * limit
if filters:
cursor = self.collection.find(filters)
else:
cursor = self.collection.find()
cursor = cursor.skip(skip).limit(limit)

sort_list = []
if order_by:
Expand All @@ -553,10 +549,14 @@ async def list(

# Always use _id as tiebreaker
sort_list.append(("_id", pymongo.ASCENDING))
cursor = cursor.sort(sort_list)

try:
return [self._deserialize(doc) for doc in cursor]
cursor = (
self.collection.find(filters) if filters else self.collection.find()
)
cursor = cursor.skip(skip).limit(limit).sort(sort_list)
docs = await cursor.to_list(length=None)
return [self._deserialize(doc) for doc in docs]
except Exception as e:
raise ServiceError(
message=f"Failed to list items from MongoDB: {e}", detail=str(e)
Expand Down Expand Up @@ -600,25 +600,24 @@ async def find_by_field(
if filters:
query.update(filters)

cursor = self.collection.find(query)

# Apply sorting
sort_by_items = list(sort_by.items()) if sort_by else []
# Use ID for tiebreaking
sort_by_items.append(("_id", 1))
cursor = cursor.sort(sort_by_items)

# Apply limit if specified
limit = limit or DEFAULT_PAGE_LIMIT
cursor = cursor.limit(limit)

# Apply page number if specified
if page_number is not None and page_number < 1:
raise ClientError("Page number must be greater than 0")
if page_number is not None:
cursor = cursor.skip((page_number - 1) * limit)
skip = (page_number - 1) * limit if page_number is not None else 0

return [self._deserialize(doc) for doc in cursor]
cursor = self.collection.find(query).sort(sort_by_items).limit(limit)
if skip:
cursor = cursor.skip(skip)
docs = await cursor.to_list(length=None)
return [self._deserialize(doc) for doc in docs]
except Exception as e:
raise ServiceError(
message=f"Failed to find items by field in MongoDB: {e}", detail=str(e)
Expand Down Expand Up @@ -677,7 +676,7 @@ async def find_by_field_with_cursor(
except Exception:
cursor_object_id = cursor_id

cursor_doc = self.collection.find_one({"_id": cursor_object_id})
cursor_doc = await self.collection.find_one({"_id": cursor_object_id})
if cursor_doc and "created_at" in cursor_doc:
cursor_timestamp = cursor_doc["created_at"]
if before_id:
Expand Down Expand Up @@ -705,20 +704,17 @@ async def find_by_field_with_cursor(
},
]

# Create a cursor
db_cursor = self.collection.find(query)

# Apply sorting
sort_by_items = list(sort_by.items()) if sort_by else []
# Use ID for tiebreaking
sort_by_items.append(("_id", 1))
db_cursor = db_cursor.sort(sort_by_items)

# Apply limit if specified
limit = limit or DEFAULT_PAGE_LIMIT
db_cursor = db_cursor.limit(limit)

return [self._deserialize(doc) for doc in db_cursor]
db_cursor = self.collection.find(query).sort(sort_by_items).limit(limit)
docs = await db_cursor.to_list(length=None)
return [self._deserialize(doc) for doc in docs]
except Exception as e:
raise ServiceError(
message=f"Failed to find items by field with cursor in MongoDB: {e}",
Expand All @@ -745,7 +741,9 @@ async def delete_by_field(self, field_name: str, field_value: Any) -> int:
except Exception:
pass

result = self.collection.delete_many({mongo_field_name: mongo_field_value})
result = await self.collection.delete_many(
{mongo_field_name: mongo_field_value}
)
return result.deleted_count
except Exception as e:
raise ServiceError(
Expand Down
3 changes: 1 addition & 2 deletions agentex/src/api/health_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ async def _check_mongodb(self, deps: Any) -> dict[str, Any]:
if client is None:
return {"healthy": False, "error": "Client not initialized"}

# MongoDB client is synchronous, run in thread pool
async with asyncio.timeout(DEPENDENCY_CHECK_TIMEOUT):
await asyncio.to_thread(client.admin.command, "ping")
await client.admin.command("ping")

return {"healthy": True}
except TimeoutError:
Expand Down
20 changes: 10 additions & 10 deletions agentex/src/config/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from typing import Annotated

import httpx
import pymongo
import redis.asyncio as redis
from docker import DockerClient
from fastapi import Depends
from kubernetes_asyncio import config as k8s_config
from pymongo.database import Database as MongoDBDatabase
from pymongo import AsyncMongoClient
from pymongo.asynchronous.database import AsyncDatabase
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
Expand Down Expand Up @@ -43,8 +43,8 @@ def __init__(self):
self.database_async_read_write_engine: AsyncEngine | None = None
self.database_async_middleware_read_write_engine: AsyncEngine | None = None
self.docker_client = None
self.mongodb_client: pymongo.MongoClient | None = None
self.mongodb_database: MongoDBDatabase | None = None
self.mongodb_client: AsyncMongoClient | None = None
self.mongodb_database: AsyncDatabase | None = None
self.httpx_client: httpx.AsyncClient | None = None
self.redis_pool: redis.ConnectionPool | None = None
self.database_async_read_only_engine: AsyncEngine | None = None
Expand Down Expand Up @@ -122,7 +122,7 @@ async def load(self):

logger.info("Connecting to MongoDB")

self.mongodb_client = pymongo.MongoClient(
self.mongodb_client = AsyncMongoClient(
mongodb_uri,
serverSelectionTimeoutMS=20000,
connectTimeoutMS=20000,
Expand All @@ -136,7 +136,7 @@ async def load(self):
self.mongodb_database = self.mongodb_client[mongodb_database_name]

# Ping the database to verify connection
self.mongodb_client.admin.command("ping")
await self.mongodb_client.admin.command("ping")
logger.info(
f"Successfully connected to MongoDB database '{mongodb_database_name}'"
)
Expand All @@ -146,7 +146,7 @@ async def load(self):
from src.config.mongodb_indexes import ensure_mongodb_indexes

try:
ensure_mongodb_indexes(self.mongodb_database)
await ensure_mongodb_indexes(self.mongodb_database)
logger.info("MongoDB indexes ensured successfully")
except Exception as index_error:
# Don't fail startup if index creation fails
Expand Down Expand Up @@ -242,7 +242,7 @@ async def force_reload(self):
if self.database_async_read_only_engine:
await self.database_async_read_only_engine.dispose()
if self.mongodb_client:
self.mongodb_client.close()
await self.mongodb_client.close()

# Reset state
self._loaded = False
Expand Down Expand Up @@ -293,7 +293,7 @@ async def async_shutdown():

# Close MongoDB connection
if global_dependencies.mongodb_client:
global_dependencies.mongodb_client.close()
await global_dependencies.mongodb_client.close()

# Close HTTPX client
if global_dependencies.httpx_client:
Expand Down Expand Up @@ -375,7 +375,7 @@ def middleware_async_read_only_session_maker() -> async_sessionmaker[AsyncSessio
DockerClient, Depends(lambda: GlobalDependencies().docker_client)
]
DMongoDBDatabase = Annotated[
MongoDBDatabase, Depends(lambda: GlobalDependencies().mongodb_database)
AsyncDatabase, Depends(lambda: GlobalDependencies().mongodb_database)
]
DHttpxClient = Annotated[
httpx.AsyncClient, Depends(lambda: GlobalDependencies().httpx_client)
Expand Down
Loading
Loading