es

A focused event sourcing library for Go: aggregates, domain events, a Store contract, repository load/save, optional audit batch streams, and OpenTelemetry hooks.

Documentation

| Resource | Description |
| --- | --- |
| docs/README.md | Overview, mental model, principles, doc index |
| docs/getting-started.md | Walkthrough: events, aggregate, repository |
| docs/api-reference.md | Interfaces, factories, types, patterns |
| docs/audit_events.md | Audit vs Raise, persistence order, consumers |

Features

  • Aggregate Roots: Event-sourced aggregates with automatic event registration and replay
  • Event Handling: Type-safe event handlers with generic registration
  • Event Storage: The module defines the Store interface; production implementations live in your codebase, and NewInMemoryEventStore covers tests and local development
  • Repository Pattern: High-level aggregate persistence with optimistic concurrency control
  • Derived Audit Streams: Aggregate.Audit stages immutable DomainEvent rows on fresh batch streams derived from the current aggregate; these streams are not replayed on Load, and Repository.Save persists audits before domain events
  • Multi-tenancy: Support for global and tenant-scoped aggregates
  • Context Propagation: Built-in correlation and causation tracking
  • OpenTelemetry Spans: Repository load and save operations emit OTEL spans with aggregate metadata

Installation

go get github.com/fgrzl/es

Quick Start

1. Define Your Aggregate

package animals

import (
	"context"

	"github.com/fgrzl/es"
	"github.com/google/uuid"
)

type Cat struct {
	es.Aggregate
	Name    string
	Breed   string
	Age     int
	adopted bool
}

func NewCat(id uuid.UUID) *Cat {
	cat := &Cat{Aggregate: es.NewAggregate(context.Background(), "cats", id)}
	es.RegisterHandler(cat, cat.OnCatRenamed)
	es.RegisterHandler(cat, cat.OnCatAdopted)
	return cat
}

func (c *Cat) Rename(name string) error {
	if c.Name != name {
		return c.Raise(&CatRenamed{Name: name})
	}
	return nil
}

func (c *Cat) Adopt() error {
	if c.adopted {
		return nil
	}
	return c.Raise(&CatAdopted{})
}

func (c *Cat) OnCatRenamed(e *CatRenamed) {
	c.Name = e.Name
}

func (c *Cat) OnCatAdopted(e *CatAdopted) {
	c.adopted = true
}

2. Define Your Events

package animals

import "github.com/fgrzl/es"

func init() {
	// Register events for polymorphic serialization
	es.Register(func() *CatRenamed { return &CatRenamed{} })
	es.Register(func() *CatAdopted { return &CatAdopted{} })
}

type CatRenamed struct {
	es.DomainEventBase
	Name string `json:"name"`
}

func (e *CatRenamed) GetDiscriminator() string { return "cat.renamed" }
func (e *CatRenamed) GetAreas() []string       { return []string{"cats"} }
func (e *CatRenamed) GetSpaces() []string      { return e.GetAreas() }

type CatAdopted struct {
	es.DomainEventBase
}

func (e *CatAdopted) GetDiscriminator() string { return "cat.adopted" }
func (e *CatAdopted) GetAreas() []string       { return []string{"cats"} }
func (e *CatAdopted) GetSpaces() []string      { return e.GetAreas() }

3. Use the Repository

package main

import (
	"context"
	"log"

	"github.com/fgrzl/es"
	"github.com/google/uuid"
)

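func main() {
	// For brevity, this assumes the Cat type from steps 1 and 2 is declared in
	// this package. If it lives in package animals, import that package and
	// expose the adopted state through an exported accessor.

	// Create event store and repository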
	store := es.NewInMemoryEventStore()
	repo := es.NewRepository(store)
	
	// Create a new cat
	catID := uuid.New()
	cat := NewCat(catID)
	
	// Perform business operations
	if err := cat.Rename("Whiskers"); err != nil {
		log.Fatal(err)
	}
	
	if err := cat.Adopt(); err != nil {
		log.Fatal(err)
	}
	
	// Save the aggregate
	if err := repo.Save(context.Background(), cat); err != nil {
		log.Fatal(err)
	}
	
	// Later, load the aggregate
	loadedCat := NewCat(catID)
	if err := repo.Load(context.Background(), loadedCat); err != nil {
		log.Fatal(err)
	}
	
	log.Printf("Cat name: %s, adopted: %v", loadedCat.Name, loadedCat.adopted)
}

Core Concepts

Aggregates

Aggregates are the primary building blocks that represent business entities. They:

  • Maintain state through event sourcing
  • Enforce business invariants
  • Generate domain events when state changes
  • Provide methods for business operations

Domain Events

Events represent facts about what happened in your domain:

  • Immutable records of state changes
  • Include metadata (correlation ID, causation ID, timestamp, sequence)
  • Support polymorphic serialization for storage
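
One common way to get polymorphic serialization is a registry keyed by discriminator. The sketch below uses only the standard library and is not this library's implementation: the event interface, envelope type, registry map, and encode/decode helpers are all hypothetical names chosen for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical stand-in for a domain event interface.
type event interface {
	Discriminator() string
}

type catRenamed struct {
	Name string `json:"name"`
}

func (catRenamed) Discriminator() string { return "cat.renamed" }

// envelope is the stored form: a type tag plus the raw JSON payload.
type envelope struct {
	Type string          `json:"type"`
	Data json.RawMessage `json:"data"`
}

// registry maps each discriminator to a factory that returns an empty event.
var registry = map[string]func() event{
	"cat.renamed": func() event { return &catRenamed{} },
}

// encode wraps the event payload in an envelope tagged with its discriminator.
func encode(e event) ([]byte, error) {
	data, err := json.Marshal(e)
	if err != nil {
		return nil, err
	}
	return json.Marshal(envelope{Type: e.Discriminator(), Data: data})
}

// decode reads the tag, builds the matching concrete type, and fills it in.
func decode(b []byte) (event, error) {
	var env envelope
	if err := json.Unmarshal(b, &env); err != nil {
		return nil, err
	}
	factory, ok := registry[env.Type]
	if !ok {
		return nil, fmt.Errorf("unknown event type %q", env.Type)
	}
	e := factory()
	if err := json.Unmarshal(env.Data, e); err != nil {
		return nil, err
	}
	return e, nil
}

func main() {
	b, err := encode(&catRenamed{Name: "Whiskers"})
	if err != nil {
		panic(err)
	}
	e, err := decode(b)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s -> %T\n", b, e)
}
```

Keeping the discriminator stable (for example "cat.renamed") lets Go type names change without invalidating stored events.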

Event Handlers

Type-safe event handlers that apply events to aggregate state:

  • Registered using generics for compile-time type safety
  • Automatically called when events are raised or loaded
  • Keep aggregates in sync with their event stream
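
To make the raise and replay paths concrete, here is a minimal, standard-library-only sketch of generic handler registration. It is an illustration under assumptions, not the es.Aggregate implementation: aggregate, registerHandler, raise, and replay are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
)

// aggregate is a hypothetical minimal base; it is not the es.Aggregate type.
type aggregate struct {
	handlers    map[reflect.Type]func(any)
	uncommitted []any
}

// registerHandler stores a typed handler keyed by the event's concrete type.
// The type parameter E is inferred from the handler's signature.
func registerHandler[E any](a *aggregate, h func(E)) {
	if a.handlers == nil {
		a.handlers = make(map[reflect.Type]func(any))
	}
	t := reflect.TypeOf((*E)(nil)).Elem()
	a.handlers[t] = func(e any) { h(e.(E)) }
}

// apply routes an event to the handler registered for its type.
func (a *aggregate) apply(e any) error {
	h, ok := a.handlers[reflect.TypeOf(e)]
	if !ok {
		return fmt.Errorf("no handler for %T", e)
	}
	h(e)
	return nil
}

// raise applies a new event and records it as uncommitted.
func (a *aggregate) raise(e any) error {
	if err := a.apply(e); err != nil {
		return err
	}
	a.uncommitted = append(a.uncommitted, e)
	return nil
}

// replay applies stored history without recording anything as uncommitted.
func (a *aggregate) replay(history []any) error {
	for _, e := range history {
		if err := a.apply(e); err != nil {
			return err
		}
	}
	return nil
}

type catRenamed struct{ Name string }

type cat struct {
	aggregate
	Name string
}

func newCat() *cat {
	c := &cat{}
	registerHandler(&c.aggregate, c.onRenamed)
	return c
}

func (c *cat) onRenamed(e *catRenamed) { c.Name = e.Name }

func main() {
	c := newCat()
	if err := c.raise(&catRenamed{Name: "Whiskers"}); err != nil {
		panic(err)
	}
	loaded := newCat()
	if err := loaded.replay(c.uncommitted); err != nil {
		panic(err)
	}
	fmt.Println(loaded.Name, len(loaded.uncommitted))
}
```

Because the same handler runs on both paths, state rebuilt from history matches the state that existed when the events were first raised.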

Fail-Fast Wiring

Aggregate wiring is intentionally fail-fast in this library. The default aggregate implementation panics immediately when aggregate definitions are invalid, including:

  • missing aggregate IDs or tenant IDs
  • empty aggregate areas
  • duplicate handler registration
  • event types whose effective area list does not include the aggregate area

These are treated as programmer errors in aggregate design, not recoverable runtime conditions. Business-rule failures should still be returned from your command methods as ordinary error values.

Repository

High-level interface for aggregate persistence:

  • Handles event loading and saving
  • Provides optimistic concurrency control
  • Automatically commits events after successful save

Multi-tenancy

The library supports both global and tenant-scoped aggregates:

// ctx, catID, and tenantID are assumed to be in scope.

// Global aggregate
globalCat := es.NewAggregate(ctx, "cats", catID)

// Tenant-specific aggregate
tenantCat := es.NewTenantAggregate(ctx, "cats", tenantID, catID)

Error Handling

The package exports standard sentinel errors for stores and aggregate workflows. The built-in in-memory store returns errors matching ErrConcurrency for optimistic concurrency conflicts. Repository.Load passes through store errors and does not synthesize ErrNotFound for empty streams.

Aggregate construction and handler wiring fail fast with panics on invalid design-time setup such as missing IDs, duplicate handlers, or invalid event-area mappings; the default aggregate implementation does not treat these as a recoverable error path. Use returned error values from your own command methods for business validation and state-transition failures.

(*Entity).TryGetNamespace remains available when you want a non-panicking namespace helper outside aggregate wiring.

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass: go test ./...
  5. Submit a pull request

License

This project is licensed under the MIT License; see the LICENSE file for details.
