Thank you for your interest in contributing to Streamkit! We appreciate contributions of all kinds—bug reports, feature requests, documentation, and code.
Be respectful, inclusive, and professional. We aim to maintain a welcoming community for everyone.
- Go 1.25.6 or later — Install Go
- Git — Install Git
- Docker (optional) — For Azure Storage emulation during testing
- Clone the repository:

  ```bash
  git clone https://github.com/fgrzl/streamkit.git
  cd streamkit
  ```

- Install dependencies:

  ```bash
  go mod download
  ```

- Optional: Start Azure Storage emulator (for integration testing):

  ```bash
  docker-compose up -d
  ```

- Verify your setup:

  ```bash
  go test -race ./...
  ```
- `develop`: Main development branch. All PRs target this branch.
- Feature branches: Create from `develop` with descriptive names:

  ```bash
  git checkout -b feature/add-retry-logic
  git checkout -b fix/race-condition-in-muxer
  ```
- Write clear, concise commit messages in the present tense:

  ```
  Add context-aware retry sleep in http_client
  Fix data race in pseudoRandState global variable
  ```

- Keep commits focused on a single logical change
- Reference issues when applicable: `Fixes #123`
- Before opening a PR:
  - Run tests: `go test -race ./...`
  - Format code: `go fmt ./...`
  - Verify no new lint issues
- PR description should include:
  - What problem does this solve?
  - How does it solve the problem?
  - Any breaking changes or new dependencies?
  - Link to related issues
- PR checklist:
  - Tests added/modified to cover changes
  - All tests pass with `-race` flag
  - Code follows project conventions
  - Documentation updated (if applicable)
  - No new public API without docs
- Follow the Effective Go guidelines
- Use `gofmt` for formatting (enforced by CI)
- Keep functions focused and single-purpose
- Use clear, descriptive names
- Packages: Lowercase, concise (`api`, `client`, `storage`, `transport`)
- Interfaces: End with an `-er` or `-able` suffix (`Reader`, `Consumable`, `Routeable`)
- Constants: camelCase for unexported (`const maxRetries`, `const maxActiveHandlers`)
- Types: Public types start with a capital letter
Use log/slog and be strategic about levels so production logs stay actionable:
- Info — Lifecycle and important operational events only (e.g. factory initialized, first connection, reconnect successful, connection closed, enqueuing reconnection). Avoid per-request or per-message Info.
- Warn — Recoverable or degraded conditions (e.g. handler timeout, reconnect queue full, backoff).
- Error — Failures that need attention (e.g. stream creation failed, handler panic, table creation failed).
- Debug — Reserved for troubleshooting; keep minimal so default (Info) stays quiet.
Prefer structured fields that make correlation and triage fast. Use the identifiers that matter for the operation instead of free-form text parsing:
- `store_id`, `space`, `segment`, `channel_id` for stream and transport scope
- `request_id`, `subscription_id`, `transaction_id` for end-to-end correlation
- `attempt`, `max_attempts`, `last_sequence`, `record_count`, `entry_count` for progress and retry state
- `error_type` plus the original `error` value for grouping common failure modes
Do not add Info logs on hot paths (peek, publish, produce, per-subscription connect, per-offset retry). Prefer metrics or tracing for high-cardinality observability. Routine reconnect success, benign disconnects, and ordinary close events belong in Debug unless they indicate a degraded state that needs operator action.
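A minimal sketch of these logging guidelines, using only the standard `log/slog` package. The helper `logReconnectBackoff`, the message text, and the `error_type` value `dial_timeout` are hypothetical and only illustrate the field names listed above; they are not taken from the Streamkit codebase.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
)

// logReconnectBackoff is a hypothetical helper. It emits one Warn record for a
// recoverable condition and attaches the structured fields recommended above.
func logReconnectBackoff(logger *slog.Logger, storeID, space, segment string, attempt, maxAttempts int, err error) {
	logger.LogAttrs(context.Background(), slog.LevelWarn, "reconnect backoff",
		slog.String("store_id", storeID),
		slog.String("space", space),
		slog.String("segment", segment),
		slog.Int("attempt", attempt),
		slog.Int("max_attempts", maxAttempts),
		slog.String("error_type", "dial_timeout"), // assumed grouping label
		slog.Any("error", err),
	)
}

// renderExample logs through a JSON handler set to the Info level and returns
// everything that was written.
func renderExample() string {
	var buf bytes.Buffer
	logger := slog.New(slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelInfo}))

	// Hot-path detail is logged at Debug, so the Info-level handler drops it.
	logger.Debug("peek", "segment", "orders")

	logReconnectBackoff(logger, "store-1", "tenant-a", "orders", 3, 5, errors.New("dial tcp: i/o timeout"))
	return buf.String()
}

func main() {
	fmt.Print(renderExample())
}
```

Only the Warn record reaches the output, which is the behavior the level guidance aims for: the default level stays quiet while degraded conditions remain searchable by field.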
Streamkit has a clear layered architecture:
```
┌─ api: Domain models and interfaces (Entry, Record, Space, Segment)
├─ client: Public SDK for consuming/producing (client.Client interface)
├─ server: Server-side routing and session management
├─ storage: Persistence layer with multiple backends
│  ├─ pebblekit: Local embedded storage (PebbleDB)
│  └─ azurekit: Cloud storage (Azure Table Storage)
├─ transport: Multiple protocol implementations
│  ├─ inprockit: In-process (direct method calls)
│  ├─ wskit: WebSocket with multiplexing
│  ├─ grpckit: gRPC (future)
│  └─ http2kit: HTTP/2 (future)
└─ internal: Private utilities not part of public API
   ├─ codec: Binary serialization + compression
   ├─ cache: Expiring cache for segments
   └─ txn: Write-Ahead Log transaction support
```
When adding features, ensure they follow this separation of concerns:
- Core logic in `pkg/api/`: Define interfaces first
- Implementation in backends: Implement storage interface, add transport
- Expose via `pkg/client/`: Add public API methods
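The three steps above can be sketched in a single file. Every name here (`LatestReader`, `memoryStore`, `LatestSequence`, `ErrSegmentNotFound`, and the fields of `Entry`) is hypothetical and chosen for illustration; the real Streamkit interfaces may look quite different. In the repository each layer would live in its own package rather than in `main`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Step 1, api layer: define the contract first (hypothetical types).
type Entry struct {
	Sequence uint64
	Payload  []byte
}

var ErrSegmentNotFound = errors.New("segment not found")

type LatestReader interface {
	Latest(ctx context.Context, space, segment string) (Entry, error)
}

// Step 2, backend layer: an in-memory store that satisfies the interface.
type memoryStore struct {
	entries map[string]Entry
}

func (m *memoryStore) Latest(ctx context.Context, space, segment string) (Entry, error) {
	if err := ctx.Err(); err != nil {
		return Entry{}, err
	}
	e, ok := m.entries[space+"/"+segment]
	if !ok {
		return Entry{}, fmt.Errorf("%s/%s: %w", space, segment, ErrSegmentNotFound)
	}
	return e, nil
}

// Step 3, client layer: expose a public method that depends only on the interface.
type Client struct{ reader LatestReader }

func NewClient(r LatestReader) *Client { return &Client{reader: r} }

func (c *Client) LatestSequence(ctx context.Context, space, segment string) (uint64, error) {
	e, err := c.reader.Latest(ctx, space, segment)
	if err != nil {
		return 0, err
	}
	return e.Sequence, nil
}

// demoLatestSequence wires the three layers together.
func demoLatestSequence() (uint64, error) {
	store := &memoryStore{entries: map[string]Entry{"tenant-a/orders": {Sequence: 7}}}
	return NewClient(store).LatestSequence(context.Background(), "tenant-a", "orders")
}

func main() {
	seq, err := demoLatestSequence()
	fmt.Println(seq, err)
}
```

Because the client depends on the interface rather than on a concrete backend, a second backend can be added without touching client code.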
Streamkit uses behavioral tests with clear Arrange-Act-Assert structure. See docs/test_guidelines.md for detailed guidelines.
Use behavioral style names that describe the assertion:
```go
func TestShouldReturnErrorWhenUserIsInvalid(t *testing.T)
func TestShouldStoreResultGivenValidInput(t *testing.T)
func TestHandlerPanicResilience(t *testing.T) // Behavior-focused
```

```go
func TestShouldConsumeSegmentWithOffset(t *testing.T) {
	// Arrange: set up test fixtures and dependencies.
	// setupTestStore and expected stand in for package-specific fixtures.
	ctx := context.Background()
	store, err := setupTestStore(t)
	require.NoError(t, err) // Use require to fail fast during setup
	defer store.Close()

	// Act: perform the action being tested
	result, err := store.ConsumeSegment(ctx, "space", "segment")

	// Assert: verify the outcome
	assert.NoError(t, err) // Use assert for actual test assertions
	assert.Equal(t, expected, result)
}
```

```bash
# All tests with race detector
go test -race ./...
# Specific package with verbose output
go test -race -v ./pkg/client/...
# Single test
go test -race -run TestShouldConsumeSegment ./pkg/client/...
# With coverage
go test -race -cover ./...
# With timeout (default 10m)
go test -race -timeout=2m ./...
# Local-only Azure-backed WebSocket chaos soak (skipped in CI unless explicitly enabled)
STREAMKIT_SOAK=1 STREAMKIT_SOAK_DURATION=90s go test -run TestLocalWebSocketChaosSoak -count=1 ./pkg/transport/wskit
```

The chaos soak uses the local Azure Table emulator by default (http://127.0.0.1:10002/devstoreaccount1), so run `docker compose up -d` first if the emulator is not already running. The soak is opt-in on purpose: it is skipped during normal `go test ./...` and CI runs unless `STREAMKIT_SOAK` is set explicitly.
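One way an opt-in gate like this could be written is sketched below. It is an assumption about the shape of such a check, not the actual Streamkit test code: the function `soakConfig`, the default duration, and the rule that any non-empty `STREAMKIT_SOAK` value enables the soak are all hypothetical. Only the two environment variable names come from the command above.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// defaultSoakDuration is a hypothetical fallback used when
// STREAMKIT_SOAK_DURATION is not set.
const defaultSoakDuration = 30 * time.Second

// soakConfig reports whether the soak is enabled and for how long it should
// run. getenv is injected so the logic can be exercised without touching the
// real environment.
func soakConfig(getenv func(string) string) (enabled bool, duration time.Duration, err error) {
	if getenv("STREAMKIT_SOAK") == "" {
		return false, 0, nil // opt-in: unset means skip
	}
	raw := getenv("STREAMKIT_SOAK_DURATION")
	if raw == "" {
		return true, defaultSoakDuration, nil
	}
	duration, err = time.ParseDuration(raw)
	if err != nil {
		return false, 0, fmt.Errorf("parse STREAMKIT_SOAK_DURATION %q: %w", raw, err)
	}
	return true, duration, nil
}

func main() {
	enabled, duration, err := soakConfig(os.Getenv)
	if err != nil {
		fmt.Println("invalid soak configuration:", err)
		return
	}
	if !enabled {
		fmt.Println("soak skipped: STREAMKIT_SOAK is not set")
		return
	}
	fmt.Println("soak enabled for", duration)
}
```

In a real test, the disabled branch would typically call `t.Skip` so the soak shows up as skipped rather than passed.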
- ✅ All new code must have tests
- ✅ Tests must pass with `-race` flag (no data races)
- ✅ Each test should test one behavior only
- ✅ Use table-driven tests for input variations
- ✅ Mock external dependencies (WebSocket, Azure, etc.)
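For the table-driven requirement, a minimal sketch follows. The function under test, `normalizeSegmentKey`, is hypothetical, and the checks use the standard library instead of `require`/`assert` only to keep the example free of external dependencies. In a real package the test would sit in a `_test.go` file and the `main` function would be dropped; it is included here so the sketch also compiles as a standalone program.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"testing"
)

// normalizeSegmentKey is a hypothetical function under test: it trims
// whitespace, lowercases the key, and rejects empty results.
func normalizeSegmentKey(raw string) (string, error) {
	key := strings.ToLower(strings.TrimSpace(raw))
	if key == "" {
		return "", errors.New("segment key is empty")
	}
	return key, nil
}

// One behavior (key normalization), several input variations.
func TestShouldNormalizeSegmentKey(t *testing.T) {
	cases := []struct {
		name    string
		input   string
		want    string
		wantErr bool
	}{
		{name: "already normalized", input: "orders", want: "orders"},
		{name: "mixed case", input: "Orders", want: "orders"},
		{name: "surrounding whitespace", input: "  orders ", want: "orders"},
		{name: "empty after trimming", input: "   ", wantErr: true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Act
			got, err := normalizeSegmentKey(tc.input)

			// Assert
			if (err != nil) != tc.wantErr {
				t.Fatalf("error = %v, wantErr = %v", err, tc.wantErr)
			}
			if got != tc.want {
				t.Errorf("got %q, want %q", got, tc.want)
			}
		})
	}
}

// main only keeps the sketch runnable outside `go test`.
func main() {
	key, err := normalizeSegmentKey("  Orders ")
	fmt.Println(key, err)
}
```

Each row gets its own subtest through `t.Run`, so a single failing variation can be rerun with `-run TestShouldNormalizeSegmentKey/mixed_case`.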
- Exported functions/types: Add a comment starting with the declaration name

  ```go
  // Consume reads entries from the stream until the context is canceled.
  func (s *Stream) Consume(ctx context.Context) error { ... }
  ```

- Non-obvious logic: Explain the "why", not the "what"

  ```go
  // Issue #3: Use per-segment locking to prevent concurrent writes
  // from overwriting each other in the same segment.
  mu := s.getLock(segmentKey)
  ```
When fixing a known issue, reference it in code comments:
```go
// Issue #15: Set sentinel header so token failures are visible
req.Header.Set("X-Auth-Failure", "token_acquisition_failed")
```

Document significant changes in CHANGELOG.md under [Unreleased]:

```markdown
### Fixed
- Client: Fixed data race in pseudoRandState (#123)
### Added
- Storage: New caching layer for frequently accessed segments
### Changed
- Transport: Use context for WebSocket cancellation
```

```bash
go build ./...
```

Streamkit is a production library (no standalone binary). Import packages from the module:

```go
import "github.com/fgrzl/streamkit/pkg/client"
```

See docs/production.md for deployment patterns.
Include:
- Go version: `go version`
- Streamkit version: Git commit hash or local `go.mod`
- Steps to reproduce
- Expected vs. actual behavior
- Relevant logs or stack traces
Example:
````markdown
**Description:** WebSocket connection fails on startup with nil pointer panic

**Steps:**
1. Create client with wskit provider
2. Call client.Consume()
3. Observe panic in server.go connect()

**Expected:** Graceful 401 Unauthorized response

**Actual:** Panic in session.CanAccessStore()

**Logs:**
```
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
  at main.(*webSocketServer).connect (server.go:35)
```
````
### Feature Requests
Include:
- Use case — what would this enable?
- Proposed API design (if applicable)
- Examples of how users would use this feature
## Concurrency and Safety
Streamkit is designed for high-throughput concurrent access. When contributing:
- ✅ Always run tests with `-race` flag
- ✅ Use `sync.Mutex`, `sync.Map`, or `atomic.Int*` for shared state
- ✅ Avoid global mutable variables
- ✅ Document which fields require synchronization
- ✅ Prefer channels for goroutine coordination
Example of proper synchronization:
```go
// Good: atomic access to shared counter
var activeHandlers atomic.Int32

// Good: synchronized access to lock map
var segLocks sync.Map

// ❌ Bad: unsynchronized global variable
var globalState = 0 // Data race when read and written from multiple goroutines
```
For performance-critical paths (codec, muxer, cache), include benchmarks:
```bash
# Run benchmarks
go test -bench=. ./internal/codec/...
# Include allocation statistics; save the output to compare against a baseline
go test -bench=. -benchmem ./internal/codec/...
```

Benchmarks should be in `*_bench_test.go` files.
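A benchmark for a hot path might look like the sketch below. `encodeSequence` is a hypothetical stand-in for a codec function, not part of Streamkit's codec. In the repository the `Benchmark…` function would live in a `*_bench_test.go` file and run under `go test -bench`; here `testing.Benchmark` drives it from `main` so the example runs on its own.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"testing"
)

// encodeSequence is a hypothetical hot-path function: it appends seq to dst
// as a variable-length unsigned integer and returns the extended slice.
func encodeSequence(dst []byte, seq uint64) []byte {
	return binary.AppendUvarint(dst, seq)
}

// BenchmarkEncodeSequence reuses one buffer across iterations, so the reported
// allocations reflect the encoder rather than the benchmark harness.
func BenchmarkEncodeSequence(b *testing.B) {
	buf := make([]byte, 0, binary.MaxVarintLen64)
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		buf = encodeSequence(buf[:0], uint64(i))
	}
}

func main() {
	// testing.Benchmark runs a benchmark function outside of `go test`.
	result := testing.Benchmark(BenchmarkEncodeSequence)
	fmt.Println(result.String(), result.MemString())
}
```

Calling `b.ReportAllocs()` has the same effect for this benchmark as passing `-benchmem` on the command line.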
Streamkit runs in production. When changing client or protocol behavior, preserve documented contracts in docs/limitations.md:
- Lock map — Per-segment produce locks persist; document impact for high-churn segment keys.
- At-least-once delivery — Consumers must dedupe on sequence; do not imply exactly-once without app logic.
- Subscriptions — Reconnect delivers snapshot then live updates; no durable missed-update replay.
- Consume on disconnect — Cursors are application-owned; do not break resume semantics without a major version.
See docs/production.md and CHANGELOG.md.
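The at-least-once contract means application code is responsible for dropping redelivered entries. A minimal sketch of sequence-based deduplication follows, assuming sequences increase monotonically within a segment. The `Entry` shape and the `Deduper` type are hypothetical and are not Streamkit APIs.

```go
package main

import "fmt"

// Entry is a hypothetical consumed record; only Segment and Sequence matter
// for deduplication.
type Entry struct {
	Segment  string
	Sequence uint64
	Payload  string
}

// Deduper remembers the highest sequence processed per segment. It is not
// safe for concurrent use; guard it with a mutex if shared across goroutines.
type Deduper struct {
	last map[string]uint64
}

func NewDeduper() *Deduper {
	return &Deduper{last: make(map[string]uint64)}
}

// Accept reports whether e is new. Entries at or below the last processed
// sequence for their segment are treated as redeliveries.
func (d *Deduper) Accept(e Entry) bool {
	if seq, seen := d.last[e.Segment]; seen && e.Sequence <= seq {
		return false
	}
	d.last[e.Segment] = e.Sequence
	return true
}

// processAll returns the payloads that survive deduplication, in order.
func processAll(entries []Entry) []string {
	d := NewDeduper()
	var out []string
	for _, e := range entries {
		if d.Accept(e) {
			out = append(out, e.Payload)
		}
	}
	return out
}

func main() {
	delivered := []Entry{
		{Segment: "orders", Sequence: 1, Payload: "a"},
		{Segment: "orders", Sequence: 2, Payload: "b"},
		{Segment: "orders", Sequence: 2, Payload: "b"}, // redelivered after reconnect
		{Segment: "orders", Sequence: 3, Payload: "c"},
		{Segment: "orders", Sequence: 1, Payload: "a"}, // stale replay
	}
	fmt.Println(processAll(delivered))
}
```

Persisting the last processed sequence alongside the application's own cursor keeps deduplication working across restarts, which matches the note that cursors are application-owned.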
Active contributors with a track record of quality contributions may be invited to become maintainers. This includes:
- Code review authority
- Merge privileges
- Issue triage
- Release management
- Check README.md and docs/README.md for documentation index
- Read docs/production.md for operational guidance
- Read docs/test_guidelines.md for testing details
- Review CHANGELOG.md for release notes
- Open a discussion or issue on GitHub
Thank you for helping make Streamkit better! 🚀