[new tool] @agent-tools/event-sourcing — Type-safe event sourcing with aggregate snapshots and pluggable stores #213

@burner-agent

Tool Name

@agent-tools/event-sourcing

Description

Type-safe event sourcing primitives: define aggregates with typed events, append events to streams, fold event history into current state, snapshot for fast rehydration, and project read models — with pluggable storage backends.

Why It's Useful for Agents

AI agents that manage stateful workflows (approval chains, multi-step pipelines, conversation state) benefit from event sourcing because every state transition is an immutable, auditable record. Instead of overwriting state, agents append events, which enables full replay, undo, branching, and debugging of agent decision history. The design draws on research into Castore (272 stars, MIT, TypeScript-native, modular adapters for DynamoDB/SQS/EventBridge) and the KurrentDB Node client (175 stars, Apache-2.0, gRPC-based client for an event-native database). Castore's stack-agnostic design and its flexibility in the choice of validation library are the primary inspiration.
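To make the replay, undo, and branching claims concrete, here is a minimal, dependency-free sketch. It does not use the proposed package: `WorkflowEvent`, `WorkflowState`, `reduce`, and `replay` are hypothetical names chosen for illustration. It treats "undo" as folding a shorter prefix of the log, which is one possible reading; a real system might append a compensating event instead.

```typescript
// Hypothetical event and state shapes, for illustration only.
type WorkflowEvent =
  | { type: 'StepStarted'; step: string }
  | { type: 'StepApproved'; step: string }
  | { type: 'StepRejected'; step: string; reason: string };

interface WorkflowState {
  current: string | null;
  approved: string[];
  rejected: string[];
}

const initialState: WorkflowState = { current: null, approved: [], rejected: [] };

// Pure reducer: returns a new state object and never mutates its input.
function reduce(state: WorkflowState, event: WorkflowEvent): WorkflowState {
  switch (event.type) {
    case 'StepStarted':
      return { ...state, current: event.step };
    case 'StepApproved':
      return { ...state, current: null, approved: [...state.approved, event.step] };
    case 'StepRejected':
      return { ...state, current: null, rejected: [...state.rejected, event.step] };
  }
}

// Replay: fold the first `upTo` events (all of them by default) into a state.
function replay(events: readonly WorkflowEvent[], upTo: number = events.length): WorkflowState {
  return events.slice(0, upTo).reduce(reduce, initialState);
}

const log: WorkflowEvent[] = [
  { type: 'StepStarted', step: 'draft' },
  { type: 'StepApproved', step: 'draft' },
  { type: 'StepStarted', step: 'review' },
  { type: 'StepRejected', step: 'review', reason: 'missing tests' },
];

// Current state: fold the whole log.
const latest = replay(log);

// "Undo" as time travel: the state as it was before the rejection.
const beforeRejection = replay(log, 3);

// Branching: reuse a prefix of the history and try a different decision.
const approveReview: WorkflowEvent = { type: 'StepApproved', step: 'review' };
const branch = replay([...log.slice(0, 3), approveReview]);
```

Because the log itself is never modified, `latest`, `beforeRejection`, and `branch` can all be derived from the same history without interfering with one another.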

Proposed API

import { z } from 'zod';
import { defineAggregate, createEventStore, InMemoryAdapter } from '@agent-tools/event-sourcing';

// Define typed events
const orderEvents = {
  OrderCreated: z.object({ orderId: z.string(), items: z.array(z.string()) }),
  OrderShipped: z.object({ trackingNumber: z.string() }),
  OrderCancelled: z.object({ reason: z.string() }),
} as const;

// Define aggregate with reducer
const orderAggregate = defineAggregate({
  name: 'Order',
  events: orderEvents,
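  // Widen status to the full union so the reducer may return the other states
  initialState: { status: 'pending' as 'pending' | 'created' | 'shipped' | 'cancelled', items: [] as string[] },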
  reducer: (state, event) => {
    switch (event.type) {
      case 'OrderCreated': return { ...state, status: 'created', items: event.data.items };
      case 'OrderShipped': return { ...state, status: 'shipped' };
      case 'OrderCancelled': return { ...state, status: 'cancelled' };
    }
  },
});

// Create store with pluggable backend
const store = createEventStore({
  aggregate: orderAggregate,
  adapter: new InMemoryAdapter(), // or DynamoDBAdapter, PostgresAdapter, FileAdapter
});

// Append events
await store.append('order-123', { type: 'OrderCreated', data: { orderId: 'order-123', items: ['widget'] } });
await store.append('order-123', { type: 'OrderShipped', data: { trackingNumber: 'TRACK-456' } });

// Get current state (folds all events)
const state = await store.getState('order-123');
// => { status: 'shipped', items: ['widget'] }

// Get full event history
const events = await store.getEvents('order-123');

// Snapshot for fast rehydration
await store.snapshot('order-123');

// Project read models
const projection = store.project({
  name: 'shipped-orders',
  handler: (event, emit) => {
    if (event.type === 'OrderShipped') emit({ orderId: event.aggregateId, trackingNumber: event.data.trackingNumber });
  },
});
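The proposal does not say how `snapshot` and `getState` interact internally. One plausible reading, sketched below with no dependencies, is that a snapshot records the folded state together with the stream version it covers, so later reads fold only the events appended after it. `InMemoryStream`, `StoredEvent`, `Snapshot`, and `foldedCount` are hypothetical names, not part of the proposed API, and the sketch is synchronous for brevity.

```typescript
// Hypothetical shapes; the proposal does not specify adapter internals.
interface StoredEvent<E> {
  version: number; // 1-based position in the stream
  event: E;
}

interface Snapshot<S> {
  version: number; // number of events already folded into `state`
  state: S;
}

class InMemoryStream<S, E> {
  private readonly initialState: S;
  private readonly reducer: (state: S, event: E) => S;
  private readonly events: StoredEvent<E>[] = [];
  private latestSnapshot: Snapshot<S> | undefined = undefined;

  // Instrumentation only: how many events the last getState() call folded.
  foldedCount = 0;

  constructor(initialState: S, reducer: (state: S, event: E) => S) {
    this.initialState = initialState;
    this.reducer = reducer;
  }

  append(event: E): number {
    const version = this.events.length + 1;
    this.events.push({ version, event });
    return version;
  }

  // Start from the snapshot if there is one, then fold only the tail.
  getState(): S {
    const base: Snapshot<S> = this.latestSnapshot ?? { version: 0, state: this.initialState };
    const tail = this.events.slice(base.version);
    this.foldedCount = tail.length;
    return tail.reduce((state, stored) => this.reducer(state, stored.event), base.state);
  }

  // Persist the current folded state alongside the version it corresponds to.
  snapshot(): void {
    this.latestSnapshot = { version: this.events.length, state: this.getState() };
  }
}

// Usage: a counter aggregate with a single event type.
type CounterEvent = { type: 'Incremented'; by: number };

const stream = new InMemoryStream<number, CounterEvent>(0, (total, e) => total + e.by);
stream.append({ type: 'Incremented', by: 1 });
stream.append({ type: 'Incremented', by: 2 });
stream.snapshot();
stream.append({ type: 'Incremented', by: 3 });

const total = stream.getState();        // folds the snapshot state plus one tail event
const foldedAfterSnapshot = stream.foldedCount;
```

This only works if reducers are pure: the snapshot holds a reference to the folded state, so a reducer that mutated its input would corrupt it.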

Scope

In scope:

  • Aggregate definition with typed events and reducers
  • Event appending with optimistic concurrency (expected version)
  • State rehydration by folding event streams
  • Snapshotting for performance
  • Read model projections
  • Pluggable storage adapters (in-memory, filesystem, PostgreSQL, DynamoDB)
  • Event metadata (timestamp, causation ID, correlation ID)
  • Zod / ArkType / custom validation support
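The API example above does not show how an expected version or event metadata would be passed to `append`. The sketch below illustrates one possible shape for both, with no dependencies. `VersionedLog`, `Envelope`, `EventMetadata`, and `AppendResult` are hypothetical names, and returning a result object on conflict (rather than throwing) is an assumption made for this example.

```typescript
// Hypothetical shapes; the proposal names these metadata fields but not their types.
interface EventMetadata {
  timestamp: string; // ISO-8601, assigned at append time
  causationId?: string;
  correlationId?: string;
}

interface Envelope<E> {
  streamId: string;
  version: number; // 1-based position in the stream
  event: E;
  metadata: EventMetadata;
}

type AppendResult<E> =
  | { ok: true; envelope: Envelope<E> }
  | { ok: false; expectedVersion: number; actualVersion: number };

class VersionedLog<E> {
  private readonly streams = new Map<string, Envelope<E>[]>();

  // Version of a stream = number of events appended so far (0 if it does not exist).
  currentVersion(streamId: string): number {
    return this.streams.get(streamId)?.length ?? 0;
  }

  // Append only if the caller's view of the stream is still current.
  append(
    streamId: string,
    event: E,
    expectedVersion: number,
    metadata: Omit<EventMetadata, 'timestamp'> = {},
  ): AppendResult<E> {
    const actualVersion = this.currentVersion(streamId);
    if (actualVersion !== expectedVersion) {
      return { ok: false, expectedVersion, actualVersion };
    }
    const envelope: Envelope<E> = {
      streamId,
      version: actualVersion + 1,
      event,
      metadata: { ...metadata, timestamp: new Date().toISOString() },
    };
    const stream = this.streams.get(streamId) ?? [];
    stream.push(envelope);
    this.streams.set(streamId, stream);
    return { ok: true, envelope };
  }
}

// Usage: two writers both read version 1, then both try to append.
const versionedLog = new VersionedLog<{ type: string }>();
const first = versionedLog.append('order-123', { type: 'OrderCreated' }, 0, { correlationId: 'req-1' });
const writerA = versionedLog.append('order-123', { type: 'OrderShipped' }, 1);
const writerB = versionedLog.append('order-123', { type: 'OrderCancelled' }, 1);
// writerA succeeds; writerB is rejected because the stream has moved on to version 2.
```

The losing writer can reload the stream, re-evaluate its decision against the new state, and retry with the updated version, which keeps conflicting transitions such as ship-then-cancel from being recorded silently.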

Out of scope:

  • Full CQRS framework with command bus
  • Distributed event bus (use @agent-tools/nats or @agent-tools/amqp)
  • Saga / process manager orchestration
  • UI components

Labels

  • bug (Something isn't working)
  • help wanted (Extra attention is needed)
  • infrastructure (CI, workflows, build tooling)
  • new-tool (Proposal for a new tool package)
  • tier:autonomy (Tier 3: self-extension, shell, orchestration)
