Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ standard library.
| [`conditions`](./conditions) | Conditional- and range-request value types (ETag, Range, Conditions). |
| [`config`](./config) | Layered override → environment → default settings resolver; non-failing typed getters. |
| [`serde`](./serde) | Serialization seam (Marshaler/Unmarshaler) with a JSON default, plus Tristate for PATCH payloads. |
| [`sse`](./sse) | Server-Sent Events (text/event-stream) WHATWG parser + reconnecting Stream (Last-Event-ID replay). |
| [`sse`](./sse) | Server-Sent Events (text/event-stream) WHATWG parser + reconnecting Stream (Last-Event-ID replay); `Client.EventStream` runs it through the pipeline. |
| [`jsonl`](./jsonl) | JSON Lines / NDJSON streaming decoder (`iter.Seq2`). |
| [`webhook`](./webhook) | Inbound webhook signature verification (constant-time HMAC + timestamp tolerance). |
| [`formdata`](./formdata) | Multipart/form-data request body builder (replayable; file uploads). |
Expand Down Expand Up @@ -118,6 +118,26 @@ stripped and query values are redacted unless allowlisted with
`DEXPACE_RETRY_BASE_DELAY`, `DEXPACE_HTTP_TIMEOUT` (default transport only) — for
settings not set explicitly; explicit options always win.

### Streaming

`Client.EventStream(ctx, req, opts...)` returns an `iter.Seq2[sse.Event, error]`
reconnecting Server-Sent Events stream that runs through the pipeline — every
connection clones `req`, sets `Accept: text/event-stream`, and replays the
`Last-Event-ID` after a mid-stream interruption, so auth, logging, and tracing
run per connection. A non-2xx response or a transport failure on connect ends
the stream with that error; cancel the request context to stop.

```go
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://api.example.com/v1/events", nil)
for ev, err := range client.EventStream(ctx, req) {
if err != nil {
log.Printf("stream ended: %v", err)
break
}
fmt.Println(ev.Data)
}
```

## Requirements

Go **1.26+**. The module targets modern idioms: generics, range-over-func
Expand Down
3 changes: 2 additions & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
//
// The sse package parses Server-Sent Events (text/event-stream) into a
// range-over-func iterator of events, with a reconnecting Stream that replays the
// Last-Event-ID.
// Last-Event-ID. Client.EventStream wires that reconnecting stream through the
// pipeline, so every connection runs the configured policies.
//
// The webhook package verifies inbound webhook signatures (constant-time HMAC
// with a timestamp-tolerance window).
Expand Down
93 changes: 93 additions & 0 deletions docs/superpowers/specs/2026-06-16-client-eventstream-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Client-integrated SSE stream — design

**Date:** 2026-06-16
**Status:** Approved (standing delegation); ready for implementation
**Subsystem:** deferred-feature #6 (the `dexpace.Client`-integrated SSE client noted as out-of-scope in the SSE reconnect spec)

## Context

`sse.Stream(ctx, ConnectFunc, …)` reconnects, replays `Last-Event-ID`, and honours
server `retry` backoff, but the caller must wire the HTTP themselves via a
`ConnectFunc`. The common case — "stream this endpoint through my configured
client" — should be one call. This adds `Client.EventStream`, which builds the
`ConnectFunc` from the client pipeline so events flow through auth, logging,
tracing, retry, and the rest exactly like any other request.

## Decisions

1. **A method on `Client`, not a new type.** `EventStream(ctx, req, opts...)
iter.Seq2[sse.Event, error]` mirrors `Client.Do` (caller supplies a standard
`*http.Request`) and returns the same iterator shape as `sse.Parse`/`Stream`.
2. **Each (re)connect clones the request** with `req.Clone(ctx)`, sets
`Accept: text/event-stream` (unless the caller set one), adds the
`Last-Event-ID` header once an id has been seen, and sends it through `c.Do` —
so the full policy stack runs per connection (token refresh, per-connect logs).
3. **Connect status check.** A non-2xx response ends the stream with an error
(`sse.Stream` treats connect errors as terminal); the body is drained and
closed first. A transport error from `c.Do` is returned as-is.
4. **Replayable bodies.** SSE is normally `GET` (no body). When the request has a
body, the clone resets it via `req.GetBody` so reconnects re-send it; a
non-replayable body surfaces an error on the second connect.
5. **`opts ...sse.StreamOption` pass through** to `sse.Stream` (e.g.
`WithReconnectDelay`).

## Architecture

### `header` addition
`header.LastEventID = "Last-Event-Id"` (canonical form of `Last-Event-ID`).

### `eventstream.go` (package `dexpace`)

