-
Notifications
You must be signed in to change notification settings - Fork 295
fix: buffer SSE messages until up-to-date message #2869
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…pe stream if the SSE connection breaks after processing messages and before receiving the up-to-date message.
Codecov ReportAttention: Patch coverage is
✅ All tests successful. No failed tests found.
Additional details and impacted files@@ Coverage Diff @@
## main #2869 +/- ##
==========================================
+ Coverage 77.79% 79.21% +1.41%
==========================================
Files 157 157
Lines 7495 7523 +28
Branches 284 284
==========================================
+ Hits 5831 5959 +128
+ Misses 1662 1562 -100
Partials 2 2
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great. Thanks @kevin-dp
As an aside I don't like the existing getOffset method that constructs one from the LSN and a _0 suffix. The offset being constructed from an LSN is a backend implementation detail we should not depend on. I think we had this in my original POC as a work around, but we really should replace this by adding the actual offset on the up-to-date message, at least in SSE mode.
edit: filed a bug #2870
This PR addresses the issue raised in electric-sql#2546 (comment). The crux of the issue is that messages might be processed several times if the SSE connection breaks after processing a message but before receiving the up-to-date message. This is because we only advance the offset when processing the up-to-date message, hence, we would fetch again at the old offset and receive some messages that we already processed. The fix is easy: we batch SSE messages until we receive up-to-date and only process the messages at that point. The hard part was to write a unit test that reproduces this scenario. After a bit of trial and error i managed to do that by creating a custom fetch wrapper that proxies the SSE response but filters out the first up-to-date message and then forces a refresh of the shapestream at that point.
This PR addresses the issue raised in #2546 (comment). The crux of the issue is that messages might be processed several times if the SSE connection breaks after processing a message but before receiving the up-to-date message. This is because we only advance the offset when processing the up-to-date message, hence, we would fetch again at the old offset and receive some messages that we already processed.
The fix is easy: we batch SSE messages until we receive up-to-date and only process the messages at that point. The hard part was to write a unit test that reproduces this scenario. After a bit of trial and error i managed to do that by creating a custom fetch wrapper that proxies the SSE response but filters out the first up-to-date message and then forces a refresh of the shapestream at that point.