Conversation

@samwillis samwillis commented Apr 6, 2025

Probably needs some tests

this.#isUpToDate = true
this.#lastOffset =
  `${upToDateMsg.headers.global_last_seen_lsn}_0` as Offset
// TODO: we also need the cache buster `cursor` value

Is it on the live SSE response as a header? I’m thinking that’s fine, as in, the client only needs to update the cursor when the response is empty, so it’s fine to get it from the header.

* ```
* const stream = new ShapeStream({
*   url: `http://localhost:3000/v1/shape`,
*   liveMode: 'sse'
* })
* ```

Calling this `accept` to mimic HTTP could make sense

@balegas commented May 12, 2025

We want to merge experimental SSE support so that we can use it in our official demos.
One thing we want to experiment with is modifying the server/client to see if we can do live streaming with regular HTTP and a fetch client instead of EventSource.

@thruflo commented May 12, 2025

I think we did want to keep using SSE (just with the slightly different client behaviour) rather than switching to an alternative server response protocol, because using SSE proper triggers the desired request collapsing behaviour from CDNs.

@balegas commented May 13, 2025

Did you check that keeping the regular http response open for a bit would mess with the CDN request collapsing?

@kevin-dp left a comment

Looking into the implementation, it seems that it does not properly handle corner cases such as a 409. Any non-200 status code is basically thrown as an error, whereas our long-polling implementation handles those status codes appropriately.

Also, this currently only works in the browser; in other environments like Node.js the EventSource API is not defined and we would have to polyfill it. However, as pointed out by @samwillis, the browser's EventSource API is very limited and we would want to use the fetch-event-source library instead. That would also solve the polyfill issue.
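
For illustration, here is a rough sketch of how non-200 handling could be ported onto fetch-event-source's `onopen` hook. The error classes and the exact 409 behaviour shown are assumptions, not the PR's actual code:

```ts
import {
  fetchEventSource,
  EventStreamContentType,
} from '@microsoft/fetch-event-source'

// Hypothetical error classes to separate retriable from fatal failures.
class FatalError extends Error {}
class RetriableError extends Error {}

await fetchEventSource(`http://localhost:3000/v1/shape`, {
  async onopen(response) {
    const contentType = response.headers.get(`content-type`) ?? ``
    if (response.ok && contentType.startsWith(EventStreamContentType)) {
      return // connected; events will be delivered to onmessage
    }
    if (response.status === 409) {
      // Must refetch: the long-polling client resets its offset and
      // shape handle here; an SSE client would need to do the same.
      throw new FatalError(`must-refetch`)
    }
    throw new RetriableError(`unexpected status ${response.status}`)
  },
  onmessage(event) {
    // parse event.data and hand it to the message handler
  },
  onerror(err) {
    if (err instanceof FatalError) throw err // rethrowing stops the retries
    // returning nothing makes the library retry with its default backoff
  },
})
```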

As a final remark, I don't like the complexity of the ShapeStream.start method. It was already complex and is now becoming even more so. I have ideas for refactoring this code to make it more comprehensible.

This is a follow-up PR on
#2546 and
#2544. It solves a bug
related to 409s (must refetch) in SSE mode and it replaces the
EventSource browser API with the
[fetch-event-source](https://github.com/Azure/fetch-event-source)
library. I refactored the `ShapeStream.#start` method, which was becoming
very big and complex. To this end, I split the logic into helper methods
that handle the different parts that need to happen (building the shape
URL, making the request, parsing the response headers, handling the
response body, etc.).
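
As a rough illustration of the shape of that decomposition (the helper names and bodies below are hypothetical, not the PR's actual methods):

```ts
// Hypothetical sketch of splitting a monolithic #start loop into helpers.
class ShapeStreamSketch {
  #offset = `-1`
  #aborted = false // set by an unsubscribe/abort path, elided here

  async start(): Promise<void> {
    while (!this.#aborted) {
      const url = this.#shapeUrl()
      const response = await this.#makeRequest(url)
      this.#processHeaders(response)
      await this.#processBody(response)
    }
  }

  // Build the shape URL with the current offset and live parameters.
  #shapeUrl(): URL {
    const url = new URL(`http://localhost:3000/v1/shape`)
    url.searchParams.set(`offset`, this.#offset)
    return url
  }

  // Make the request; real code wraps this with backoff and abort handling.
  async #makeRequest(url: URL): Promise<Response> {
    return fetch(url)
  }

  // Advance client state from the response headers (header name assumed).
  #processHeaders(response: Response): void {
    const offset = response.headers.get(`electric-offset`)
    if (offset) this.#offset = offset
  }

  // Parse the body and publish messages to subscribers.
  async #processBody(response: Response): Promise<void> {
    const messages = await response.json()
    // hand messages to the parser / subscribers here
  }
}
```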

I had to patch the
[fetch-event-source](https://github.com/Azure/fetch-event-source)
library because it relies on browser-specific features such as
`document` and `window` (cf.
Azure/fetch-event-source#41), whereas we want our
client to also work in server-side JS environments.

I also had to patch the `fetch-event-source` library because it does not
abort the fetch when you pass an already aborted signal. A complete
description of the bug and the fix can be found here:
Azure/fetch-event-source#98.
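
The gist of that second patch, as a sketch (not the library's literal code): when an external signal is bridged to an internal controller, an already-aborted signal never fires its `abort` event, so it has to be checked eagerly.

```ts
// Sketch: bridge an external AbortSignal to an internal AbortController.
function linkAbortSignal(
  external: AbortSignal,
  internal: AbortController
): void {
  if (external.aborted) {
    // The missing case: the `abort` event already fired, so abort
    // immediately instead of waiting for a listener that never runs.
    internal.abort()
    return
  }
  external.addEventListener(`abort`, () => internal.abort(), { once: true })
}
```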
@KyleAMathews

Shipped in #2776

@kevin-dp kevin-dp reopened this Jun 25, 2025
@kevin-dp kevin-dp changed the base branch from thruflo/experimental-live-sse to main June 25, 2025 14:56
@kevin-dp kevin-dp changed the base branch from main to thruflo/experimental-live-sse June 25, 2025 14:56
@thruflo commented Jun 25, 2025

One of the nuances here that I was thinking about recently is how the SSE client handles interruptibility.

I.e.: we have the ability for the backend to interrupt a long poll request. But for an SSE request, if it's interrupted, how does the SSE client handle this and is there potential for some kind of duplicate message delivery? Or is the SSE client advancing its offset with every message it receives?

@kevin-dp commented Jun 25, 2025

But for an SSE request, if it's interrupted, how does the SSE client handle this and is there potential for some kind of duplicate message delivery?

The SSE approach should behave the same way as the long poll when the frontend or the backend interrupt the request. When that happens, the fetch (note that both long poll and SSE use fetch) is aborted and an error is thrown. That error is caught here:

try {
  await this.#requestShape({
    fetchUrl,
    requestAbortController,
    headers: requestHeaders,
  })
} catch (e) {
  // Handle abort error triggered by refresh
  if (
    (e instanceof FetchError || e instanceof FetchBackoffAbortError) &&
    requestAbortController.signal.aborted &&
    requestAbortController.signal.reason ===
      FORCE_DISCONNECT_AND_REFRESH
  ) {
    // Loop back to the top of the while loop to start a new request
    continue
  }
  // ... further error handling elided
}

We will continue to the next iteration of the loop, which will try to resume from where we left off.

Or is the SSE client advancing its offset with every message it receives?

Yes, each time we receive a message it is processed by this method:

async #onMessages(messages: string, schema: Schema, isSseMessage = false) {
  const batch = this.#messageParser.parse(messages, schema)

  // Update isUpToDate
  if (batch.length > 0) {
    const lastMessage = batch[batch.length - 1]
    if (isUpToDateMessage(lastMessage)) {
      if (isSseMessage) {
        // Only use the offset from the up-to-date message if this was an SSE message.
        // If we were to use this offset from a regular fetch, it would be wrong
        // and we would get an "offset is out of bounds for this shape" error
        const offset = getOffset(lastMessage)
        if (offset) {
          this.#lastOffset = offset
        }
      }
      this.#lastSyncedAt = Date.now()
      this.#isUpToDate = true
    }

    await this.#publish(batch)
  }
}

So we're using the offset that is provided in the up-to-date message to advance the offset.

@kevin-dp kevin-dp marked this pull request as ready for review June 25, 2025 15:11
@kevin-dp kevin-dp merged commit 5c3f5fc into thruflo/experimental-live-sse Jun 25, 2025
65 of 69 checks passed
@kevin-dp kevin-dp deleted the samwillis/experimental-live-sse branch June 25, 2025 15:11
@thruflo commented Jun 26, 2025

@kevin-dp my concern is:

  • accumulate some messages (no uptodate)
  • error, reconnect
  • accumulate the same messages again
  • uptodate -> commit with duplicate data

Looking at the message processing code, I think this is a possibility if the SSE client keeps the accumulated messages when handling an interrupt.

Maybe this isn't an issue because the client clears the accumulator each time it re-connects on error? If so, it's worth knowing that it's important to interrupt with an error.

I had considered a 200 response with @magnetised for the Phoenix.Sync interrupt. We've since moved past that and aren't doing it any more, but for a while it could have snuck in there!

@kevin-dp

@thruflo right, that's a valid concern. As you described: if we receive an update and the connection breaks before we get the up-to-date message, we will reconnect from the old offset and receive the update a second time. So the ShapeStream consumer will receive the update twice (or more, depending on how many times this happens). This happens because we immediately publish every incoming message.

Even though this is not 100% right, I think it's fine in most cases, because the consumer will usually just materialise the stream into a map, and applying an insert/update/delete twice or more is idempotent. But if the consumer does any additional side effects, like counting the number of updates, then it will be wrong.

We can solve this problem by changing the SSE onmessage callback so that it buffers messages until we get the up-to-date message. That way we only process complete batches of operations, as we do in regular mode (by virtue of Electric sending out complete batches that end with an up-to-date message).
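
A minimal sketch of that buffering (`isUpToDateMessage` follows the snippet above; `Message` and `publish` are stand-ins):

```ts
type Message = { headers: { control?: string } }

const isUpToDateMessage = (m: Message) => m.headers.control === `up-to-date`
declare function publish(batch: Message[]): void

// Accumulate SSE messages and only publish complete batches.
let buffer: Message[] = []

function onSseMessage(message: Message): void {
  buffer.push(message)
  if (isUpToDateMessage(message)) {
    const batch = buffer
    buffer = []
    publish(batch) // consumers only ever see whole batches
  }
}

function onSseError(): void {
  // Drop the partial batch: the next request resumes from the last
  // committed offset and will re-deliver these messages.
  buffer = []
}
```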

@KyleAMathews

Yeah we should keep the same semantics so buffering until up-to-date makes sense

kevin-dp added a commit that referenced this pull request Jul 3, 2025
This PR addresses the issue raised in
#2546 (comment).
The crux of the issue is that messages might be processed several times
if the SSE connection breaks after processing a message but before
receiving the up-to-date message. This is because we only advance the
offset when processing the up-to-date message, hence, we would fetch
again at the old offset and receive some messages that we already
processed.

The fix is easy: we batch SSE messages until we receive up-to-date and
only process the messages at that point. The hard part was writing a
unit test that reproduces this scenario. After a bit of trial and error
I managed to do that by creating a custom fetch wrapper that proxies the
SSE response but filters out the first up-to-date message and then
forces a refresh of the shapestream at that point.
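
For a sense of how such a wrapper can work, here is an illustrative fetch wrapper (not the test's actual code) that swallows the first up-to-date event in the SSE body:

```ts
// Illustrative fetch wrapper: pipe the SSE body through a transform that
// drops the first up-to-date event, simulating a connection that breaks
// after some messages but before the batch completes.
let dropped = false

const filteringFetch: typeof fetch = async (input, init) => {
  const response = await fetch(input, init)
  if (!response.body) return response

  const transform = new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      // Assumes each SSE event arrives in a single chunk; fine for a sketch.
      const text = new TextDecoder().decode(chunk)
      if (!dropped && text.includes(`up-to-date`)) {
        dropped = true // swallow the first up-to-date event
        return
      }
      controller.enqueue(chunk)
    },
  })

  return new Response(response.body.pipeThrough(transform), {
    status: response.status,
    statusText: response.statusText,
    headers: response.headers,
  })
}
```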
3dyuval pushed a commit to 3dyuval/electric that referenced this pull request Aug 23, 2025