estuary-cdk: implement ConcurrentTask for concurrent webhook writes#4372
estuary-cdk: implement ConcurrentTask for concurrent webhook writes#4372nicolaslazo wants to merge 1 commit into
Conversation
709683a to
4af1232
Compare
0ad5aee to
4ae0f7b
Compare
The current Task implementation gives callers manual control over how many documents are buffered before each checkpoint. That model works when there is a single producer per Task, but our aiohttp webhook server handles an arbitrary number of concurrent requests across the same Task; this exposes us to the risk that concurrent checkpoint() calls may interleave on the shared buffer. This change introduces a ConcurrentTask subclass with an atomic `emit()` method that atomically writes buffers and checkpoints together. ConcurrentTask also gives each binding its own buffer, replacing the previous stopgap where a single mutex was shared across every binding in the webhook server.
4af1232 to
2c56f61
Compare
Alex-Bair
left a comment
There was a problem hiding this comment.
I know I originally provided the direction of instantiating separate Tasks for each webhook binding, but the more I think about the problem we're trying to solve plus seeing that the existing source-appsflyer captures aren't throughput limited by sharing a single Task, I'd rather we not take the "one Task per webhook binding" approach right now; it's an optimization I thought we'd need, but based on empirical evidence, we don't need to take on that additional complexity & improve throughput here yet. I apologize for changing my mind & switching direction late on this one. 🙇
The messy bit remaining to clean up is ensuring a Task doesn't attempt to write to its buffer while its buffer is being drained. We're using locks to coordinate this whenever we're using the various Task methods, but IMO it'd be cleaner to push this complexity down into the buffer itself; if the buffer guaranteed no write would ever happen when it's being drained, that'd simplify a lot of the logic elsewhere in the CDK now & in the future.
So, what I'm proposing is we:
- still use a single
Taskfor all webhook bindings - wrap the
Task's buffer in aConcurrentBufferclass whose writes and drains are safe for concurrent producers. AConcurrentBufferwould be an overall improvement for all connectors, albeit it's an improvement that wasn't needed until we started capturing webhooks.
I've pushed a spike implementation of this up to the bair/estuary-cdk-concurrent-buffer-spike branch for reference of what I'm thinking. Do you think that's a suitable alternative or see something I'm missing?
Description:
The current
Taskimplementation gives callers manual control over how many documents are buffered before each checkpoint. That model works when there is a single producer perTask, but ouraiohttpwebhook server handles an arbitrary number of concurrent requests across the sameTask; this exposes us to the risk that concurrentcheckpoint()calls may interleave on the shared buffer.This change introduces a
ConcurrentTaskclass that wrapsTaskwith an atomicemit()method that atomically writes buffers and checkpoints together.ConcurrentTaskalso gives each binding its own buffer, replacing the previous stopgap where a single mutex was shared across every binding in the webhook server.Closes #4322.
Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)
Notes for reviewers:
(anything that might help someone review this PR)