```go
// EventStream opens a reconnecting Server-Sent Events stream for req and yields
// decoded events through the client pipeline. Each connection clones req, sets
// Accept: text/event-stream (unless already set) and, after the first event id,
// the Last-Event-ID header, then sends it via Do. A non-2xx response or a
// transport failure on connect ends the stream with that error; a mid-stream
// interruption reconnects transparently with the most recent event id. Cancel the
// request context to stop. The iterator is single-pass.
func (c *Client) EventStream(ctx context.Context, req *http.Request, opts ...sse.StreamOption) iter.Seq2[sse.Event, error]
```

The `ConnectFunc` clones `req` with the connect ctx, resets a replayable body via
`req.GetBody`, sets `Accept` and `Last-Event-ID`, calls `c.Do`, closes the body and
returns an error on a transport failure or non-2xx status, else returns
`resp.Body`.

## Edge cases

- Caller-set `Accept` is preserved (some servers want a custom accept).
- Non-2xx connect → drained, closed, terminal error with the status.
- Transport error → returned; with the default retry policy, transient connect
failures are already retried inside `c.Do` before `sse.Stream` sees them.
- Body present but non-replayable → error on reconnect (documented; SSE is
normally bodyless).
- Context cancel → `sse.Stream` stops without a further connect (existing
behaviour).

## Package layout

| Path | Change |
|---|---|
| `header/header.go` (modify) | add `LastEventID` |
| `eventstream.go` (new, package dexpace) | `Client.EventStream` |
| `eventstream_test.go` (new, `dexpace_test`) | flatten/reconnect, Last-Event-ID, Accept, non-2xx |
| `doc.go`, `README.md` | document |

## Testing

- **Flatten + reconnect**: a stub transporter returns two SSE bodies then a
terminal error, with `WithReconnectDelay(0)`; assert events `a,b,c` in order
then the error.
- **Last-Event-ID replay**: events carry ids; assert the header sent on the second
connect equals the last dispatched id.
- **Accept header**: every connect carries `Accept: text/event-stream`.
- **Non-2xx connect**: a 503 first response yields a terminal error and no events.
- Retry disabled in tests (`WithRetry(retry.Options{MaxRetries: -1})`) for
deterministic connect counts. Table-driven where natural, parallel; stdlib-only;
`gofmt`/`go vet`/`go test -race` clean.

## Out of scope (deferred)

- A typed event-decoding helper (callers decode `Event.Data` with `serde`/`jsonl`).
- Automatic `GET`-forcing or body stripping — the caller controls the request.
63 changes: 63 additions & 0 deletions eventstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

package dexpace

import (
"context"
"fmt"
"io"
"iter"
"net/http"

"github.com/dexpace/go-sdk/header"
"github.com/dexpace/go-sdk/mediatype"
"github.com/dexpace/go-sdk/sse"
)

// EventStream opens a reconnecting Server-Sent Events stream for req and yields
// decoded events through the client pipeline. Each connection clones req, sets
// Accept: text/event-stream (unless the caller already set Accept) and, after the
// first event id is seen, the Last-Event-ID header, then sends it through Do — so
// auth, logging, tracing, and the rest run per connection.
//
// A non-2xx response or a transport failure on connect ends the stream with that
// error (a connect error is terminal). A mid-stream interruption reconnects
// transparently with the most recent event id. When req carries a body it must be
// replayable (req.GetBody set, as net/http does for in-memory bodies) so reconnects
// can re-send it; SSE is normally a bodyless GET. Cancel the request context to
// stop. The iterator is single-pass. Pass sse.StreamOption values (for example
// sse.WithReconnectDelay) to configure reconnection.
func (c *Client) EventStream(ctx context.Context, req *http.Request, opts ...sse.StreamOption) iter.Seq2[sse.Event, error] {
connect := func(ctx context.Context, lastEventID string) (io.ReadCloser, error) {
r := req.Clone(ctx)
if req.Body != nil && req.GetBody != nil {
body, err := req.GetBody()
if err != nil {
return nil, fmt.Errorf("dexpace: rewind event-stream request body: %w", err)
}
r.Body = body
}
if r.Header.Get(header.Accept) == "" {
r.Header.Set(header.Accept, mediatype.TextEventStream.Essence())
}
if lastEventID != "" {
r.Header.Set(header.LastEventID, lastEventID)
}

resp, err := c.Do(r)
if err != nil {
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 4096))
_ = resp.Body.Close()
return nil, fmt.Errorf("dexpace: event stream connect: unexpected status %s", resp.Status)
}
return resp.Body, nil
}
return sse.Stream(ctx, connect, opts...)
}
153 changes: 153 additions & 0 deletions eventstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

package dexpace_test

import (
"context"
"errors"
"io"
"net/http"
"strings"
"sync"
"testing"

dexpace "github.com/dexpace/go-sdk"
"github.com/dexpace/go-sdk/retry"
"github.com/dexpace/go-sdk/sse"
)

