Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion agentex/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ RUN uv sync --frozen --group dev --package agentex-backend
COPY ${SOURCE_DIR}/src/ ./src/
EXPOSE 5003
ENV PYTHONPATH=/app
# Dev / single-worker stage. `--reload` doesn't work with `--workers >1`.
# Production multi-worker config is set in the final stage at the bottom of
# this file.
CMD ["ddtrace-run", "uvicorn", "src.api.app:app", "--host", "0.0.0.0", "--port", "5003", "--reload"]

# Docs builder stage
Expand Down Expand Up @@ -90,4 +93,12 @@ USER nonroot

EXPOSE 5003
ENV PYTHONPATH=/app
CMD ["ddtrace-run", "uvicorn", "src.api.app:app", "--host", "0.0.0.0", "--port", "5003"]
# Run uvicorn with multiple workers so a single pod isn't bottlenecked to one
# in-flight request at a time. The MongoDB adapter uses `pymongo` (sync) inside
# `async def` methods, which blocks the event loop for the duration of every
# Mongo round-trip; the only way to get per-pod concurrency today is via
# separate worker processes. Default of 4 is a safe match for the typical 1
# CPU / 2Gi pod limits; bump UVICORN_WORKERS up to ~cpu_count when those
# limits are increased.
ENV UVICORN_WORKERS=4
CMD ["sh", "-c", "exec ddtrace-run uvicorn src.api.app:app --host 0.0.0.0 --port 5003 --workers ${UVICORN_WORKERS}"]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The CMD expands ${UVICORN_WORKERS} without a fallback, so if an operator explicitly overrides the variable with an empty string (e.g., docker run -e UVICORN_WORKERS= or a Kubernetes env entry with an empty value), uvicorn receives --workers and fails at startup. Using the :-4 shell default directly in the expansion gives a safety net independent of the ENV instruction.

Suggested change
CMD ["sh", "-c", "exec ddtrace-run uvicorn src.api.app:app --host 0.0.0.0 --port 5003 --workers ${UVICORN_WORKERS}"]
CMD ["sh", "-c", "exec ddtrace-run uvicorn src.api.app:app --host 0.0.0.0 --port 5003 --workers ${UVICORN_WORKERS:-4}"]
Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/Dockerfile
Line: 104

Comment:
The CMD expands `${UVICORN_WORKERS}` without a fallback, so if an operator explicitly overrides the variable with an empty string (e.g., `docker run -e UVICORN_WORKERS=` or a Kubernetes env entry with an empty value), uvicorn receives `--workers ` and fails at startup. Using the `:-4` shell default directly in the expansion gives a safety net independent of the `ENV` instruction.

```suggestion
CMD ["sh", "-c", "exec ddtrace-run uvicorn src.api.app:app --host 0.0.0.0 --port 5003 --workers ${UVICORN_WORKERS:-4}"]
```

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex

78 changes: 45 additions & 33 deletions agentex/src/adapters/crud_store/adapter_mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ async def create(self, item: T) -> T:
if data.get("updated_at") is None:
data["updated_at"] = now

result = self.collection.insert_one(data)
result = await asyncio.to_thread(self.collection.insert_one, data)

# Update item with generated ID (as string)
# Set the .id field with the string representation of _id
Expand Down Expand Up @@ -274,7 +274,7 @@ async def batch_create(self, items: list[T]) -> list[T]:

data_list.append(data)

result = self.collection.insert_many(data_list)
result = await asyncio.to_thread(self.collection.insert_many, data_list)

# Update items with generated IDs (as strings)
for idx, inserted_id in enumerate(result.inserted_ids):
Expand Down Expand Up @@ -320,7 +320,7 @@ async def get(self, id: str | None = None, name: str | None = None) -> T | None:
else:
query = {"name": name}

