fix(server): unblock per-pod concurrency — uvicorn workers=4 + pymongo native async #237

Merged
gabrycina merged 5 commits into scaleapi:main from gabrycina:fix/mongo-async-motor
May 15, 2026

Contributor

@gabrycina gabrycina commented May 14, 2026

Problem

A single AgentEx pod was getting bottlenecked on conversational user traffic, forcing us to scale to ~20 pods just for user testing. Two compounding causes:

  1. uvicorn was running with the default --workers 1 in production. One worker == one process == one event loop.
  2. The MongoDB adapter exposes async def methods but uses the synchronous pymongo driver. Every Mongo call inside an async def blocked the event loop for the full DB round-trip, so even within a single worker the server serialized all in-flight Mongo-bound requests instead of multiplexing them.

Net effect: per-pod throughput on any conversational/persistence path was effectively 1 request at a time.
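The serialization described above can be reproduced without MongoDB. The following is a minimal sketch, assuming `time.sleep` as a stand-in for a sync pymongo round-trip and `asyncio.sleep` as a stand-in for awaited I/O; the handler names and the 50 ms latency are illustrative, not taken from the codebase.

```python
import asyncio
import time

ROUND_TRIP_S = 0.05  # assumed stand-in for one Mongo round-trip


async def blocking_handler() -> None:
    # Sync call inside async def: the event loop cannot switch tasks here.
    time.sleep(ROUND_TRIP_S)


async def non_blocking_handler() -> None:
    # Awaited I/O: the event loop is free to run other handlers meanwhile.
    await asyncio.sleep(ROUND_TRIP_S)


async def elapsed(handler, n: int = 10) -> float:
    """Run n handlers concurrently and return wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


def measure() -> tuple[float, float]:
    """Return (seconds with sync calls, seconds with awaited calls)."""
    blocked = asyncio.run(elapsed(blocking_handler))
    overlapped = asyncio.run(elapsed(non_blocking_handler))
    return blocked, overlapped


if __name__ == "__main__":
    blocked, overlapped = measure()
    print(f"sync in async def: {blocked:.2f}s, awaited: {overlapped:.2f}s")
```

With ten concurrent handlers, the blocking variant takes roughly ten round-trips of wall-clock time, while the awaited variant takes roughly one.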

Changes

This PR lands two commits in order:

1. fix(server): default to uvicorn --workers 4 in production CMD

Sets UVICORN_WORKERS=4 as the production default (overrideable via env var). Dev stage stays single-worker because --reload is incompatible with --workers >1.
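For readers who prefer to see the override rule spelled out, here is a rough Python rendering of how the worker count is resolved. It is only a sketch: the real change lives in the Dockerfile, the function names are hypothetical, the app path passed in is a placeholder, and the `ddtrace-run` wrapper is left out.

```python
import os
from typing import Mapping

DEFAULT_WORKERS = 4  # production default introduced by this PR


def resolve_workers(env: Mapping[str, str] = os.environ, reload: bool = False) -> int:
    """Return the worker count, honouring UVICORN_WORKERS when it is set."""
    if reload:
        # --reload and --workers > 1 do not compose, so dev stays at 1.
        return 1
    raw = env.get("UVICORN_WORKERS", "").strip()
    # An unset or empty variable falls back to the default.
    return int(raw) if raw else DEFAULT_WORKERS


def uvicorn_argv(app: str, env: Mapping[str, str] = os.environ, reload: bool = False) -> list[str]:
    """Build a uvicorn command line for the given (hypothetical) app path."""
    argv = ["uvicorn", app, "--port", "5003"]
    if reload:
        argv.append("--reload")
    else:
        argv += ["--workers", str(resolve_workers(env))]
    return argv
```

Calling `uvicorn_argv("example.app:app", {"UVICORN_WORKERS": "8"})` yields a command line with `--workers 8`; with `reload=True` the `--workers` flag is omitted entirely.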

2. fix(server): migrate to pymongo native async (AsyncMongoClient)

Migrates the MongoDB adapter from sync pymongo to pymongo's native async API (AsyncMongoClient / AsyncDatabase / AsyncCollection), GA since pymongo 4.13. Each coroutine in the adapter is now truly non-blocking on Mongo I/O:

# Before
result = self.collection.insert_one(data)                # blocks event loop
results = list(self.collection.find(query))              # blocks event loop

# After
result = await self.collection.insert_one(data)
results = await self.collection.find(query).to_list(length=None)

For cursor paginators (list, find_by_field, find_by_field_with_cursor) the chained .skip().limit().sort() is still lazy; only to_list() materializes.
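A sketch of the paginator shape, for illustration. `list_page` is a hypothetical function, not the adapter's actual method, and `FakeAsyncCollection` / `FakeAsyncCursor` are in-memory stand-ins so the example runs without a MongoDB; in the real adapter the `collection` argument would be a pymongo `AsyncCollection`.

```python
import asyncio


class FakeAsyncCursor:
    """In-memory stand-in for an async cursor (illustration only)."""

    def __init__(self, docs):
        self._docs = list(docs)
        self._skip = 0
        self._limit = 0
        self._sort = None

    # Builder methods only record options; nothing is fetched yet.
    def skip(self, n):
        self._skip = n
        return self

    def limit(self, n):
        self._limit = n
        return self

    def sort(self, key, direction=1):
        self._sort = (key, direction)
        return self

    async def to_list(self, length=None):
        await asyncio.sleep(0)  # yield to the loop, as real I/O would
        docs = self._docs
        if self._sort:
            key, direction = self._sort
            docs = sorted(docs, key=lambda d: d[key], reverse=direction < 0)
        docs = docs[self._skip:]
        if self._limit:
            docs = docs[: self._limit]
        return docs if length is None else docs[:length]


class FakeAsyncCollection:
    """In-memory stand-in supporting equality-only queries."""

    def __init__(self, docs):
        self._docs = docs

    def find(self, query=None):
        query = query or {}
        return FakeAsyncCursor(
            d for d in self._docs if all(d.get(k) == v for k, v in query.items())
        )


async def list_page(collection, query, page_number=1, limit=50,
                    order_by="name", direction=1):
    """Hypothetical paginator: build lazily, materialize once with to_list()."""
    cursor = collection.find(query).sort(order_by, direction)
    skip = (page_number - 1) * limit
    if skip:  # page 1 needs no skip
        cursor = cursor.skip(skip)
    return await cursor.limit(limit).to_list(length=None)
```

Because `limit` is already applied on the cursor, `to_list(length=None)` returns at most one page.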

Why not Motor? Motor was officially deprecated by MongoDB on May 14, 2025 in favor of the pymongo native async API. Final EOL is May 2027. The recommended migration path is straight to pymongo.AsyncMongoClient.

Other touches:

  • GlobalDependencies.mongodb_client is now AsyncMongoClient; close() is awaited in force_reload / async_shutdown; startup ping is awaited.
  • mongodb_indexes.{ensure,drop_all,get_index_stats} are async def; create_index / drop_indexes / list_indexes are awaited / async-iterated on AsyncCollection.
  • TaskStateRepository.get_by_task_and_agent awaits find_one.
  • Test fixtures (base_mongodb_database, legacy mongodb_database, integration isolated_test_schema) yield an AsyncDatabase so repositories under test consume the same async API as in production.
  • pymongo dep floor bumped to >=4.13 (the GA cut for AsyncMongoClient).

Tests

New agentex/tests/unit/adapters/test_mongodb_adapter.py covering every path the migration touches: create / get / update / delete, batch_create / batch_get, list with pagination + ordering (asc/desc), find_by_field with paging, find_by_field_with_cursor before/after, delete_by_field. 6/6 tests pass against a real MongoDB testcontainer.

Stress-test results

Tested with two images (workers=4 + to_thread, workers=4 + native async) side-by-side, capped to prod-pod size (1 CPU / 2 GiB), against the same local MongoDB seeded with 15 realistic agent documents.

Reads — 10 000 find() calls per run

| concurrency | native async (this PR) | to_thread baseline | ratio |
|---|---|---|---|
| 50 | 8910 r/s | 3880 r/s | 2.30× |
| 100 | 8166 r/s | 4645 r/s | 1.76× |
| 200 | 7349 r/s | 4412 r/s | 1.67× |
| 500 | 5936 r/s | 4475 r/s | 1.33× |
| 1000 | 6581 r/s | 4440 r/s | 1.48× |

The to_thread baseline plateaus at ~4400 r/s the moment concurrency exceeds the default ThreadPoolExecutor size — adding more concurrency does nothing because in-flight Mongo ops are bounded by min(32, cpu+4) threads per worker. Native async has no such ceiling; the I/O is truly non-blocking.
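The ceiling can be seen in isolation with a small experiment. This is a sketch under stated assumptions: the pool is pinned to 4 threads (instead of the default `min(32, cpu+4)`) so the cap is easy to observe, and `time.sleep` / `asyncio.sleep` stand in for Mongo round-trips. All names are illustrative.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

POOL_SIZE = 4        # assumed small pool so the cap is easy to see
IN_FLIGHT = 20       # concurrent operations issued
ROUND_TRIP_S = 0.02  # assumed stand-in for one Mongo round-trip


class PeakCounter:
    """Tracks the highest number of operations running at the same time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.current = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)
        return self

    def __exit__(self, *exc) -> None:
        with self._lock:
            self.current -= 1


async def peak_with_to_thread() -> int:
    counter = PeakCounter()

    def sync_op() -> None:
        with counter:
            time.sleep(ROUND_TRIP_S)

    # asyncio.to_thread uses the loop's default executor.
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ThreadPoolExecutor(max_workers=POOL_SIZE))
    await asyncio.gather(*(asyncio.to_thread(sync_op) for _ in range(IN_FLIGHT)))
    return counter.peak


async def peak_with_native_async() -> int:
    counter = PeakCounter()

    async def async_op() -> None:
        with counter:
            await asyncio.sleep(ROUND_TRIP_S)

    await asyncio.gather(*(async_op() for _ in range(IN_FLIGHT)))
    return counter.peak


if __name__ == "__main__":
    print("to_thread peak:", asyncio.run(peak_with_to_thread()))
    print("native async peak:", asyncio.run(peak_with_native_async()))
```

The thread-based variant never has more than `POOL_SIZE` operations in flight, while the coroutine-based variant overlaps all of them. With a real driver the practical bound then becomes the connection pool rather than the thread pool.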

Writes — 20 000 (insert_one + find_one) per run

| concurrency | native async (this PR) | to_thread baseline | ratio |
|---|---|---|---|
| 100 | 4961 r/s | 2652 r/s | 1.87× |
| 500 | 3757 r/s | 2463 r/s | 1.52× |

100 000-request soak (c=200, reads)

| metric | native async (this PR) | to_thread baseline |
|---|---|---|
| elapsed | 17.86 s | 23.50 s |
| throughput | 5600 r/s | 4256 r/s |
| RSS before | 1.003 GiB | 1.001 GiB |
| RSS after | 1.003 GiB | 1.002 GiB |
| Mongo failures | 0 | 0 |

Both are memory-stable across 100 k ops; native async sustains about 31 % higher throughput (about 24 % less wall-clock time) on the same workload.

HTTP layer (/agents, c=100, 5 000 reqs)

This PR delivers ~850 req/s at workers=4 on the local stack; the HTTP-layer bottleneck is the Postgres-bound auth middleware and the 1-CPU cap, not the Mongo path. The Mongo-isolated numbers above are the right view of what this PR actually changes. In production, with a properly sized Postgres pool, the HTTP-layer ceiling should move toward the Mongo numbers.

Per-pod capacity translation

Assuming a typical chat turn fans out into ~15–25 server requests:

| metric | today (workers=1, sync pymongo) | this PR |
|---|---|---|
| Per-pod concurrent Mongo ops ceiling | ~1 (event-loop-bound) | ~400 (Mongo pool, not thread pool) |
| Concurrent active users per pod | ~5–10 | ~150–300 |
| Pods needed for 40k user base @ 5% peak (~2 000 active) | ~200 | ~10 |
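The pod counts follow from simple division. A small sketch of the arithmetic, using only the figures in the table; `pods_needed` is an illustrative helper, not project code.

```python
import math

USER_BASE = 40_000
PEAK_FRACTION = 0.05  # 5% of the user base active at peak


def pods_needed(active_users: int, users_per_pod: int) -> int:
    """Smallest pod count that covers the active users."""
    return math.ceil(active_users / users_per_pod)


active = round(USER_BASE * PEAK_FRACTION)  # about 2 000 active users

# Per-pod capacity ranges from the table above.
today = [pods_needed(active, u) for u in (5, 10)]
after = [pods_needed(active, u) for u in (150, 300)]

if __name__ == "__main__":
    print("today:", today)
    print("this PR:", after)
```

The table's ~200 corresponds to the optimistic end of today's range (10 users per pod), and its ~10 sits inside the range computed for this PR.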

Risk

  • API surface unchanged. Every repository method is still async def; only the underlying I/O is now non-blocking.
  • pymongo.errors.* exception types are shared between sync and async APIs, so the retry decorator (which catches AutoReconnect, NetworkTimeout, ServerSelectionTimeoutError) keeps working.
  • AWS DocumentDB compatibility preserved: same connection options (retryWrites=False, pool sizes, timeouts).
  • Dev Dockerfile stage unchanged in worker count because --reload doesn't compose with --workers >1.

Rollback

Revert this PR. As an emergency env-only mitigation, setting UVICORN_WORKERS=1 at runtime still leaves the native-async adapter in place (which is strictly better than the pre-PR sync-blocking behavior).

Greptile Summary

  • Migrates the MongoDB adapter from synchronous pymongo to pymongo's native async API (AsyncMongoClient/AsyncCollection), eliminating event-loop blocking on every Mongo I/O call. All callers that already declared async def now truly await the I/O.
  • Sets UVICORN_WORKERS=4 as the production default via ENV + a sh -c "exec ..." CMD, allowing the value to be overridden at runtime via the UVICORN_WORKERS env var while preserving proper PID-1 signal propagation.
  • Adds comprehensive unit tests (test_mongodb_adapter.py) exercising the migrated code paths against a real MongoDB testcontainer.

Confidence Score: 5/5

Safe to merge; all async conversions are correct, test coverage is solid, and no P1/P0 issues found.

No P0 or P1 issues identified. The async migration touches every collection call site and all are correctly awaited. Pytest asyncio_mode=auto is configured, so @pytest.fixture async def fixtures work as expected. drop_all_indexes and get_index_stats have no sync callers. The Dockerfile CMD uses exec for proper PID-1 signal handling. New unit tests exercise all migrated paths against a real testcontainer.

No files require special attention.

Important Files Changed

| Filename | Overview |
|---|---|
| agentex/src/adapters/crud_store/adapter_mongodb.py | All collection operations correctly awaited; cursor materialization uses to_list(length=None) with limit already applied. Minor: if skip: skips .skip(0) when page_number=1, which is a no-op on MongoDB so functionally correct. |
| agentex/src/config/dependencies.py | Client replaced with AsyncMongoClient; startup ping and close() in both force_reload and async_shutdown are correctly awaited; ensure_mongodb_indexes is awaited. |
| agentex/src/config/mongodb_indexes.py | All three utility functions made async def; create_index, drop_indexes awaited; list_indexes correctly async-iterated. drop_all_indexes/get_index_stats have no sync callers in the codebase. |
| agentex/Dockerfile | Production CMD switched to sh -c exec ddtrace-run uvicorn with ENV default of 4 workers; exec ensures PID-1 signal handling. Dev stage left at single-worker with --reload unchanged. |
| agentex/tests/fixtures/database.py | MongoDB fixtures migrated to AsyncMongoClient; cleanup awaits correct; still uses @pytest.fixture for async defs, which is valid because pytest.ini sets --asyncio-mode=auto. |
| agentex/tests/fixtures/containers.py | Correctly changed mongodb_database from @pytest.fixture (sync) to @pytest_asyncio.fixture (async), with AsyncMongoClient and awaited cleanup. |
| agentex/tests/integration/fixtures/integration_client.py | Uses pymongo.AsyncMongoClient (module-level access works since AsyncMongoClient is a top-level pymongo export); drop_database and close() correctly awaited. |
| agentex/tests/unit/adapters/test_mongodb_adapter.py | New test file covering all migrated adapter paths: CRUD, batch ops, list pagination, find_by_field with cursor. Runs against a real MongoDB testcontainer. |
| agentex/tests/unit/api/test_health_interceptor.py | Mock updated from MagicMock to AsyncMock for admin.command, correctly matching the now-awaitable async driver call. |
| agentex/src/domain/repositories/task_state_repository.py | get_by_task_and_agent now correctly awaits find_one, fixing a previously silent bug where the coroutine was returned instead of the document. |

Sequence Diagram

sequenceDiagram
    participant C as Client
    participant U as uvicorn worker (x4)
    participant F as FastAPI handler
    participant R as MongoDBCRUDRepository
    participant M as MongoDB (AsyncMongoClient)

    C->>U: HTTP request
    U->>F: route dispatch
    F->>R: await repo.get / create / list
    Note over R: Previously: sync pymongo blocked event loop here
    R->>M: await collection.find_one / insert_one
    M-->>R: result (non-blocking I/O)
    R-->>F: deserialized model
    F-->>U: JSON response
    U-->>C: HTTP response
    Note over U: Other coroutines run freely while Mongo I/O is in-flight

Reviews (2): Last reviewed commit: "fix(test): switch healthcheck mock to As..."

gabrycina added 3 commits May 14, 2026 10:25
Currently the production CMD runs `uvicorn ... --port 5003` with no
`--workers` flag, which falls through to uvicorn's default of 1 worker.
Combined with `pymongo` (sync) calls inside `async def` adapter methods,
each pod is effectively single-threaded: every Mongo round-trip blocks
the entire event loop, so the pod can only process one in-flight
Mongo-bound request at a time.

At Cosmos round-trip latencies of 10-30ms, this caps each pod at
~30-100 req/s. Downstream tenants (OneEdge, FDD, Deep Research,
customer-emu-*) have been compensating by horizontally scaling: the
OneEdge user-testing pack runs ~20 agentex pods just to handle a few
hundred concurrent users, even though pods sit at ~12% CPU utilization
(1 worker on 8-core nodes).

This patch sets `--workers ${UVICORN_WORKERS:-4}` in the production
final-stage CMD. 4 workers is a safe default for the typical 1 CPU /
2Gi pod limits we see in customer-emu-fdd packs — it gives ~4x
concurrency per pod without exhausting memory (each worker is
~150-200MB resident). Operators with higher CPU/memory limits can bump
UVICORN_WORKERS up to ~cpu_count.

The dev stage CMD (`--reload` mode) intentionally stays at the
implicit 1 worker because `--reload` is incompatible with
`--workers >1`.

Followup work (separate PRs, larger scope):

- Wrap each sync `pymongo` call in `asyncio.to_thread()` so the
  event loop isn't blocked at all (this would let a single worker
  handle hundreds of concurrent ops, bounded only by
  `MONGODB_MAX_POOL_SIZE`).
- Migrate the adapter to `motor` (the async MongoDB driver) for
  the cleanest end state.

Either of those would deliver another ~10x on top of this change.

Verified
- `docker buildx build --check` on the new Dockerfile: no warnings.
- Diff against main is purely additive (a comment + ENV + replaced
  CMD); no other behavior changes.

References
- adapter_mongodb.py uses sync pymongo: src/adapters/crud_store/adapter_mongodb.py:8,219,277
- async def wrapping sync calls: src/adapters/crud_store/adapter_mongodb.py:196 etc.
- pymongo.MongoClient init: src/config/dependencies.py

The MongoDB adapter exposes async methods but the underlying driver
(pymongo) is synchronous, so every Mongo round-trip blocked the worker's
event loop — capping a single worker to one in-flight DB request at a
time and starving every other coroutine until the call returned.

Wrap each pymongo call site (find/find_one, insert_one/many, update_one,
delete_one/many, and cursor materializations) with asyncio.to_thread so
the I/O runs on the default thread pool while the event loop keeps
serving other requests. For cursor builders the wrapper closes over the
full chain (find().skip().limit().sort()) so the chained calls and the
list() materialization happen together in the worker thread.

Combined with the --workers 4 default this gives per-pod concurrency on
the Mongo-bound paths instead of one-at-a-time serialization. Local
load test against /agents (which hits the wrapped list() path):
workers=1 256 req/s -> workers=4 892 req/s, 0 failures over 5000 reqs.

Builds on the asyncio.to_thread wrap from the parent PR. Switches the
MongoDB adapter from sync pymongo (wrapped in to_thread) to pymongo's
native async API (AsyncMongoClient, AsyncDatabase, AsyncCollection),
which has been GA since pymongo 4.13 and is MongoDB's recommended
replacement for motor (motor deprecated 2025-05).

With native async, each coroutine in the adapter is truly non-blocking
on Mongo I/O. We drop the thread-pool hop entirely:

  - asyncio.to_thread(self.collection.insert_one, data)
    -> await self.collection.insert_one(data)

  - asyncio.to_thread(lambda: list(self.collection.find(q)))
    -> await self.collection.find(q).to_list(length=None)

  - for cursor paginators (list, find_by_field,
    find_by_field_with_cursor) the chained skip/limit/sort still
    builds the cursor lazily; only to_list() materializes.

Also:

  - GlobalDependencies.mongodb_client is now AsyncMongoClient; close()
    is now awaited in force_reload and async_shutdown.
  - mongodb_indexes.{ensure,drop_all,get_index_stats} are async, since
    create_index / drop_indexes / list_indexes are all awaitable on
    AsyncCollection.
  - Test fixtures (base_mongodb_database, mongodb_database, integration
    isolated_test_schema) yield an AsyncDatabase so repositories under
    test can `await` collection ops just like in production.
  - Bumps pymongo floor to 4.13 (the GA cut for AsyncMongoClient).

Adds tests/unit/adapters/test_mongodb_adapter.py covering create / get
/ update / delete, batch_create / batch_get, list pagination + sort,
find_by_field with paging, find_by_field_with_cursor before/after, and
delete_by_field. All paths exercise the new await collection.* and the
cursor.to_list materialization.

Verified locally (workers=4, /agents endpoint, 5000 reqs at c=100):
  - 0 Mongo-path failures (the 5xx in the run were Postgres
    TooManyConnectionsError from the local test container, unrelated)
  - ~850 req/s sustained, ~1 GB RSS
  - 6/6 new adapter unit tests pass
@gabrycina gabrycina requested a review from a team as a code owner May 14, 2026 10:27
@gabrycina gabrycina changed the title fix(server): migrate MongoDB adapter to pymongo native async (Tier 3) fix(server): unblock per-pod concurrency — uvicorn workers=4 + pymongo native async May 14, 2026
Member

@olliestanley olliestanley left a comment


mostly looks great from my perspective. just left two small comments in the diff.

outside the diff, one P0 to address:

in src/api/health_interceptor.py we need to fix the readiness check - it currently runs

await asyncio.to_thread(client.admin.command, "ping")

but since we switched to the async client, this will not actually execute the coroutine. it can be replaced with

await client.admin.command("ping")
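To make the failure mode concrete, here is a minimal sketch that reproduces it without MongoDB. `FakeAdmin` is a stand-in whose async `command` always fails, and `broken_check` / `fixed_check` are hypothetical names, not the functions in health_interceptor.py.

```python
import asyncio
import inspect


class FakeAdmin:
    """Stand-in for an async client's admin database (illustration only)."""

    async def command(self, name: str) -> dict:
        raise ConnectionError("mongo unreachable")


async def broken_check(admin) -> bool:
    try:
        # The worker thread calls admin.command("ping"), which only *creates*
        # a coroutine object; its body never runs, so nothing can raise.
        result = await asyncio.to_thread(admin.command, "ping")
    except ConnectionError:
        return False
    if inspect.iscoroutine(result):
        result.close()  # avoid the "never awaited" RuntimeWarning
    return True


async def fixed_check(admin) -> bool:
    try:
        # Awaiting directly runs the command and surfaces its failure.
        await admin.command("ping")
        return True
    except ConnectionError:
        return False


if __name__ == "__main__":
    print("broken:", asyncio.run(broken_check(FakeAdmin())))
    print("fixed:", asyncio.run(fixed_check(FakeAdmin())))
```

Against an unreachable backend, the `to_thread` variant still reports healthy, while the directly awaited variant reports unhealthy.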

Comment thread agentex/Dockerfile Outdated
Comment thread agentex/tests/unit/adapters/test_mongodb_adapter.py Outdated
Three items from @olliestanley's review:

1. health_interceptor._check_mongodb was passing the async client's
   coroutine-returning command() to asyncio.to_thread, which wraps it
   in a thread that just creates the coroutine and returns it without
   awaiting. The healthcheck silently always passed. Replace with
   direct `await client.admin.command("ping")` now that the client is
   async.

2. Expand the Dockerfile comment near UVICORN_WORKERS=4 to call out
   that each worker has its own DB connection pool, and that scaling
   workers multiplies the per-pod connection count. List the relevant
   env vars (POSTGRES_POOL_SIZE / POSTGRES_MIDDLEWARE_POOL_SIZE /
   MONGODB_MAX_POOL_SIZE) so ops knows what to tune in tandem.

3. Rename test_delete_by_field_and_batch_delete -> test_delete_by_field
   (the test doesn't exercise batch_delete, just delete_by_field).
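For item 2, the multiplication is worth spelling out. A small sketch follows; the pool sizes are illustrative assumptions rather than the project's defaults (100 is pymongo's default maxPoolSize, the Postgres values are made up), and `per_pod_connections` is a hypothetical helper.

```python
def per_pod_connections(workers: int, pool_sizes: dict[str, int]) -> dict[str, int]:
    """Each worker process owns its own pools, so per-pod totals multiply."""
    return {name: workers * size for name, size in pool_sizes.items()}


# Illustrative values only, not the project's defaults.
ASSUMED_POOLS = {
    "MONGODB_MAX_POOL_SIZE": 100,         # pymongo's default maxPoolSize
    "POSTGRES_POOL_SIZE": 10,             # assumed
    "POSTGRES_MIDDLEWARE_POOL_SIZE": 5,   # assumed
}

if __name__ == "__main__":
    for name, total in per_pod_connections(4, ASSUMED_POOLS).items():
        print(f"{name}: up to {total} connections per pod")
```

With 4 workers and a Mongo pool of 100, a pod can open up to 400 Mongo connections, so database-side connection limits should be reviewed whenever UVICORN_WORKERS changes.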
@gabrycina
Contributor Author

Thanks @olliestanley — all three addressed in 5540620:

  1. P0 healthcheck: confirmed and fixed. await asyncio.to_thread(client.admin.command, "ping") on the async client was indeed silently never awaiting the coroutine — the healthcheck was passing regardless of Mongo state. Replaced with await client.admin.command("ping") directly.

  2. DB pool sizing comment in the Dockerfile: expanded the comment near UVICORN_WORKERS=4 to call out the multiplicative effect on per-pod pool counts and list the env vars (POSTGRES_POOL_SIZE, POSTGRES_MIDDLEWARE_POOL_SIZE, MONGODB_MAX_POOL_SIZE) that should be reviewed when changing workers. Left the defaults unchanged so existing single-worker deployments aren't affected — sizing is now an explicit ops decision rather than a hidden 4× multiplier.

  3. Test rename: test_delete_by_field_and_batch_delete → test_delete_by_field. Good catch.

Member

@olliestanley olliestanley left a comment


lgtm!

The readiness test mocked client.admin.command with a plain MagicMock,
which worked when the call was wrapped in asyncio.to_thread (sync
return value). Now that the production code awaits the call directly
on the native async client, the mock must return a coroutine — switch
to AsyncMock.

This is the exact symmetric case of the P0 we just fixed: the test
mock and the production code have to agree on whether the call is
sync or async.
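The mismatch is easy to reproduce with the standard library alone. A minimal sketch, assuming a readiness check that awaits the ping directly; `check_mongodb` and `run_with` are illustrative names, not the code under test.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def check_mongodb(client) -> bool:
    """Hypothetical readiness check that awaits the ping directly."""
    await client.admin.command("ping")
    return True


def run_with(command_mock) -> str:
    """Run the check against a client whose admin.command is the given mock."""
    client = MagicMock()
    client.admin.command = command_mock
    try:
        asyncio.run(check_mongodb(client))
        return "ok"
    except TypeError:
        # A plain MagicMock returns a non-awaitable value, so `await` fails.
        return "TypeError"


if __name__ == "__main__":
    print("MagicMock:", run_with(MagicMock(return_value={"ok": 1})))
    print("AsyncMock:", run_with(AsyncMock(return_value={"ok": 1})))
```

`AsyncMock` returns an awaitable from each call, which is why it matches the native async client, while a plain `MagicMock` only suits call sites that treat the result as a sync value.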
@smoreinis
Collaborator

for posterity: the integration test run here failed because it has trouble running on a fork, but I was able to manually kick one off and all the integration tests did pass: https://github.com/scaleapi/scale-agentex/actions/runs/25875798139

in the process of reviewing now, ultimately this is a fix that will benefit all of our Agentex users — many thanks for attacking this so eagerly.

Collaborator

@smoreinis smoreinis left a comment


LG, I built a custom image and deployed it to our AWS sgp dev environment so can confirm that this deploys cleanly there.

@gabrycina gabrycina merged commit 8adca83 into scaleapi:main May 15, 2026
15 of 17 checks passed