diff --git a/CLAUDE.md b/CLAUDE.md index 37fd21f..f1bc7a6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -95,7 +95,8 @@ go-sdk/ ├── instrumentation/ # tracing + metrics SPIs, no-op defaults, policies ├── config/ # layered override→env→default settings ├── serde/ # Marshaler/Unmarshaler seam + JSON default + Tristate -├── sse/ webhook/ # placeholders (doc.go only) +├── sse/ # Server-Sent Events (WHATWG) parser +├── webhook/ # placeholder (doc.go only) ├── .golangci.yml Makefile .github/workflows/ci.yml └── CONTRIBUTING.md CLAUDE.md README.md LICENSE ``` diff --git a/README.md b/README.md index 8f97c86..8f5e549 100644 --- a/README.md +++ b/README.md @@ -60,9 +60,10 @@ standard library. | [`conditions`](./conditions) | Conditional- and range-request value types (ETag, Range, Conditions). | | [`config`](./config) | Layered override → environment → default settings resolver; non-failing typed getters. | | [`serde`](./serde) | Serialization seam (Marshaler/Unmarshaler) with a JSON default, plus Tristate for PATCH payloads. | +| [`sse`](./sse) | Server-Sent Events (text/event-stream) WHATWG parser. | | root [`dexpace`](.) | Umbrella `Client` wiring the default policy stack. | -Reserved for upcoming work (placeholder packages today): `sse`, `webhook`. +Reserved for upcoming work (placeholder packages today): `webhook`. ### Pipeline order diff --git a/doc.go b/doc.go index b2ba0b5..60d174a 100644 --- a/doc.go +++ b/doc.go @@ -56,5 +56,8 @@ // JSON default) and Tristate for JSON PATCH payloads; httperr.ResponseError.DecodeInto // decodes an error body into a typed value. // +// The sse package parses Server-Sent Events (text/event-stream) into a +// range-over-func iterator of events. +// // All of core depends only on the Go standard library. package dexpace diff --git a/docs/superpowers/plans/2026-06-16-sse.md b/docs/superpowers/plans/2026-06-16-sse.md new file mode 100644 index 0000000..841a93c --- /dev/null +++ b/docs/superpowers/plans/2026-06-16-sse.md @@ -0,0 +1,470 @@ +# SSE Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Implement the WHATWG Server-Sent Events parser — an `Event` type and `Parse(r io.Reader) iter.Seq2[Event, error]` — in the `sse` package. + +**Architecture:** `Parse` runs the WHATWG event-stream interpretation algorithm over a bounded `bufio.Scanner`, accumulating `data`/`event`/`id`/`retry` fields and dispatching an `Event` on each blank line, yielded through a range-over-func iterator. + +**Tech Stack:** Go 1.26+, standard library only (`bufio`, `io`, `iter`, `strconv`, `strings`, `time`). Zero third-party dependencies. + +**Conventions every task must follow:** +- MIT license header on every `.go` file before the `package` clause: + ```go + // Copyright (c) 2026 dexpace and Omar Aljarrah. + // Licensed under the MIT License. See LICENSE in the repository root for details. + ``` +- Import groups: stdlib only here. +- Tests use `t.Parallel()`; table-driven where natural; stdlib-only. +- Tools: Go 1.26.3; `gofumpt`/`golangci-lint` NOT installed — use `gofmt`, `go vet`, `go test -race`. +- Run commands from the repo root `/Users/omar/dexpace/go-sdk`. + +--- + +## File Structure + +| Path | Responsibility | +|---|---| +| `sse/doc.go` (modify) | real package comment | +| `sse/event.go` (new) | the `Event` type | +| `sse/parse.go` (new) | `Parse` + WHATWG interpreter + helpers | +| `sse/parse_test.go` (new) | algorithm tests | +| `doc.go`, `README.md`, `CLAUDE.md` (modify) | document; de-placeholder `sse` | + +--- + +## Task 1: `Event` and `Parse` + +**Files:** +- Modify: `sse/doc.go` +- Create: `sse/event.go`, `sse/parse.go`, `sse/parse_test.go` + +- [ ] **Step 1: Write the failing tests** + +```go +// sse/parse_test.go +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse_test + +import ( + "bufio" + "errors" + "strings" + "testing" + "time" + + "github.com/dexpace/go-sdk/sse" +) + +func collectEvents(t *testing.T, input string) []sse.Event { + t.Helper() + var events []sse.Event + for ev, err := range sse.Parse(strings.NewReader(input)) { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + events = append(events, ev) + } + return events +} + +func TestParseSingleEvent(t *testing.T) { + t.Parallel() + + events := collectEvents(t, "data: hello\n\n") + if len(events) != 1 { + t.Fatalf("got %d events, want 1", len(events)) + } + got := events[0] + if got.Type != "message" || got.Data != "hello" { + t.Fatalf("event = %+v, want Type=message Data=hello", got) + } +} + +func TestParseMultiLineData(t *testing.T) { + t.Parallel() + + events := collectEvents(t, "data: a\ndata: b\n\n") + if len(events) != 1 || events[0].Data != "a\nb" { + t.Fatalf("events = %+v, want one event with Data=\"a\\nb\"", events) + } +} + +func TestParseEventTypeAndStickyID(t *testing.T) { + t.Parallel() + + input := "event: greeting\nid: 1\ndata: hi\n\ndata: again\n\n" + events := collectEvents(t, input) + if len(events) != 2 { + t.Fatalf("got %d events, want 2", len(events)) + } + if events[0].Type != "greeting" || events[0].ID != "1" || events[0].Data != "hi" { + t.Fatalf("event[0] = %+v", events[0]) + } + // Second event has no id/event fields: type defaults to message, id is sticky. + if events[1].Type != "message" || events[1].ID != "1" || events[1].Data != "again" { + t.Fatalf("event[1] = %+v, want Type=message ID=1 Data=again", events[1]) + } +} + +func TestParseRetry(t *testing.T) { + t.Parallel() + + events := collectEvents(t, "retry: 2500\ndata: x\n\n") + if len(events) != 1 || events[0].Retry != 2500*time.Millisecond { + t.Fatalf("events = %+v, want Retry=2.5s", events) + } + + // Non-numeric retry is ignored. + events = collectEvents(t, "retry: soon\ndata: x\n\n") + if len(events) != 1 || events[0].Retry != 0 { + t.Fatalf("events = %+v, want Retry=0 for non-numeric", events) + } +} + +func TestParseCommentsAndBlankProduceNoEvents(t *testing.T) { + t.Parallel() + + if events := collectEvents(t, ": keep-alive\n\n"); len(events) != 0 { + t.Fatalf("comment+blank produced %d events, want 0", len(events)) + } + if events := collectEvents(t, "\n\n\n"); len(events) != 0 { + t.Fatalf("blank lines produced %d events, want 0", len(events)) + } +} + +func TestParseCRLF(t *testing.T) { + t.Parallel() + + events := collectEvents(t, "data: hello\r\n\r\n") + if len(events) != 1 || events[0].Data != "hello" { + t.Fatalf("CRLF events = %+v, want Data=hello", events) + } +} + +func TestParseLeadingSpaceStripping(t *testing.T) { + t.Parallel() + + tests := map[string]string{ + "data: x\n\n": "x", // one leading space stripped + "data:x\n\n": "x", // no space + "data: x\n\n": " x", // only one space stripped + } + for input, want := range tests { + events := collectEvents(t, input) + if len(events) != 1 || events[0].Data != want { + t.Fatalf("input %q -> %+v, want Data=%q", input, events, want) + } + } +} + +func TestParseDiscardsPartialEventAtEOF(t *testing.T) { + t.Parallel() + + // No trailing blank line: the event is never dispatched. + if events := collectEvents(t, "data: incomplete\n"); len(events) != 0 { + t.Fatalf("got %d events, want 0 (partial discarded at EOF)", len(events)) + } +} + +// errReader returns its data on the first read, then err. +type errReader struct { + data []byte + err error + done bool +} + +func (r *errReader) Read(p []byte) (int, error) { + if r.done { + return 0, r.err + } + r.done = true + return copy(p, r.data), nil +} + +func TestParseSurfacesReadError(t *testing.T) { + t.Parallel() + + boom := errors.New("boom") + r := &errReader{data: []byte("data: x\n\ndata: y"), err: boom} + + var events []sse.Event + var gotErr error + for ev, err := range sse.Parse(r) { + if err != nil { + gotErr = err + break + } + events = append(events, ev) + } + if len(events) != 1 || events[0].Data != "x" { + t.Fatalf("events = %+v, want one event with Data=x before the error", events) + } + if !errors.Is(gotErr, boom) { + t.Fatalf("err = %v, want boom", gotErr) + } +} + +func TestParseOverLongLine(t *testing.T) { + t.Parallel() + + long := "data: " + strings.Repeat("a", 2<<20) // exceeds the 1 MiB cap + var gotErr error + for _, err := range sse.Parse(strings.NewReader(long)) { + if err != nil { + gotErr = err + break + } + } + if !errors.Is(gotErr, bufio.ErrTooLong) { + t.Fatalf("err = %v, want bufio.ErrTooLong", gotErr) + } +} + +func TestParseEarlyBreak(t *testing.T) { + t.Parallel() + + count := 0 + for range sse.Parse(strings.NewReader("data: a\n\ndata: b\n\n")) { + count++ + break + } + if count != 1 { + t.Fatalf("consumed %d events, want 1 after break", count) + } +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `go test ./sse/ -v` +Expected: FAIL — `sse.Event`/`sse.Parse` undefined. + +- [ ] **Step 3: Replace `sse/doc.go`** + +```go +// sse/doc.go +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +// Package sse parses a text/event-stream (Server-Sent Events) per the WHATWG +// algorithm, yielding each dispatched [Event] through [Parse] as a range-over-func +// iterator. It operates on any io.Reader; a reconnecting connection +// (Last-Event-ID replay, server retry backoff) is intentionally left to the +// caller and a future addition. +package sse +``` + +- [ ] **Step 4: Create `sse/event.go`** + +```go +// sse/event.go +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse + +import "time" + +// Event is one Server-Sent Event dispatched by [Parse]. +type Event struct { + // Type is the event type, or "message" when the stream did not specify one. + Type string + // Data is the event payload: multiple data lines joined with "\n", with the + // trailing newline removed. + Data string + // ID is the most recent event id. It is sticky: an event without an id field + // keeps the previous value, per the WHATWG specification. + ID string + // Retry is the reconnection-time hint from a retry field on this event, or 0 + // when none was given. + Retry time.Duration +} +``` + +- [ ] **Step 5: Create `sse/parse.go`** + +```go +// sse/parse.go +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse + +import ( + "bufio" + "io" + "iter" + "strconv" + "strings" + "time" +) + +// maxLineBytes caps a single event-stream line, so a malformed stream cannot grow +// the read buffer without bound. An over-long line surfaces bufio.ErrTooLong. +const maxLineBytes = 1 << 20 // 1 MiB + +// Parse interprets r as a text/event-stream and yields each dispatched event, +// following the WHATWG event-stream algorithm. Fields (data, event, id, retry) +// are accumulated and an event is dispatched on a blank line. Lines may end with +// LF or CRLF; comment lines (beginning with ":") are ignored. A read error or an +// over-long line is delivered as the second iteration value, after which +// iteration stops. A partially-accumulated event at end of stream is discarded. +// The iterator is single-pass. +func Parse(r io.Reader) iter.Seq2[Event, error] { + return func(yield func(Event, error) bool) { + sc := bufio.NewScanner(r) + sc.Buffer(make([]byte, 0, 64<<10), maxLineBytes) + + var data strings.Builder + eventType := "" + lastID := "" + var retry time.Duration + hasData := false + + for sc.Scan() { + line := sc.Text() + + if line == "" { // dispatch + if hasData { + payload := strings.TrimSuffix(data.String(), "\n") + typ := eventType + if typ == "" { + typ = "message" + } + if !yield(Event{Type: typ, Data: payload, ID: lastID, Retry: retry}, nil) { + return + } + } + data.Reset() + eventType = "" + retry = 0 + hasData = false + continue + } + + if line[0] == ':' { // comment + continue + } + + field, value, _ := strings.Cut(line, ":") + value = strings.TrimPrefix(value, " ") + switch field { + case "data": + data.WriteString(value) + data.WriteByte('\n') + hasData = true + case "event": + eventType = value + case "id": + if !strings.ContainsRune(value, '\x00') { + lastID = value + } + case "retry": + if ms, ok := parseRetry(value); ok { + retry = ms + } + } + } + + if err := sc.Err(); err != nil { + yield(Event{}, err) + } + // EOF: any partially-accumulated event is discarded per the spec. + } +} + +// parseRetry parses an all-ASCII-digits retry value into a duration of +// milliseconds. It reports ok=false for empty or non-numeric input. +func parseRetry(s string) (time.Duration, bool) { + if s == "" { + return 0, false + } + for _, r := range s { + if r < '0' || r > '9' { + return 0, false + } + } + n, err := strconv.Atoi(s) + if err != nil { + return 0, false + } + return time.Duration(n) * time.Millisecond, true +} +``` + +- [ ] **Step 6: Run tests to verify they pass** + +Run: `go test ./sse/ -v` +Expected: PASS — all parser tests. + +- [ ] **Step 7: Commit** + +```bash +git add sse/doc.go sse/event.go sse/parse.go sse/parse_test.go +git commit -m "feat(sse): add WHATWG event-stream parser" +``` + +--- + +## Task 2: docs and full gate + +**Files:** +- Modify: `doc.go`, `README.md`, `CLAUDE.md` + +- [ ] **Step 1: Mention sse in `doc.go`** + +Read `doc.go`. Within the `package dexpace` doc comment (single contiguous `//` +block; no second package clause / no duplicate header), add: + +```go +// The sse package parses Server-Sent Events (text/event-stream) into a +// range-over-func iterator of events. +``` + +- [ ] **Step 2: Update `README.md`** + +Read `README.md`. Add an `sse` row to the architecture/package table (matching the +column/link style): "Server-Sent Events (text/event-stream) WHATWG parser." If a +"reserved/placeholder packages" line lists `sse`, remove `sse` from it. + +- [ ] **Step 3: De-placeholder `sse` in `CLAUDE.md`** + +Read `CLAUDE.md`. In the Repository Layout tree: +1. Remove `sse/` from the placeholder line (likely now `sse/ webhook/ # placeholders (doc.go only)`), leaving `webhook/`. +2. Add an `sse/` line near the other packages: `sse/ # Server-Sent Events parser`. + +- [ ] **Step 4: Run the full gate** + +Run: +```bash +gofmt -l . +go vet ./... +go test -race ./... +``` +Expected: `gofmt -l .` prints nothing; `go vet` clean; every package passes under +the race detector (`sse` now has tests; only `webhook` remains a placeholder). + +- [ ] **Step 5: Commit** + +```bash +git add doc.go README.md CLAUDE.md +git commit -m "docs: document sse package and de-placeholder it" +``` + +--- + +## Self-Review notes (for the implementer) + +- **Spec coverage:** `Event` + `Parse` with the full WHATWG algorithm (Task 1); + docs + de-placeholder (Task 2). The reconnecting layer is intentionally deferred. +- **Type consistency:** `sse.Event{Type,Data,ID,Retry}` and `sse.Parse(io.Reader) + iter.Seq2[Event, error]` are used identically across tasks/tests. +- **Bounded:** the scanner buffer is capped at `maxLineBytes`; an over-long line + surfaces `bufio.ErrTooLong`. +- **WHATWG details verified by tests:** sticky id, multi-line data join + trailing + newline strip, one-leading-space strip, comment/blank no-dispatch, partial-event + discard at EOF, CRLF, retry parse, read-error surfacing, early break. +- **`make check`** green before opening the PR. diff --git a/docs/superpowers/specs/2026-06-16-sse-design.md b/docs/superpowers/specs/2026-06-16-sse-design.md new file mode 100644 index 0000000..ac7b7eb --- /dev/null +++ b/docs/superpowers/specs/2026-06-16-sse-design.md @@ -0,0 +1,127 @@ +# SSE (Server-Sent Events) — design + +**Date:** 2026-06-16 +**Status:** Approved (standing delegation); ready for implementation planning +**Subsystem:** #9 of the Go SDK platform-parity roadmap + +## Context + +The `sse` package is a placeholder. Java/Python ship a WHATWG event-stream parser +and a reconnecting connection (Last-Event-ID replay, server `retry` backoff). This +subsystem delivers the parser core; the reconnecting layer is deferred. + +## Decisions + +1. **Scope: the WHATWG parser.** `Event` plus `Parse(r io.Reader) iter.Seq2[Event, error]` + implementing the WHATWG "event stream interpretation" algorithm. +2. **Idiomatic streaming.** Events are yielded through a range-over-func iterator, + matching `pagination`. +3. **Bounded lines.** The scanner buffer is capped (`maxLineBytes`); an over-long + line yields an error rather than growing without bound. +4. **Deferred (documented): the reconnecting connection.** Last-Event-ID replay, + server `retry` honoring, and reconnect/backoff are a resilience layer that + needs timer-driven control flow; it lands in a follow-up. The parser is fully + usable on its own (a caller can reconnect by re-invoking `Parse` on a fresh + stream). + +## Architecture + +### `Event` and `Parse` (`sse` package, stdlib-only) + +```go +// Event is one Server-Sent Event. +type Event struct { + // Type is the event type ("message" when the stream did not specify one). + Type string + // Data is the event payload with its trailing newline removed; multi-line + // data fields are joined with "\n". + Data string + // ID is the last seen event id (sticky across events, per the WHATWG spec). + ID string + // Retry is the reconnection-time hint from a retry field on this event, or 0 + // when none was given. + Retry time.Duration +} + +// Parse interprets r as a text/event-stream and yields each dispatched event. +// It follows the WHATWG event-stream algorithm: data/event/id/retry fields are +// accumulated and an event is dispatched on a blank line. Lines may end with LF +// or CRLF. Comment lines (beginning with ":") are ignored. A read error (or an +// over-long line) is delivered as the second iteration value, after which +// iteration stops. The iterator is single-pass. +func Parse(r io.Reader) iter.Seq2[Event, error] +``` + +### Algorithm (WHATWG event-stream interpretation) + +Line scanning uses `bufio.Scanner` (LF and CRLF; bare-CR-only terminators — vanishingly +rare — are not split and are documented as unsupported). Buffers: `data` (a +`strings.Builder`), `eventType` (string), `lastID` (string, sticky). + +Per line: +- **empty line** → dispatch: if the `data` buffer is non-empty, yield an `Event` + with `Type = eventType or "message"`, `Data = data` with one trailing `"\n"` + removed, `ID = lastID`, `Retry = retry` (the value parsed for this event, else + 0). Reset `data`, `eventType`, and the per-event `retry` (NOT `lastID`). If the + `data` buffer is empty, reset `eventType`/`retry` and continue without yielding. +- **line starting with `:`** → comment, ignored. +- **field line** → split on the first `:`; the value has at most one leading space + stripped: + - `data` → append `value + "\n"` to the `data` buffer. + - `event` → `eventType = value`. + - `id` → if the value contains no NUL, `lastID = value`. + - `retry` → if the value is all ASCII digits, set this event's `retry` to that + many milliseconds. + - any other field name → ignored. +- **EOF** → a partially-accumulated (never blank-line-terminated) event is + discarded, per the spec. + +`maxLineBytes` caps the scanner buffer; `bufio.ErrTooLong` is surfaced as the +iteration error. + +## Edge cases + +- `data:` with no value → appends `"\n"` (an empty data line is still data). +- Multiple `data:` lines → joined with `"\n"`; the final trailing `"\n"` is + stripped on dispatch (so two `data: a` / `data: b` lines → `"a\nb"`). +- `id` is sticky: an event without an `id` field keeps the previous `lastID`. +- `id` containing a NUL byte is ignored (lastID unchanged), per spec. +- `retry: abc` (non-numeric) is ignored. +- A blank line with no preceding data dispatches nothing. +- A comment-only line (`: keep-alive`) is ignored and does not dispatch. +- Leading-space stripping: `data: x` → `x`; `data:x` → `x`; `data: x` → `" x"`. +- Read error mid-stream → yielded once, then iteration stops; any partial event is + discarded. +- Early `break` from the iterator stops scanning (range-over-func semantics). + +## Package layout + +| Path | Change | +|---|---| +| `sse/doc.go` (modify) | real package comment (note the deferred reconnect layer) | +| `sse/event.go` (new) | the `Event` type | +| `sse/parse.go` (new) | `Parse` + the WHATWG interpreter | +| `sse/parse_test.go` (new) | algorithm tests | +| `doc.go`, `README.md`, `CLAUDE.md` | document; de-placeholder `sse` | + +## Testing + +- Single event: `data: hello\n\n` → one event, `Type=message`, `Data=hello`. +- Multi-line data joined with `\n`; trailing newline stripped. +- `event:` sets the type; `id:` is sticky across events; `retry:` parsed to a + duration (and non-numeric ignored). +- Comment lines and blank-line-only input dispatch nothing. +- CRLF line endings parse identically to LF. +- Leading-space stripping rules. +- An over-long line yields an error. +- A mid-stream read error (a fake reader) is surfaced and ends iteration. +- Early `break` stops consuming. +- Table-driven, parallel; stdlib-only; `gofmt`/`go vet`/`go test -race` clean. + +## Out of scope (deferred) + +- Reconnecting connection: Last-Event-ID replay, server `retry` honoring, + reconnect with backoff. (Follow-up subsystem.) +- Bare-CR-only line terminators (documented as unsupported; no real server uses + them). +- An SSE client integrated with `dexpace.Client` (the parser takes any `io.Reader`). diff --git a/sse/doc.go b/sse/doc.go index d7dd8ba..3cd8be1 100644 --- a/sse/doc.go +++ b/sse/doc.go @@ -1,10 +1,9 @@ // Copyright (c) 2026 dexpace and Omar Aljarrah. // Licensed under the MIT License. See LICENSE in the repository root for details. -// Package sse will provide a Server-Sent Events (text/event-stream) parser and a -// streaming reader that yields events as a range-over-func iterator, mirroring -// the SSE support in the sibling Java and Python SDKs. -// -// Status: placeholder. The package is reserved as part of the initial repository -// structure and has no exported API yet. +// Package sse parses a text/event-stream (Server-Sent Events) per the WHATWG +// algorithm, yielding each dispatched [Event] through [Parse] as a range-over-func +// iterator. It operates on any io.Reader; a reconnecting connection +// (Last-Event-ID replay, server retry backoff) is intentionally left to the +// caller and a future addition. package sse diff --git a/sse/event.go b/sse/event.go new file mode 100644 index 0000000..86769a9 --- /dev/null +++ b/sse/event.go @@ -0,0 +1,21 @@ +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse + +import "time" + +// Event is one Server-Sent Event dispatched by [Parse]. +type Event struct { + // Type is the event type, or "message" when the stream did not specify one. + Type string + // Data is the event payload: multiple data lines joined with "\n", with the + // trailing newline removed. + Data string + // ID is the most recent event id. It is sticky: an event without an id field + // keeps the previous value, per the WHATWG specification. + ID string + // Retry is the reconnection-time hint from a retry field on this event, or 0 + // when none was given. + Retry time.Duration +} diff --git a/sse/parse.go b/sse/parse.go new file mode 100644 index 0000000..ff46001 --- /dev/null +++ b/sse/parse.go @@ -0,0 +1,104 @@ +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse + +import ( + "bufio" + "io" + "iter" + "strconv" + "strings" + "time" +) + +// maxLineBytes caps a single event-stream line, so a malformed stream cannot grow +// the read buffer without bound. An over-long line surfaces bufio.ErrTooLong. +const maxLineBytes = 1 << 20 // 1 MiB + +// Parse interprets r as a text/event-stream and yields each dispatched event, +// following the WHATWG event-stream algorithm. Fields (data, event, id, retry) +// are accumulated and an event is dispatched on a blank line. Lines may end with +// LF or CRLF; comment lines (beginning with ":") are ignored. A read error or an +// over-long line is delivered as the second iteration value, after which +// iteration stops. A partially-accumulated event at end of stream is discarded. +// The iterator is single-pass. +func Parse(r io.Reader) iter.Seq2[Event, error] { + return func(yield func(Event, error) bool) { + sc := bufio.NewScanner(r) + sc.Buffer(make([]byte, 0, 64<<10), maxLineBytes) + + var data strings.Builder + eventType := "" + lastID := "" + var retry time.Duration + hasData := false + + for sc.Scan() { + line := sc.Text() + + if line == "" { + if hasData { + payload := strings.TrimSuffix(data.String(), "\n") + typ := eventType + if typ == "" { + typ = "message" + } + if !yield(Event{Type: typ, Data: payload, ID: lastID, Retry: retry}, nil) { + return + } + } + data.Reset() + eventType = "" + retry = 0 + hasData = false + continue + } + + if line[0] == ':' { + continue + } + + field, value, _ := strings.Cut(line, ":") + value = strings.TrimPrefix(value, " ") + switch field { + case "data": + data.WriteString(value) + data.WriteByte('\n') + hasData = true + case "event": + eventType = value + case "id": + if !strings.ContainsRune(value, '\x00') { + lastID = value + } + case "retry": + if ms, ok := parseRetry(value); ok { + retry = ms + } + } + } + + if err := sc.Err(); err != nil { + yield(Event{}, err) + } + } +} + +// parseRetry parses an all-ASCII-digits retry value into a duration of +// milliseconds. It reports ok=false for empty or non-numeric input. +func parseRetry(s string) (time.Duration, bool) { + if s == "" { + return 0, false + } + for _, r := range s { + if r < '0' || r > '9' { + return 0, false + } + } + n, err := strconv.Atoi(s) + if err != nil { + return 0, false + } + return time.Duration(n) * time.Millisecond, true +} diff --git a/sse/parse_test.go b/sse/parse_test.go new file mode 100644 index 0000000..73e30b1 --- /dev/null +++ b/sse/parse_test.go @@ -0,0 +1,229 @@ +// Copyright (c) 2026 dexpace and Omar Aljarrah. +// Licensed under the MIT License. See LICENSE in the repository root for details. + +package sse_test + +import ( + "bufio" + "errors" + "strings" + "testing" + "time" + + "github.com/dexpace/go-sdk/sse" +) + +func collectEvents(t *testing.T, input string) []sse.Event { + t.Helper() + var events []sse.Event + for ev, err := range sse.Parse(strings.NewReader(input)) { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + events = append(events, ev) + } + return events +} + +func TestParseSingleEvent(t *testing.T) { + t.Parallel() + + events := collectEvents(t, "data: hello\n\n") + if len(events) != 1 { + t.Fatalf("got %d events, want 1", len(events)) + } + got := events[0] + if got.Type != "message" || got.Data != "hello" { + t.Fatalf("event = %+v, want Type=message Data=hello", got) + } +} + +func TestParseMultiLineData(t *testing.T) { + t.Parallel() + + events := collectEvents(t, "data: a\ndata: b\n\n") + if len(events) != 1 || events[0].Data != "a\nb" { + t.Fatalf("events = %+v, want one event with Data=\"a\\nb\"", events) + } +} + +func TestParseEventTypeAndStickyID(t *testing.T) { + t.Parallel() + + input := "event: greeting\nid: 1\ndata: hi\n\ndata: again\n\n" + events := collectEvents(t, input) + if len(events) != 2 { + t.Fatalf("got %d events, want 2", len(events)) + } + if events[0].Type != "greeting" || events[0].ID != "1" || events[0].Data != "hi" { + t.Fatalf("event[0] = %+v", events[0]) + } + if events[1].Type != "message" || events[1].ID != "1" || events[1].Data != "again" { + t.Fatalf("event[1] = %+v, want Type=message ID=1 Data=again", events[1]) + } +} + +func TestParseRetry(t *testing.T) { + t.Parallel() + + events := collectEvents(t, "retry: 2500\ndata: x\n\n") + if len(events) != 1 || events[0].Retry != 2500*time.Millisecond { + t.Fatalf("events = %+v, want Retry=2.5s", events) + } + + events = collectEvents(t, "retry: soon\ndata: x\n\n") + if len(events) != 1 || events[0].Retry != 0 { + t.Fatalf("events = %+v, want Retry=0 for non-numeric", events) + } +} + +func TestParseCommentsAndBlankProduceNoEvents(t *testing.T) { + t.Parallel() + + if events := collectEvents(t, ": keep-alive\n\n"); len(events) != 0 { + t.Fatalf("comment+blank produced %d events, want 0", len(events)) + } + if events := collectEvents(t, "\n\n\n"); len(events) != 0 { + t.Fatalf("blank lines produced %d events, want 0", len(events)) + } +} + +func TestParseCRLF(t *testing.T) { + t.Parallel() + + events := collectEvents(t, "data: hello\r\n\r\n") + if len(events) != 1 || events[0].Data != "hello" { + t.Fatalf("CRLF events = %+v, want Data=hello", events) + } +} + +func TestParseLeadingSpaceStripping(t *testing.T) { + t.Parallel() + + tests := map[string]string{ + "data: x\n\n": "x", + "data:x\n\n": "x", + "data: x\n\n": " x", + } + for input, want := range tests { + events := collectEvents(t, input) + if len(events) != 1 || events[0].Data != want { + t.Fatalf("input %q -> %+v, want Data=%q", input, events, want) + } + } +} + +func TestParseDiscardsPartialEventAtEOF(t *testing.T) { + t.Parallel() + + if events := collectEvents(t, "data: incomplete\n"); len(events) != 0 { + t.Fatalf("got %d events, want 0 (partial discarded at EOF)", len(events)) + } +} + +type errReader struct { + data []byte + err error + done bool +} + +func (r *errReader) Read(p []byte) (int, error) { + if r.done { + return 0, r.err + } + r.done = true + return copy(p, r.data), nil +} + +func TestParseSurfacesReadError(t *testing.T) { + t.Parallel() + + boom := errors.New("boom") + r := &errReader{data: []byte("data: x\n\ndata: y"), err: boom} + + var events []sse.Event + var gotErr error + for ev, err := range sse.Parse(r) { + if err != nil { + gotErr = err + break + } + events = append(events, ev) + } + if len(events) != 1 || events[0].Data != "x" { + t.Fatalf("events = %+v, want one event with Data=x before the error", events) + } + if !errors.Is(gotErr, boom) { + t.Fatalf("err = %v, want boom", gotErr) + } +} + +func TestParseOverLongLine(t *testing.T) { + t.Parallel() + + long := "data: " + strings.Repeat("a", 2<<20) + var gotErr error + for _, err := range sse.Parse(strings.NewReader(long)) { + if err != nil { + gotErr = err + break + } + } + if !errors.Is(gotErr, bufio.ErrTooLong) { + t.Fatalf("err = %v, want bufio.ErrTooLong", gotErr) + } +} + +func TestParseEarlyBreak(t *testing.T) { + t.Parallel() + + count := 0 + for range sse.Parse(strings.NewReader("data: a\n\ndata: b\n\n")) { + count++ + break + } + if count != 1 { + t.Fatalf("consumed %d events, want 1 after break", count) + } +} + +func TestParseDataFieldNoColon(t *testing.T) { + t.Parallel() + + // A bare "data" line (no colon) is the field name with an empty value. + events := collectEvents(t, "data\n\n") + if len(events) != 1 || events[0].Data != "" { + t.Fatalf("events = %+v, want one event with empty Data", events) + } +} + +func TestParseIDWithNULIgnored(t *testing.T) { + t.Parallel() + + // The second id field contains a NUL and must be ignored, so the second + // event keeps the first id. + events := collectEvents(t, "id: abc\ndata: first\n\nid: x\x00y\ndata: second\n\n") + if len(events) != 2 { + t.Fatalf("got %d events, want 2", len(events)) + } + if events[0].ID != "abc" || events[1].ID != "abc" { + t.Fatalf("IDs = %q, %q, want both abc (NUL id ignored)", events[0].ID, events[1].ID) + } +} + +func TestParseRetryNotStickyAcrossEvents(t *testing.T) { + t.Parallel() + + // retry applies to the event it appears on; the next event without a retry + // field has Retry == 0. + events := collectEvents(t, "retry: 1000\ndata: a\n\ndata: b\n\n") + if len(events) != 2 { + t.Fatalf("got %d events, want 2", len(events)) + } + if events[0].Retry != 1000*time.Millisecond { + t.Fatalf("event[0].Retry = %v, want 1s", events[0].Retry) + } + if events[1].Retry != 0 { + t.Fatalf("event[1].Retry = %v, want 0 (retry not sticky)", events[1].Retry) + } +}