streamhub

Resumable LLM streaming for Go, backed by Redis.

streamhub is meant for the fairly common case where the code producing a stream and the code consuming it don't share the same lifetime — they might not even be on the same instance. Think LLM responses, SSE endpoints, or anything where you need the stream to survive reconnects.

It uses Redis Streams for chunk persistence (so subscribers can replay what they missed) and Redis Pub/Sub for cancel signals (so you can stop a generation from anywhere). Each producer gets a generation ID as a fencing token, and only one producer can own a session at a time.
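
The single-owner rule can be pictured with a small in-memory sketch. This is not streamhub's implementation, which keeps its state in Redis; the `registry` type and the `claim` and `release` names are hypothetical, and the only point is how a generation ID lets a stale producer be told apart from the current one.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical in-memory stand-in for the session state
// that streamhub keeps in Redis.
type registry struct {
	mu     sync.Mutex
	nextID uint64
	owner  map[string]uint64 // session ID -> generation of the current owner
}

func newRegistry() *registry {
	return &registry{owner: make(map[string]uint64)}
}

// claim returns a fresh generation ID and true if the session had no owner,
// or the existing owner's generation and false otherwise.
func (r *registry) claim(session string) (uint64, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if gen, ok := r.owner[session]; ok {
		return gen, false
	}
	r.nextID++
	r.owner[session] = r.nextID
	return r.nextID, true
}

// release frees the session, but only for the generation that owns it.
func (r *registry) release(session string, gen uint64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.owner[session] != gen {
		return false // stale producer: its token no longer matches
	}
	delete(r.owner, session)
	return true
}

func main() {
	r := newRegistry()
	gen1, created1 := r.claim("chat:123")
	_, created2 := r.claim("chat:123")
	fmt.Println(gen1, created1, created2) // 1 true false

	// A producer holding the wrong generation cannot release the session.
	fmt.Println(r.release("chat:123", gen1+1), r.release("chat:123", gen1)) // false true
}
```

The second `claim` plays the role of `created == false`: the caller learns that someone else owns the session and should not start producing.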

Requirements

  • Go 1.26
  • Redis

Install

go get github.com/gtoxlili/streamhub

Usage

Create a Hub from a rueidis client:

client, err := rueidis.NewClient(rueidis.ClientOption{
	InitAddress: []string{"127.0.0.1:6379"},
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

hub := streamhub.New(client)

Register a producer:

stream, created, err := hub.Register("chat:123", func() {
	// called when someone cancels this session
})
if err != nil {
	log.Fatal(err)
}
if !created {
	return // another instance already owns this session
}
defer stream.Close()

stream.SetMetadata(map[string]any{"model": "claude-sonnet-4-20250514"})

stream.Publish("hello")
stream.Publish(" world")

created is the important bit — if it's false, a producer is already running for this session.

Subscribe (from any instance):

stream := hub.Get("chat:123")
if stream == nil {
	return
}

chunks, unsubscribe := stream.Subscribe(streamhub.WithBuffer(128))
defer unsubscribe()

for chunk := range chunks {
	// replays existing chunks first, then streams live
	println(chunk)
}
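
To make "replays existing chunks first, then streams live" concrete, here is a minimal in-memory sketch of the same idea: an append-only log that every subscriber reads from offset 0. The `chunkLog` type and its methods are hypothetical and stand in for the Redis Stream; unsubscribe handling is left out for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// chunkLog is a hypothetical append-only log of chunks.
type chunkLog struct {
	mu     sync.Mutex
	cond   *sync.Cond
	chunks []string
	done   bool
}

func newChunkLog() *chunkLog {
	l := &chunkLog{}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// publish appends a chunk and wakes any waiting subscribers.
func (l *chunkLog) publish(c string) {
	l.mu.Lock()
	l.chunks = append(l.chunks, c)
	l.mu.Unlock()
	l.cond.Broadcast()
}

// close marks the log as finished; subscribers drain and then stop.
func (l *chunkLog) close() {
	l.mu.Lock()
	l.done = true
	l.mu.Unlock()
	l.cond.Broadcast()
}

// subscribe starts reading at offset 0, so a late subscriber first
// receives the stored chunks and then the ones published afterwards.
func (l *chunkLog) subscribe(buffer int) <-chan string {
	out := make(chan string, buffer)
	go func() {
		defer close(out)
		next := 0
		for {
			l.mu.Lock()
			for next == len(l.chunks) && !l.done {
				l.cond.Wait()
			}
			if next == len(l.chunks) { // finished and fully drained
				l.mu.Unlock()
				return
			}
			c := l.chunks[next]
			l.mu.Unlock()
			next++
			out <- c
		}
	}()
	return out
}

func main() {
	l := newChunkLog()
	l.publish("hello")
	sub := l.subscribe(8) // joins after the first chunk was published
	l.publish(" world")
	l.close()
	for c := range sub {
		fmt.Print(c)
	}
	fmt.Println() // hello world
}
```

Because the subscriber tracks its own offset, it never needs to know whether a chunk is "old" or "live"; the order of delivery is simply the order in the log.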

Cancel:

if stream := hub.Get("chat:123"); stream != nil {
	// Pass context.Background() for fire-and-forget, or a timeout to wait
	// for the producer to finish persisting.
	stream.Cancel(context.Background())
}

API

streamhub.New(client)

Creates a Hub.

hub.Register(sessionID, cancelRuntime)

Tries to claim a session as producer. Returns (stream, created, err) — check created before writing.

hub.Get(sessionID)

Returns a handle for an existing stream, or nil.

hub.Active(sessionIDs)

Checks which sessions are still active.

hub.Remove(sessionID)

Deletes Redis keys and local state for a stream.

stream.Publish(chunk)

Publishes a chunk.

stream.Subscribe(opts...)

Subscribes. Replays existing chunks, then delivers new ones live. Options: WithBuffer(n) tunes the channel buffer; WithBatchReplay() concatenates replay into one string instead of sending chunks one-by-one.
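
The difference between the two options is easiest to see side by side. The sketch below uses the functional-options pattern with hypothetical lower-case names (`withBuffer`, `withBatchReplay`, `subscribe`); it only models the replay phase and closes the channel afterwards, and the default buffer size of 16 is an assumption, not streamhub's actual default.

```go
package main

import (
	"fmt"
	"strings"
)

// config holds the subscriber settings that the options adjust.
type config struct {
	buffer      int
	batchReplay bool
}

type option func(*config)

// withBuffer sets the channel capacity; non-positive values are ignored.
func withBuffer(n int) option {
	return func(c *config) {
		if n > 0 {
			c.buffer = n
		}
	}
}

// withBatchReplay delivers the stored chunks as one concatenated string.
func withBatchReplay() option {
	return func(c *config) { c.batchReplay = true }
}

// subscribe replays the stored chunks according to the options and then
// closes the channel. Live delivery is out of scope for this sketch.
func subscribe(stored []string, opts ...option) <-chan string {
	cfg := config{buffer: 16} // assumed default
	for _, o := range opts {
		o(&cfg)
	}
	msgs := stored
	if cfg.batchReplay && len(stored) > 0 {
		msgs = []string{strings.Join(stored, "")}
	}
	out := make(chan string, cfg.buffer)
	go func() {
		defer close(out)
		for _, m := range msgs {
			out <- m
		}
	}()
	return out
}

func main() {
	stored := []string{"hel", "lo", "!"}

	for m := range subscribe(stored, withBuffer(128)) {
		fmt.Printf("%q ", m) // "hel" "lo" "!"
	}
	fmt.Println()

	for m := range subscribe(stored, withBatchReplay()) {
		fmt.Printf("%q ", m) // "hello!"
	}
	fmt.Println()
}
```

Batch replay suits clients that only need the text so far, such as a page reload that redraws the whole message; chunk-by-chunk replay suits clients that process each chunk individually.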

stream.SetMetadata(v) / stream.Metadata(&target)

Stores / loads per-stream JSON metadata.
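
Since the metadata is JSON, a value stored as a map can be loaded back into a typed struct. The sketch below shows that round trip with `encoding/json` only; `metaStore`, `set`, `load`, and the `usage` struct are hypothetical stand-ins, not streamhub types.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// metaStore is a hypothetical stand-in for one stream's metadata slot.
type metaStore struct{ raw []byte }

// set encodes v as JSON and keeps the bytes.
func (m *metaStore) set(v any) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	m.raw = b
	return nil
}

// load decodes the stored JSON into target, which must be a pointer.
func (m *metaStore) load(target any) error {
	if m.raw == nil {
		return errors.New("no metadata stored")
	}
	return json.Unmarshal(m.raw, target)
}

// usage is an example target type; its fields are made up for illustration.
type usage struct {
	Model  string `json:"model"`
	Tokens int    `json:"tokens"`
}

func main() {
	var m metaStore
	if err := m.set(map[string]any{"model": "example-model", "tokens": 42}); err != nil {
		fmt.Println("set:", err)
		return
	}
	var u usage
	if err := m.load(&u); err != nil {
		fmt.Println("load:", err)
		return
	}
	fmt.Println(u.Model, u.Tokens) // example-model 42
}
```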

stream.Cancel(ctx)

Sends a cancel signal via Pub/Sub. Given a context with a timeout, it waits for the producer to finish or for ctx to expire, whichever comes first. Use context.Background() for fire-and-forget.
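
The waiting behaviour can be sketched with a plain `select` on a done channel and the context. Everything here is illustrative: `cancelAndWait` and `demo` are hypothetical names, the signal is a closed channel rather than Redis Pub/Sub, and treating a context that can never be cancelled as fire-and-forget is an assumption about how the library reads `context.Background()`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cancelAndWait sends the cancel signal, then waits until the producer
// reports completion (done is closed) or ctx ends.
func cancelAndWait(ctx context.Context, signal func(), done <-chan struct{}) error {
	signal()
	if ctx.Done() == nil {
		// Assumption: a context that can never be cancelled, such as
		// context.Background(), means "do not wait".
		return nil
	}
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo runs a fake producer that needs finishAfterMs to wind down once
// signalled, and cancels it with a timeoutMs deadline.
func demo(timeoutMs, finishAfterMs int) error {
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		<-stop
		time.Sleep(time.Duration(finishAfterMs) * time.Millisecond) // flush remaining work
		close(done)
	}()
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return cancelAndWait(ctx, func() { close(stop) }, done)
}

func main() {
	fmt.Println(demo(500, 10)) // <nil>
	fmt.Println(demo(10, 500)) // context deadline exceeded
}
```

A timeout is the safer choice when the caller needs to know the producer has stopped writing, for example before starting a replacement generation for the same session.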

stream.Close()

Marks the stream as done.

stream.Done()

Reports whether the stream has finished.

Typical flow

  1. Register when a request comes in
  2. Only start the job if created == true
  3. Publish chunks as they're generated
  4. Consumers call Get + Subscribe from any instance
  5. Close when done, Cancel if the user aborts

Notes

  • Don't start a second producer when created == false
  • Call SetMetadata before Close
  • Always call unsubscribe
  • Ensure Close runs on every producer exit path, including panics. If the producer goroutine panics, or its input channel never closes, Close never runs, the heartbeat keeps refreshing the TTL, and the session looks like it is "streaming forever" to every client. The safe pattern:
    go func() {
        defer func() {
            // Runs on normal return and on panic alike.
            if r := recover(); r != nil { /* log */ }
            live.Close()
            cancelRuntime()
        }()
        for chunk := range runtime.Chunks {
            live.Publish(chunk)
        }
        // Still before Close, which only runs in the deferred func.
        live.SetMetadata(usage)
    }()
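
The deferred-Close pattern above can be checked in isolation. The following sketch swaps the real stream for a hypothetical `fakeStream` and forces a panic in the middle of the loop; `runProducer` and its `transform` parameter are made up for the demonstration.

```go
package main

import "fmt"

// fakeStream is a hypothetical stand-in for the producer's stream handle.
type fakeStream struct {
	published []string
	closed    bool
}

func (s *fakeStream) Publish(c string) { s.published = append(s.published, c) }
func (s *fakeStream) Close()           { s.closed = true }

// runProducer forwards chunks to the stream and guarantees Close runs,
// even if transform panics. It returns the recovered panic value, if any.
func runProducer(s *fakeStream, chunks <-chan string, transform func(string) string) (recovered any) {
	defer func() {
		recovered = recover() // nil when nothing panicked
		s.Close()
	}()
	for c := range chunks {
		s.Publish(transform(c))
	}
	return nil
}

func main() {
	chunks := make(chan string, 2)
	chunks <- "ok"
	chunks <- "boom"
	close(chunks)

	s := &fakeStream{}
	r := runProducer(s, chunks, func(c string) string {
		if c == "boom" {
			panic("bad chunk")
		}
		return c
	})
	fmt.Println(r, s.closed, s.published) // bad chunk true [ok]
}
```

Note that the defer only covers exits from the loop. An input channel that never closes still blocks the `range` forever, so the producer also needs its own cancellation path for that case.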

License

GPL-3.0. See LICENSE.
