5 changes: 5 additions & 0 deletions package.json
@@ -20,5 +20,10 @@
},
"devDependencies": {
"glob": "^10.3.10"
},
"pnpm": {
"patchedDependencies": {
"@microsoft/fetch-event-source": "patches/@microsoft__fetch-event-source.patch"
}
}
}
@@ -31,6 +31,7 @@ defmodule Electric.Plug.ServeShapePlug do
all_params =
Map.merge(conn.query_params, conn.path_params)
|> Map.update("live", "false", &(&1 != "false"))
|> Map.update("experimental_live_sse", "false", &(&1 != "false"))

case Api.validate(api, all_params) do
{:ok, request} ->
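The pnpm patch to @microsoft/fetch-event-source above, together with the new experimental_live_sse parameter parsed in ServeShapePlug, suggests that a TypeScript consumer opts into SSE by adding one flag to an otherwise normal live shape request. The following sketch is hypothetical and not part of this diff: it assumes the existing /v1/shape endpoint with its table, offset, handle and live parameters; only experimental_live_sse is introduced by this change.

import { fetchEventSource } from '@microsoft/fetch-event-source'

// Hypothetical consumer: a live shape subscription with the new SSE flag.
const url = new URL('http://localhost:3000/v1/shape')
url.searchParams.set('table', 'items')
url.searchParams.set('offset', '0_0')          // offset returned by a previous sync
url.searchParams.set('handle', 'shape-handle') // handle returned by a previous sync
url.searchParams.set('live', 'true')
url.searchParams.set('experimental_live_sse', 'true')

await fetchEventSource(url.toString(), {
  onmessage(event) {
    // In SSE mode each data: payload is a single JSON message,
    // not the singleton array used in long-polling mode.
    const message = JSON.parse(event.data)
    if (message.headers?.control === 'must-refetch') {
      // The shape was rotated: discard local data and resync from scratch.
    }
  },
  onerror(err) {
    throw err // rethrowing stops the library's automatic retries
  },
})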
235 changes: 227 additions & 8 deletions packages/sync-service/lib/electric/shapes/api.ex
@@ -7,6 +7,7 @@ defmodule Electric.Shapes.Api do
alias __MODULE__
alias __MODULE__.Request
alias __MODULE__.Response
alias __MODULE__.SseState

import Electric.Replication.LogOffset, only: [is_log_offset_lt: 2]

@@ -25,7 +26,9 @@
required: true
],
allow_shape_deletion: [type: :boolean],
keepalive_interval: [type: :integer],
long_poll_timeout: [type: :integer],
sse_timeout: [type: :integer],
max_age: [type: :integer],
stack_ready_timeout: [type: :integer],
stale_age: [type: :integer],
@@ -48,12 +51,15 @@
:stack_id,
:storage,
allow_shape_deletion: false,
keepalive_interval: 21_000,
long_poll_timeout: 20_000,
sse_timeout: 60_000,
max_age: 60,
stack_ready_timeout: 5_000,
stale_age: 300,
send_cache_headers?: true,
encoder: Electric.Shapes.Api.Encoder.JSON,
sse_encoder: Electric.Shapes.Api.Encoder.SSE,
configured: false
]

@@ -311,8 +317,18 @@

# TODO: discuss returning a 307 redirect rather than a 409, the client
# will have to detect this and throw out old data

# In SSE mode we send the must-refetch object as an event
# instead of a singleton array containing that object
must_refetch =
if request.params.experimental_live_sse do
hd(@must_refetch)
else
@must_refetch
end

{:error,
Response.error(request, @must_refetch,
Response.error(request, must_refetch,
handle: active_shape_handle,
status: 409
)}
@@ -482,7 +498,7 @@
if live? && Enum.take(log, 1) == [] do
request
|> update_attrs(%{ot_is_immediate_response: false})
|> hold_until_change()
|> handle_live_request()
else
up_to_date_lsn =
if live? do
@@ -495,9 +511,9 @@
max(global_last_seen_lsn, chunk_end_offset.tx_offset)
end

body = Stream.concat([log, maybe_up_to_date(request, up_to_date_lsn)])
log_stream = Stream.concat(log, maybe_up_to_date(request, up_to_date_lsn))

%{response | chunked: true, body: encode_log(request, body)}
%{response | chunked: true, body: encode_log(request, log_stream)}
end

{:error, error} ->
@@ -511,6 +527,14 @@
end
end

defp handle_live_request(%Request{params: %{experimental_live_sse: true}} = request) do
stream_sse_events(request)
end

defp handle_live_request(%Request{} = request) do
hold_until_change(request)
end

defp hold_until_change(%Request{} = request) do
%{
new_changes_ref: ref,
@@ -547,10 +571,182 @@
end
end

defp stream_sse_events(%Request{} = request) do
%{
new_changes_ref: ref,
handle: shape_handle,
api: %{keepalive_interval: keepalive_interval, sse_timeout: sse_timeout},
params: %{offset: since_offset}
} = request

Logger.debug(
"Client #{inspect(self())} is streaming SSE for changes to #{shape_handle} since #{inspect(since_offset)}"
)

# Set up a timer to send an SSE comment as a keep-alive
keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)

# Set up timer for SSE timeout
timeout_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)

# Stream changes as SSE events until the sse_timeout fires.
sse_event_stream =
Stream.resource(
fn ->
%SseState{
mode: :receive,
request: request,
stream: nil,
since_offset: since_offset,
last_message_time: System.monotonic_time(:millisecond),
keepalive_ref: keepalive_ref
}
end,
&next_sse_event/1,
fn %SseState{keepalive_ref: latest_keepalive_ref} ->
Process.cancel_timer(latest_keepalive_ref)
Process.cancel_timer(timeout_ref)
end
)

response = %{request.response | chunked: true, body: sse_event_stream}

%{response | trace_attrs: Map.put(response.trace_attrs || %{}, :ot_is_sse_response, true)}
end

defp next_sse_event(%SseState{mode: :receive} = state) do
%{
keepalive_ref: keepalive_ref,
last_message_time: last_message_time,
request:
%{
api: %{
keepalive_interval: keepalive_interval
},
handle: shape_handle,
new_changes_ref: ref
} = request,
since_offset: since_offset
} = state

receive do
{^ref, :new_changes, latest_log_offset} ->
updated_request =
%{request | last_offset: latest_log_offset}
|> determine_global_last_seen_lsn()
|> determine_log_chunk_offset()
|> determine_up_to_date()

# This is usually but not always the `latest_log_offset`
# as per `determine_log_chunk_offset/1`.
end_offset = updated_request.chunk_end_offset

case Shapes.get_merged_log_stream(updated_request.api, shape_handle,
since: since_offset,
up_to: end_offset
) do
{:ok, log} ->
Process.cancel_timer(keepalive_ref)

control_messages = maybe_up_to_date(updated_request, end_offset.tx_offset)
message_stream = Stream.concat(log, control_messages)
encoded_stream = encode_log(updated_request, message_stream)

current_time = System.monotonic_time(:millisecond)

new_keepalive_ref =
Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)

{[],
%{
state
| mode: :emit,
stream: encoded_stream,
since_offset: end_offset,
last_message_time: current_time,
keepalive_ref: new_keepalive_ref
}}

{:error, _error} ->
{[], state}
end

{^ref, :shape_rotation} ->
must_refetch = %{headers: %{control: "must-refetch"}}
message = encode_message(request, must_refetch)

{message, %{state | mode: :done}}

{:sse_keepalive, ^ref} ->
current_time = System.monotonic_time(:millisecond)
time_since_last_message = current_time - last_message_time

if time_since_last_message >= keepalive_interval do
new_keepalive_ref =
Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)

{[": keep-alive\n\n"],
%{state | last_message_time: current_time, keepalive_ref: new_keepalive_ref}}
else
# Not time to send a keep-alive yet, schedule for the remaining time
remaining_time = keepalive_interval - time_since_last_message
new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, remaining_time)

{[], %{state | keepalive_ref: new_keepalive_ref}}
end

{:sse_timeout, ^ref} ->
{[], %{state | mode: :done}}
end
end

defp next_sse_event(%SseState{mode: :emit} = state) do
%{stream: stream} = state

# The number taken can be changed to adjust the grouping. It's currently
# three because there are typically three elements per SSE event -- the
# actual message and the "data: " and "\n\n" delimiters around it.
#
# The JSON encoder groups stream elements by 500, so perhaps this
# could be a larger number for more efficiency?
case StreamSplit.take_and_drop(stream, 3) do
{[], _tail} ->
{[], %{state | mode: :receive, stream: nil}}

{head, tail} ->
{head, %{state | stream: tail}}
end
end

defp next_sse_event(%SseState{mode: :done} = state), do: {:halt, state}

defp clean_up_change_listener(%Request{handle: shape_handle} = request)
when not is_nil(shape_handle) do
%{api: %{registry: registry}} = request
Registry.unregister(registry, shape_handle)
%{
api: %{
registry: registry,
sse_timeout: sse_timeout
},
params: %{
live: live?,
experimental_live_sse: live_sse?
}
} = request

# When handling SSE requests, the response body is a stream that listens for
# :new_changes events. If we unregister the shape_handle event listener immediately,
# we don't receive the events. So, in this case, we unregister the shape_handle
# listener after the sse_timeout, when we can be sure that the request is over.
if live? and live_sse? do
spawn(fn ->
:timer.sleep(sse_timeout)

Registry.unregister(registry, shape_handle)
end)
else
Registry.unregister(registry, shape_handle)
end

request
end

@@ -598,15 +794,34 @@
def stack_id(%Api{stack_id: stack_id}), do: stack_id
def stack_id(%{api: %{stack_id: stack_id}}), do: stack_id

defp encode_log(%Request{api: api, params: %{live: true, experimental_live_sse: true}}, stream) do
encode_sse(api, :log, stream)
end

defp encode_log(%Request{api: api}, stream) do
encode(api, :log, stream)
end

@spec encode_message(Api.t() | Request.t(), term()) :: Enum.t()
def encode_message(%Api{} = api, message) do
# Error messages are encoded normally, even when using SSE
# because they are returned on the original fetch request
# with a status code that is not 2xx.
@spec encode_error_message(Api.t() | Request.t(), term()) :: Enum.t()
def encode_error_message(%Api{} = api, message) do
encode(api, :message, message)
end

def encode_error_message(%Request{api: api}, message) do
encode(api, :message, message)
end

@spec encode_message(Request.t(), term()) :: Enum.t()
def encode_message(
%Request{api: api, params: %{live: true, experimental_live_sse: true}},
message
) do
encode_sse(api, :message, message)
end

def encode_message(%Request{api: api}, message) do
encode(api, :message, message)
end
@@ -615,6 +830,10 @@
apply(encoder, type, [message])
end

defp encode_sse(%Api{sse_encoder: sse_encoder}, type, message) when type in [:message, :log] do
apply(sse_encoder, type, [message])
end

def schema(%Response{
api: %Api{inspector: inspector},
shape_definition: %Shapes.Shape{} = shape
30 changes: 30 additions & 0 deletions packages/sync-service/lib/electric/shapes/api/encoder.ex
@@ -56,6 +56,36 @@ defmodule Electric.Shapes.Api.Encoder.JSON do
end
end

defmodule Electric.Shapes.Api.Encoder.SSE do
@behaviour Electric.Shapes.Api.Encoder

@impl Electric.Shapes.Api.Encoder
def log(item_stream) do
# Note that, unlike the JSON log encoder, this doesn't currently use
# `Stream.chunk_every/2`.
#
# This is because it's only handling live events and is usually used
# for small updates (the point of enabling SSE mode is to avoid request
# overhead when consuming small changes).

item_stream
|> Stream.flat_map(&message/1)
end

@impl Electric.Shapes.Api.Encoder
def message(message) do
["data: ", ensure_json(message), "\n\n"]
end

defp ensure_json(json) when is_binary(json) do
json
end

defp ensure_json(term) do
Jason.encode_to_iodata!(term)
end
end

defmodule Electric.Shapes.Api.Encoder.Term do
@behaviour Electric.Shapes.Api.Encoder

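Putting the keep-alive timer and the SSE encoder together, the body of a live response in SSE mode is a standard Server-Sent Events stream: each log entry and control message becomes a data: line terminated by a blank line, and an SSE comment is emitted whenever nothing has changed for keepalive_interval. A rough illustration follows; the JSON payloads are elided and depend on the shape, with only the keep-alive comment and the must-refetch control message taken literally from the code above.

data: {"key":"...","value":{...},"headers":{"operation":"update",...}}

data: {"headers":{"control":"up-to-date",...}}

: keep-alive

data: {"headers":{"control":"must-refetch"}}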