RFC: Break EventService/StateService circular dependency #385

@sampaiodiego

Problem

EventService and StateService in packages/federation-sdk/src/services/ have a circular dependency resolved via @inject(delay(() => require(...))) — a DI hack that hides a design problem.

The coupling:

  • EventService → StateService for: getRoomVersion(), getEvent(), getStateBeforeEvent(), _getStore() (state queries needed during event retrieval and federation API responses)
  • StateService → EventService for: notify() — the only reverse call, used in processInitialState() to emit events after persisting initial room state during a join

Why this is a problem:

  • The delay(() => require(...)) pattern defeats static analysis and makes the dependency graph opaque
  • StagingAreaService (the primary orchestrator) must inject both services and manually sequence stateService.handlePdu() → eventService.notify() → eventService.markEventAsUnstaged() — 3 calls across 2 services for every incoming event
  • _getStore(roomVersion) leaks an internal caching adapter through the public interface; callers like SendJoinService must understand state graph internals to walk auth chains
  • Testing either service requires mocking the other, and the circular delay() makes mock injection fragile
  • Both services share direct access to EventRepository, creating implicit coupling through shared mutable state
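
To make the hidden cycle concrete, here is a minimal sketch in plain TypeScript with no DI container. The `lazy` helper is a hypothetical stand-in for delay(() => require(...)), and both classes are simplified stand-ins rather than the real services; the hard-coded room version is only there to give the call something to return.

```typescript
// Hypothetical stand-in for delay(() => require(...)): resolution is
// deferred until first use, so construction order no longer matters.
function lazy<T>(factory: () => T): () => T {
  let cached: T | undefined;
  return () => (cached ??= factory());
}

class StateServiceSketch {
  // The dependency is a thunk, so nothing in the constructor signature
  // tells a reader (or a tool) that EventServiceSketch is required.
  constructor(private readonly events: () => EventServiceSketch) {}

  processInitialState(eventId: string): string {
    return this.events().notify(eventId); // the reverse call that closes the cycle
  }

  getRoomVersion(): string {
    return "10"; // placeholder value for the sketch
  }
}

class EventServiceSketch {
  constructor(private readonly state: () => StateServiceSketch) {}

  notify(eventId: string): string {
    return `notified ${eventId} (room version ${this.state().getRoomVersion()})`;
  }
}

// Wiring only works because both sides are deferred.
const container: { state?: StateServiceSketch; events?: EventServiceSketch } = {};
container.state = new StateServiceSketch(lazy(() => container.events!));
container.events = new EventServiceSketch(lazy(() => container.state!));
```

Neither constructor can be satisfied with a plain instance, which is the symptom the rest of this RFC sets out to remove.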

Affected files:

  • packages/federation-sdk/src/services/event.service.ts (~1000 lines)
  • packages/federation-sdk/src/services/state.service.ts (~1023 lines)
  • packages/federation-sdk/src/services/staging-area.service.ts (8 constructor dependencies)
  • All services that inject both: RoomService, SendJoinService, InviteService, EventAuthorizationService

Proposed Interface

Replace the two circularly-dependent services with three services that form a clean DAG:

1. EventPipelineService — orchestrates the event lifecycle

interface EventPipelineResult {
  eventId: EventID;
  rejected: boolean;
  rejectReason?: string;
}

@singleton()
class EventPipelineService {
  constructor(
    private readonly roomState: RoomStateService,
    private readonly notifier: EventNotifierPort,
    private readonly eventRepository: EventRepository,
    private readonly eventStagingRepository: EventStagingRepository,
    private readonly configService: ConfigService,
    private readonly serverService: ServerService,
    private readonly stagingAreaQueue: StagingAreaQueue,
  ) {}

  // ── Hot path (StagingAreaService calls this) ──
  async authorizeAndPersist(event: PersistentEventBase): Promise<EventPipelineResult>;

  // ── Transaction entry point (federation controller calls this) ──
  async processIncomingTransaction(params: { origin: string; pdus: Pdu[]; edus?: BaseEDU[] }): Promise<void>;
  async processIncomingPDUs(origin: string, pdus: Pdu[]): Promise<void>;

  // ── Join flow ──
  async processInitialState(pdus: Pdu[], authChain: Pdu[]): Promise<StateID>;

  // ── Staging area management ──
  async getLeastDepthEventForRoom(roomId: RoomID): Promise<EventStagingStore | null>;
  async markEventAsUnstaged(event: EventStagingStore): Promise<void>;
  async processOldStagedEvents(): Promise<void>;

  // ── Event queries (thin wrappers) ──
  async getEventById<T extends PduType>(eventId: EventID, type?: T): Promise<EventStore | null>;
  async checkIfEventsExists(eventIds: EventID[]): Promise<{ missing: EventID[]; found: EventID[] }>;
  async getLastEventForRoom(roomId: RoomID): Promise<EventStore | null>;
  async getEventsByIds(eventIds: EventID[]): Promise<Array<{ _id: EventID; event: Pdu }>>;
  async getCreateEventForRoom(roomId: RoomID): Promise<Pdu | null>;
  async findInviteEvent(roomId: string, userId: string): Promise<EventStore | null>;
  async getAuthEventIds(eventType: PduType, params: AuthEventParams): Promise<EventStore[]>;
  async checkUserPermission(powerLevelsEventId: EventID, userId: string, actionType: PduType): Promise<boolean>;
  async processRedaction(redactionEvent: RedactionEvent): Promise<void>;

  // ── Federation API responses (state + auth chain) ──
  async getState(roomId: RoomID, eventId: EventID): Promise<{ pdus: Record<string, unknown>[]; auth_chain: Record<string, unknown>[] }>;
  async getStateIds(roomId: RoomID, eventId: EventID): Promise<{ pdu_ids: string[]; auth_chain_ids: string[] }>;
  async getBackfillEvents(roomId: string, eventIds: EventID[], limit: number): Promise<{ origin: string; origin_server_ts: number; pdus: Pdu[] }>;
  async getMissingEvents(roomId: string, earliest: EventID[], latest: EventID[], limit?: number, minDepth?: number): Promise<{ events: Pdu[] }>;
}

2. RoomStateService — state queries, event building, and state graph writes (no notifications, no queue management)

@singleton()
class RoomStateService {
  constructor(
    private readonly eventRepository: EventRepository,
    private readonly stateGraphRepository: StateGraphRepository,
    private readonly configService: ConfigService,
  ) {}

  // ── State queries ──
  async getRoomVersion(roomId: RoomID): Promise<RoomVersion>;
  async getRoomInformation(roomId: RoomID): Promise<PduCreateEventContent>;
  async getLatestRoomState(roomId: RoomID): Promise<State>;
  async getStrippedRoomState(roomId: RoomID): Promise<StrippedEvent[]>;
  async getServerSetInRoom(roomId: RoomID, state?: State): Promise<Set<string>>;
  async isRoomStatePartial(roomId: RoomID): Promise<boolean>;
  async getEvent(eventId: EventID): Promise<PersistentEventBase | null>;
  async getStateBeforeEvent(event: PersistentEventBase): Promise<State>;
  async getStateAtEvent(event: PersistentEventBase): Promise<State>;
  async getAllPublicRoomIdsAndNames(): Promise<{ name: string; room_id: RoomID }[]>;
  async getAllRoomIds(): Promise<RoomID[]>;
  async getPartialEvents(roomId: RoomID): Promise<PersistentEventBase[]>;

  // ── Event building (outgoing events) ──
  async buildEvent<T extends PduType>(event: PduWithHashesAndSignaturesOptional<PduForType<T>>, roomVersion: RoomVersion): Promise<PersistentEventBase>;
  async signEvent<T extends PersistentEventBase>(event: T): Promise<T>;

  // ── PDU authorization + state graph mutation (called by EventPipelineService) ──
  async handlePdu(pdu: PersistentEventBase): Promise<void>;
  async saveRejectedEvent(event: PersistentEventBase, stateId: StateID): Promise<void>;
}

3. EventNotifierPort — interface for notification dispatch

interface EventNotifierPort {
  notify(event: { eventId: EventID; event: Pdu }): Promise<void>;
}

// Production adapter — wires to EventEmitterService
class EmitterNotifierAdapter implements EventNotifierPort {
  constructor(
    private readonly emitter: EventEmitterService,
    private readonly roomState: RoomStateService,  // for power-level diffs
  ) {}

  async notify(event: { eventId: EventID; event: Pdu }): Promise<void> {
    // Current EventService.notify() switch statement + power-level diff logic
  }
}

// Test adapter — collects notifications for assertions
class CollectingNotifierAdapter implements EventNotifierPort {
  public readonly notifications: Array<{ eventId: EventID; event: Pdu }> = [];

  async notify(event: { eventId: EventID; event: Pdu }): Promise<void> {
    this.notifications.push(event);
  }
}
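
As a rough illustration of how the test adapter could be used, the sketch below drives a consumer through the port and then inspects what was recorded. `EventID` and `Pdu` are reduced to minimal stand-ins, and `announceJoin` and `demo` are hypothetical helpers invented for this example.

```typescript
// Simplified stand-ins for the SDK types (assumptions for this sketch).
type EventID = string;
type Pdu = { type: string; room_id: string };

interface EventNotifierPort {
  notify(event: { eventId: EventID; event: Pdu }): Promise<void>;
}

class CollectingNotifierAdapter implements EventNotifierPort {
  public readonly notifications: Array<{ eventId: EventID; event: Pdu }> = [];

  async notify(event: { eventId: EventID; event: Pdu }): Promise<void> {
    this.notifications.push(event);
  }
}

// Hypothetical consumer: anything that only needs to emit notifications
// can depend on the port instead of on EventService.
async function announceJoin(notifier: EventNotifierPort, roomId: string): Promise<void> {
  await notifier.notify({
    eventId: "$join1",
    event: { type: "m.room.member", room_id: roomId },
  });
}

// Runs the consumer against the collecting adapter and returns it for inspection.
async function demo(): Promise<CollectingNotifierAdapter> {
  const notifier = new CollectingNotifierAdapter();
  await announceJoin(notifier, "!room:example.org");
  return notifier;
}
```

A test can then assert on `notifier.notifications` directly, with no emitter or mock framework involved.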

Usage: StagingAreaService hot path

// Before (3 calls, 2 services, 8 constructor deps):
await this.stateService.handlePdu(await toEventBase(event.event));
await this.eventService.notify({ eventId: event._id, event: event.event });
await this.eventService.markEventAsUnstaged(event);

// After (1+1 calls, 1 service, 4 constructor deps):
await this.eventPipeline.authorizeAndPersist(await toEventBase(event.event));
await this.eventPipeline.markEventAsUnstaged(event);
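
The body of authorizeAndPersist() is not specified in this RFC beyond "wraps roomState.handlePdu() + notifier.notify()". One possible shape is sketched below. Two things are assumptions made for illustration: that the event object carries a `rejected` flag after handlePdu() runs, and that rejected events are not announced. `EventLike`, `RoomStatePort`, and `EventPipelineSketch` are hypothetical names.

```typescript
type EventID = string;
type Pdu = { type: string };

// Assumption: the event carries its own rejection status after handlePdu().
// The real PersistentEventBase may expose this differently.
interface EventLike {
  eventId: EventID;
  event: Pdu;
  rejected: boolean;
  rejectReason?: string;
}

interface EventPipelineResult {
  eventId: EventID;
  rejected: boolean;
  rejectReason?: string;
}

// Narrow view of RoomStateService: only the method this path needs.
interface RoomStatePort {
  handlePdu(pdu: EventLike): Promise<void>;
}

interface EventNotifierPort {
  notify(event: { eventId: EventID; event: Pdu }): Promise<void>;
}

class EventPipelineSketch {
  constructor(
    private readonly roomState: RoomStatePort,
    private readonly notifier: EventNotifierPort,
  ) {}

  async authorizeAndPersist(event: EventLike): Promise<EventPipelineResult> {
    // Authorize against room state and persist (state graph mutation).
    await this.roomState.handlePdu(event);

    // Assumption: rejected events are stored but never announced.
    if (event.rejected) {
      return { eventId: event.eventId, rejected: true, rejectReason: event.rejectReason };
    }

    await this.notifier.notify({ eventId: event.eventId, event: event.event });
    return { eventId: event.eventId, rejected: false };
  }
}
```

Returning a result object rather than throwing lets StagingAreaService decide what to do with a rejected event without wrapping every call in try/catch.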

Dependency graph (no cycles)

EventPipelineService
  ├── RoomStateService (state queries, handlePdu, buildEvent)
  ├── EventNotifierPort (notification output)
  ├── EventRepository (event persistence)
  ├── EventStagingRepository (staging CRUD)
  ├── ConfigService (server name, EDU flags)
  ├── ServerService (signature verification)
  └── StagingAreaQueue (room processing queue)

RoomStateService
  ├── EventRepository (read + state graph writes)
  ├── StateGraphRepository (delta chain management)
  └── ConfigService (signing keys)

EmitterNotifierAdapter (production EventNotifierPort)
  ├── EventEmitterService (in-memory event bus)
  └── RoomStateService (power-level diffs)

No arrow points from RoomStateService back to EventPipelineService. The delay(() => require(...)) hacks are eliminated.
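
The "no cycles" claim can be checked mechanically. The sketch below encodes the edges listed above as an adjacency map and runs a depth-first search for a back edge. `findCycle` is a hypothetical helper written for this example, and the `current` graph is reduced to just the two services that reference each other.

```typescript
// Edges copied from the dependency graph above. Leaf dependencies have no
// outgoing edges, so they do not appear as keys.
const proposed: Record<string, string[]> = {
  EventPipelineService: [
    "RoomStateService", "EventNotifierPort", "EventRepository",
    "EventStagingRepository", "ConfigService", "ServerService", "StagingAreaQueue",
  ],
  RoomStateService: ["EventRepository", "StateGraphRepository", "ConfigService"],
  EmitterNotifierAdapter: ["EventEmitterService", "RoomStateService"],
};

// The current design, reduced to the two services that reference each other.
const current: Record<string, string[]> = {
  EventService: ["StateService"],
  StateService: ["EventService"],
};

// Depth-first search with visiting/done marking; returns one cycle if any exists.
function findCycle(graph: Record<string, string[]>): string[] | null {
  const state = new Map<string, "visiting" | "done">();
  const path: string[] = [];

  const visit = (node: string): string[] | null => {
    if (state.get(node) === "done") return null;
    if (state.get(node) === "visiting") {
      // Back edge: the cycle is the path from the first visit of `node` to here.
      return [...path.slice(path.indexOf(node)), node];
    }
    state.set(node, "visiting");
    path.push(node);
    for (const dep of graph[node] ?? []) {
      const cycle = visit(dep);
      if (cycle) return cycle;
    }
    path.pop();
    state.set(node, "done");
    return null;
  };

  for (const node of Object.keys(graph)) {
    const cycle = visit(node);
    if (cycle) return cycle;
  }
  return null;
}

console.log(findCycle(current)); // → [ 'EventService', 'StateService', 'EventService' ]
console.log(findCycle(proposed)); // → null
```

A check like this could run in CI against the real import graph, something the delay(() => require(...)) pattern currently prevents because the edges are invisible to static tooling.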

Dependency Strategy

  • In-process: State resolution algorithm (resolveStateV2Plus), event validation, auth chain walking — pure computation, merged directly
  • Local-substitutable: EventRepository, StateGraphRepository, EventStagingRepository — MongoDB collections. Production uses MongoDB adapters. Tests can mock at the repository level (existing pattern) or use future in-memory implementations
  • Ports & adapters: EventNotifierPort — the notification side-effect that caused the circular dependency. Production uses EmitterNotifierAdapter (wraps EventEmitterService). Tests use CollectingNotifierAdapter (records notifications for assertions). This is the single highest-value testability win

Testing Strategy

New boundary tests to write

  • EventPipelineService.authorizeAndPersist(): Given a PDU and pre-seeded room state, verify it is persisted with correct state ID and CollectingNotifierAdapter receives the expected notification. Test rejection (auth failure), soft-fail (current state mismatch), and partial state scenarios
  • EventPipelineService.processInitialState(): Given auth chain + state PDUs, verify events are persisted in topological order and notifications are emitted for each
  • EventPipelineService.processIncomingTransaction(): Verify PDUs are validated, staged, and rooms enqueued. Verify EDUs are dispatched through the notifier. Verify concurrency guard (one transaction per origin)
  • RoomStateService.handlePdu(): Test the 3-phase auth check (auth_events → state-before → current-state) with in-memory event/state-graph fixtures
  • EmitterNotifierAdapter.notify(): Test the event-type routing and power-level diff computation independently
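
The "one transaction per origin" guard mentioned above is the kind of behavior a boundary test can pin down. The sketch below shows one possible guard. `OriginTransactionGuard` is a hypothetical name, and rejecting the second transaction (rather than queueing it) is an assumption; this RFC does not say which the real implementation does.

```typescript
// Hypothetical guard: at most one in-flight transaction per origin server.
class OriginTransactionGuard {
  private readonly inFlight = new Set<string>();

  // Runs `work` unless a transaction from `origin` is already being processed.
  // Assumption: a concurrent transaction is rejected, not queued.
  async run<T>(origin: string, work: () => Promise<T>): Promise<T> {
    if (this.inFlight.has(origin)) {
      throw new Error(`transaction from ${origin} already in progress`);
    }
    this.inFlight.add(origin);
    try {
      return await work();
    } finally {
      // Always release, even when `work` throws, so the origin is not locked out.
      this.inFlight.delete(origin);
    }
  }
}
```

Because the origin is released in `finally`, a failed transaction does not block later ones from the same server, which is one of the cases a boundary test should cover.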

Old tests to delete (once boundary tests exist)

  • event.service.spec.ts — currently tests notify() and transaction processing in isolation with heavy mocking of StateService
  • state.service.spec.ts — if one exists, tests that mock EventService via delay

Test environment needs

  • CollectingNotifierAdapter — new, ~20 lines, records notifications for assertion
  • Existing repository mocks remain usable during transition
  • No new infrastructure required (no database, no external services)

Implementation Recommendations

What the modules should own

  • EventPipelineService owns the event lifecycle: validate → stage → authorize → persist → notify. It is the single entry point for all incoming events (transactions, joins, missing event fetches). It owns the staging area, the concurrent-transaction guard, and EDU processing
  • RoomStateService owns room state: resolution algorithm, state graph delta management, state queries, event building/signing. Its only writes are to the state graph (via handlePdu() and saveRejectedEvent()); it emits no events and manages no queues
  • EventNotifierPort owns the mapping from Matrix event types to homeserver application events, including power-level diff computation

What they should hide

  • EventPipelineService hides: per-origin concurrency control, PDU/EDU size limits, schema validation per room version, signature verification, event deduplication, staging queue enqueue logic
  • RoomStateService hides: _getStore() caching adapter, StateGraphRepository entirely, branch detection (_isSameChain), state resolution for divergent branches, partial state tracking, _resolveStateAtEvent pipeline
  • EmitterNotifierAdapter hides: event-type → emitter topic routing, the power-level diff algorithm (old vs new m.room.power_levels), role derivation from numeric power levels
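
To give a sense of what the adapter would hide, here is a minimal sketch of a power-level diff with role derivation. The role names and the 100/50 thresholds are assumptions chosen for illustration and are not taken from the current EventService.notify() code. The `users_default` fallback to 0 follows the Matrix specification.

```typescript
// Subset of m.room.power_levels content used by this sketch.
interface PowerLevels {
  users?: Record<string, number>;
  users_default?: number;
}

// Assumption: three roles with thresholds at 100 and 50.
type Role = "owner" | "moderator" | "user";

function roleFor(level: number): Role {
  if (level >= 100) return "owner";
  if (level >= 50) return "moderator";
  return "user";
}

// A user missing from `users` falls back to users_default, then to 0.
function effectiveLevel(content: PowerLevels, userId: string): number {
  return content.users?.[userId] ?? content.users_default ?? 0;
}

// Returns one entry per user whose derived role differs between old and new.
function diffPowerLevels(
  oldContent: PowerLevels,
  newContent: PowerLevels,
): Array<{ userId: string; from: Role; to: Role }> {
  const userIds = Array.from(
    new Set([
      ...Object.keys(oldContent.users ?? {}),
      ...Object.keys(newContent.users ?? {}),
    ]),
  );
  const changes: Array<{ userId: string; from: Role; to: Role }> = [];
  for (const userId of userIds) {
    const from = roleFor(effectiveLevel(oldContent, userId));
    const to = roleFor(effectiveLevel(newContent, userId));
    if (from !== to) changes.push({ userId, from, to });
  }
  return changes;
}
```

This version reports only role changes; a numeric change that stays within the same role produces no entry. Keeping the function pure is what allows it to be tested without an emitter.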

What they should expose

  • EventPipelineService: authorizeAndPersist() as the primary API, plus transaction/join entry points and event queries
  • RoomStateService: state queries (getLatestRoomState, getRoomVersion, getStateBeforeEvent, etc.) and event building (buildEvent, signEvent)
  • EventNotifierPort: notify(event) — a single method

Migration path (incremental, each step is a standalone commit)

  1. Extract EventNotifierPort interface + EmitterNotifierAdapter — move EventService.notify() (lines 803-999) into a new adapter class implementing the interface. EventService delegates to the adapter. No behavior change
  2. Remove StateService → EventService dependency — processInitialState() accepts an EventNotifierPort parameter (or moves to EventPipelineService). Delete the @inject(delay(() => require('./event.service'))) line from StateService
  3. Extract RoomStateService from StateService — rename the class, keep all methods. Move getState()/getStateIds()/getBackfillEvents() from EventService into EventPipelineService (they use internal state methods). Delete the @inject(delay(() => require('./state.service'))) line from EventService
  4. Introduce EventPipelineService.authorizeAndPersist() — wraps roomState.handlePdu() + notifier.notify(). Update StagingAreaService to use it
  5. Consolidate remaining EventService methods into EventPipelineService — transaction processing, staging management, event queries. Delete EventService

Each step can be landed as a separate PR with no behavior change visible to SDK consumers.
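
Step 2 of the migration path might look roughly like the sketch below, where the notifier arrives as a call parameter instead of a constructor dependency. The signature is deliberately simplified and differs from the proposed processInitialState(pdus, authChain): persistence is omitted, the return value is just a count, and sorting by `depth` is a stand-in for a real topological ordering. `StateServiceStep2Sketch` and `RecordingNotifier` are hypothetical names.

```typescript
type EventID = string;
type Pdu = { type: string; depth: number };

interface EventNotifierPort {
  notify(event: { eventId: EventID; event: Pdu }): Promise<void>;
}

class StateServiceStep2Sketch {
  // No EventService in the constructor: the notifier is supplied per call,
  // so the delayed require() can be deleted.
  async processInitialState(
    pdus: Array<{ eventId: EventID; event: Pdu }>,
    notifier: EventNotifierPort,
  ): Promise<number> {
    // Simplification: order by depth instead of a full topological sort.
    const ordered = [...pdus].sort((a, b) => a.event.depth - b.event.depth);
    for (const pdu of ordered) {
      // Persisting the event would happen here in the real service.
      await notifier.notify(pdu);
    }
    return ordered.length;
  }
}

// Records event IDs in the order they were announced.
class RecordingNotifier implements EventNotifierPort {
  public readonly order: EventID[] = [];

  async notify(event: { eventId: EventID; event: Pdu }): Promise<void> {
    this.order.push(event.eventId);
  }
}
```

Passing the port as a parameter keeps this step a small, reversible change; moving the method into EventPipelineService can then happen later without touching the notification logic again.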


🤖 Generated with Claude Code