// sseStub is a stub Transporter that returns scripted responses and records the
// Last-Event-Id and Accept headers seen on each connect.
type sseStub struct {
mu sync.Mutex
calls int
lastEventIDs []string
accepts []string
responses []func() (*http.Response, error)
}

func (s *sseStub) Do(req *http.Request) (*http.Response, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.lastEventIDs = append(s.lastEventIDs, req.Header.Get("Last-Event-Id"))
s.accepts = append(s.accepts, req.Header.Get("Accept"))
i := s.calls
s.calls++
if i < len(s.responses) {
return s.responses[i]()
}
return nil, errors.New("sseStub: no more scripted responses")
}

func sseBody(status int, body string) func() (*http.Response, error) {
return func() (*http.Response, error) {
return &http.Response{
StatusCode: status,
Status: http.StatusText(status),
Header: make(http.Header),
Body: io.NopCloser(strings.NewReader(body)),
}, nil
}
}

func newEventStreamClient(stub *sseStub) *dexpace.Client {
return dexpace.New(
dexpace.WithTransport(stub),
dexpace.WithRetry(retry.Options{MaxRetries: -1}), // deterministic connect counts
)
}

func TestEventStreamReconnectAndFlatten(t *testing.T) {
t.Parallel()
stub := &sseStub{responses: []func() (*http.Response, error){
sseBody(200, "id: 1\ndata: a\n\nid: 2\ndata: b\n\n"),
sseBody(200, "id: 3\ndata: c\n\n"),
func() (*http.Response, error) { return nil, errors.New("stop") },
}}
client := newEventStreamClient(stub)

req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "https://example.test/stream", nil)
if err != nil {
t.Fatal(err)
}

var data []string
var gotErr error
for ev, err := range client.EventStream(context.Background(), req, sse.WithReconnectDelay(0)) {
if err != nil {
gotErr = err
break
}
data = append(data, ev.Data)
}
if strings.Join(data, ",") != "a,b,c" {
t.Fatalf("events = %v, want [a b c]", data)
}
if gotErr == nil || !strings.Contains(gotErr.Error(), "stop") {
t.Fatalf("final error = %v, want the terminal connect error", gotErr)
}
}

func TestEventStreamLastEventIDAndAccept(t *testing.T) {
t.Parallel()
stub := &sseStub{responses: []func() (*http.Response, error){
sseBody(200, "id: 1\ndata: a\n\nid: 2\ndata: b\n\n"),
sseBody(200, "id: 3\ndata: c\n\n"),
func() (*http.Response, error) { return nil, errors.New("stop") },
}}
client := newEventStreamClient(stub)
req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, "https://example.test/stream", nil)

for _, err := range client.EventStream(context.Background(), req, sse.WithReconnectDelay(0)) {
if err != nil {
break
}
}

stub.mu.Lock()
defer stub.mu.Unlock()
if len(stub.lastEventIDs) < 3 {
t.Fatalf("got %d connects, want >= 3", len(stub.lastEventIDs))
}
if stub.lastEventIDs[0] != "" {
t.Fatalf("first connect Last-Event-Id = %q, want empty", stub.lastEventIDs[0])
}
if stub.lastEventIDs[1] != "2" {
t.Fatalf("second connect Last-Event-Id = %q, want 2", stub.lastEventIDs[1])
}
if stub.lastEventIDs[2] != "3" {
t.Fatalf("third connect Last-Event-Id = %q, want 3", stub.lastEventIDs[2])
}
for i, a := range stub.accepts {
if a != "text/event-stream" {
t.Fatalf("connect %d Accept = %q, want text/event-stream", i, a)
}
}
}

func TestEventStreamNon2xxIsTerminal(t *testing.T) {
t.Parallel()
stub := &sseStub{responses: []func() (*http.Response, error){
sseBody(503, "unavailable"),
}}
client := newEventStreamClient(stub)
req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, "https://example.test/stream", nil)

var events int
var gotErr error
for ev, err := range client.EventStream(context.Background(), req, sse.WithReconnectDelay(0)) {
if err != nil {
gotErr = err
break
}
_ = ev
events++
}
if events != 0 {
t.Fatalf("got %d events, want 0 on a non-2xx connect", events)
}
if gotErr == nil {
t.Fatal("want a terminal error on a non-2xx connect")
}
}
1 change: 1 addition & 0 deletions header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
IfModifiedSince = "If-Modified-Since"
IfNoneMatch = "If-None-Match"
IfUnmodifiedSince = "If-Unmodified-Since"
LastEventID = "Last-Event-Id" // canonical form of "Last-Event-ID"
Location = "Location"
Range = "Range"
RetryAfter = "Retry-After"
Expand Down
Loading