document = self.collection.find_one(query)
document = await asyncio.to_thread(self.collection.find_one, query)
if document is None:
msg = (
f"Item with {'id' if id else 'name'} '{id or name}' does not exist."
Expand Down Expand Up @@ -362,7 +362,9 @@ async def get_by_field(self, field_name: str, field_value: Any) -> T | None:
except Exception:
pass

document = self.collection.find_one({mongo_field_name: mongo_field_value})
document = await asyncio.to_thread(
self.collection.find_one, {mongo_field_name: mongo_field_value}
)
if document is None:
raise ItemDoesNotExist(
f"Item with {field_name} '{field_value}' does not exist."
Expand Down Expand Up @@ -392,8 +394,7 @@ async def batch_get(
elif names:
query["name"] = {"$in": names}

cursor = self.collection.find(query)
results = list(cursor)
results = await asyncio.to_thread(lambda: list(self.collection.find(query)))
if not results:
key = "ids" if ids else "names"
msg = f"No items found with {key} '{ids or names}'."
Expand Down Expand Up @@ -429,14 +430,18 @@ async def update(self, item: T) -> T:
# Add updated_at timestamp
update_data["updated_at"] = datetime.now(UTC)

result = self.collection.update_one(
{"_id": id_value}, {"$set": update_data}
result = await asyncio.to_thread(
self.collection.update_one,
{"_id": id_value},
{"$set": update_data},
)

if result.matched_count == 0:
raise ItemDoesNotExist(f"Item with id '{id_value}' does not exist.")

updated_doc = self.collection.find_one({"_id": id_value})
updated_doc = await asyncio.to_thread(
self.collection.find_one, {"_id": id_value}
)
return self._deserialize(updated_doc)
except ItemDoesNotExist:
raise
Expand Down Expand Up @@ -476,7 +481,7 @@ async def delete(self, id: str | None = None, name: str | None = None) -> None:
else:
query = {"name": name}

result = self.collection.delete_one(query)
result = await asyncio.to_thread(self.collection.delete_one, query)
if result.deleted_count == 0:
msg = (
f"Item with {'id' if id else 'name'} '{id or name}' does not exist."
Expand Down Expand Up @@ -507,7 +512,7 @@ async def batch_delete(
elif names:
query["name"] = {"$in": names}

result = self.collection.delete_many(query)
result = await asyncio.to_thread(self.collection.delete_many, query)
if result.deleted_count == 0:
key = "ids" if ids else "names"
msg = f"No items found with {key} '{ids or names}'."
Expand Down Expand Up @@ -536,11 +541,6 @@ async def list(
raise ClientError("Page number must be greater than 0")
page_number = page_number or 1
skip = (page_number - 1) * limit
if filters:
cursor = self.collection.find(filters)
else:
cursor = self.collection.find()
cursor = cursor.skip(skip).limit(limit)

sort_list = []
if order_by:
Expand All @@ -553,10 +553,16 @@ async def list(

# Always use _id as tiebreaker
sort_list.append(("_id", pymongo.ASCENDING))
cursor = cursor.sort(sort_list)

def _fetch() -> builtins.list[dict[str, Any]]:
cursor = (
self.collection.find(filters) if filters else self.collection.find()
)
return list(cursor.skip(skip).limit(limit).sort(sort_list))

try:
return [self._deserialize(doc) for doc in cursor]
docs = await asyncio.to_thread(_fetch)
return [self._deserialize(doc) for doc in docs]
except Exception as e:
raise ServiceError(
message=f"Failed to list items from MongoDB: {e}", detail=str(e)
Expand Down Expand Up @@ -600,25 +606,27 @@ async def find_by_field(
if filters:
query.update(filters)

cursor = self.collection.find(query)

# Apply sorting
sort_by_items = list(sort_by.items()) if sort_by else []
# Use ID for tiebreaking
sort_by_items.append(("_id", 1))
cursor = cursor.sort(sort_by_items)

# Apply limit if specified
limit = limit or DEFAULT_PAGE_LIMIT
cursor = cursor.limit(limit)

# Apply page number if specified
if page_number is not None and page_number < 1:
raise ClientError("Page number must be greater than 0")
if page_number is not None:
cursor = cursor.skip((page_number - 1) * limit)
skip = (page_number - 1) * limit if page_number is not None else 0

return [self._deserialize(doc) for doc in cursor]
def _fetch() -> builtins.list[dict[str, Any]]:
cursor = self.collection.find(query).sort(sort_by_items).limit(limit)
if skip:
cursor = cursor.skip(skip)
return list(cursor)

docs = await asyncio.to_thread(_fetch)
return [self._deserialize(doc) for doc in docs]
except Exception as e:
raise ServiceError(
message=f"Failed to find items by field in MongoDB: {e}", detail=str(e)
Expand Down Expand Up @@ -677,7 +685,9 @@ async def find_by_field_with_cursor(
except Exception:
cursor_object_id = cursor_id

cursor_doc = self.collection.find_one({"_id": cursor_object_id})
cursor_doc = await asyncio.to_thread(
self.collection.find_one, {"_id": cursor_object_id}
)
if cursor_doc and "created_at" in cursor_doc:
cursor_timestamp = cursor_doc["created_at"]
if before_id:
Expand Down Expand Up @@ -705,20 +715,20 @@ async def find_by_field_with_cursor(
},
]

# Create a cursor
db_cursor = self.collection.find(query)

# Apply sorting
sort_by_items = list(sort_by.items()) if sort_by else []
# Use ID for tiebreaking
sort_by_items.append(("_id", 1))
db_cursor = db_cursor.sort(sort_by_items)

# Apply limit if specified
limit = limit or DEFAULT_PAGE_LIMIT
db_cursor = db_cursor.limit(limit)

return [self._deserialize(doc) for doc in db_cursor]
def _fetch() -> builtins.list[dict[str, Any]]:
db_cursor = self.collection.find(query).sort(sort_by_items).limit(limit)
return list(db_cursor)

docs = await asyncio.to_thread(_fetch)
return [self._deserialize(doc) for doc in docs]
except Exception as e:
raise ServiceError(
message=f"Failed to find items by field with cursor in MongoDB: {e}",
Expand All @@ -745,7 +755,9 @@ async def delete_by_field(self, field_name: str, field_value: Any) -> int:
except Exception:
pass

result = self.collection.delete_many({mongo_field_name: mongo_field_value})
result = await asyncio.to_thread(
self.collection.delete_many, {mongo_field_name: mongo_field_value}
)
return result.deleted_count
except Exception as e:
raise ServiceError(
Expand Down
Loading