Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { CodexAdapter } from "../src/provider/Services/CodexAdapter.ts";
import { ProviderService } from "../src/provider/Services/ProviderService.ts";
import { AnalyticsService } from "../src/telemetry/Services/AnalyticsService.ts";
import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointReactor.ts";
import { ModelChangeReactorLive } from "../src/orchestration/Layers/ModelChangeReactor.ts";
import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts";
import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts";
import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts";
Expand Down Expand Up @@ -311,10 +312,14 @@ export const makeOrchestrationIntegrationHarness = (
const checkpointReactorLayer = CheckpointReactorLive.pipe(
Layer.provideMerge(runtimeServicesLayer),
);
const modelChangeReactorLayer = ModelChangeReactorLive.pipe(
Layer.provideMerge(runtimeServicesLayer),
);
const orchestrationReactorLayer = OrchestrationReactorLive.pipe(
Layer.provideMerge(runtimeIngestionLayer),
Layer.provideMerge(providerCommandReactorLayer),
Layer.provideMerge(checkpointReactorLayer),
Layer.provideMerge(modelChangeReactorLayer),
);
const layer = orchestrationReactorLayer.pipe(
Layer.provide(persistenceLayer),
Expand Down
192 changes: 192 additions & 0 deletions apps/server/src/orchestration/Layers/ModelChangeReactor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import type { OrchestrationThreadActivity } from "@t3tools/contracts";
import {
CommandId,
DEFAULT_PROVIDER_INTERACTION_MODE,
ProjectId,
ThreadId,
} from "@t3tools/contracts";
import { Effect, Exit, Layer, ManagedRuntime, Scope } from "effect";
import { afterEach, describe, expect, it } from "vitest";

import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts";
import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts";
import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
import { ModelChangeReactorLive } from "./ModelChangeReactor.ts";
import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts";
import {
OrchestrationEngineService,
type OrchestrationEngineShape,
} from "../Services/OrchestrationEngine.ts";
import { ModelChangeReactor } from "../Services/ModelChangeReactor.ts";
import { ServerConfig } from "../../config.ts";
import * as NodeServices from "@effect/platform-node/NodeServices";

const asProjectId = (value: string): ProjectId => ProjectId.makeUnsafe(value);
const asThreadId = (value: string): ThreadId => ThreadId.makeUnsafe(value);

async function waitForActivity(
engine: OrchestrationEngineShape,
predicate: (activity: OrchestrationThreadActivity) => boolean,
timeoutMs = 2000,
): Promise<OrchestrationThreadActivity> {
const deadline = Date.now() + timeoutMs;
const poll = async (): Promise<OrchestrationThreadActivity> => {
const readModel = await Effect.runPromise(engine.getReadModel());
const activities =
readModel.threads.find((thread) => thread.id === "thread-1")?.activities ?? [];
const match = activities.find(predicate);
if (match) {
return match;
}
if (Date.now() >= deadline) {
throw new Error("Timed out waiting for model change activity");
}
await new Promise((resolve) => setTimeout(resolve, 10));
return poll();
};
return poll();
}

describe("ModelChangeReactor", () => {
let runtime: ManagedRuntime.ManagedRuntime<
OrchestrationEngineService | ModelChangeReactor,
unknown
> | null = null;
let scope: Scope.Closeable | null = null;

afterEach(async () => {
if (scope) {
await Effect.runPromise(Scope.close(scope, Exit.void));
}
scope = null;
if (runtime) {
await runtime.dispose();
}
runtime = null;
});

async function createHarness() {
const orchestrationLayer = OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionPipelineLive),
Layer.provide(OrchestrationEventStoreLive),
Layer.provide(OrchestrationCommandReceiptRepositoryLive),
Layer.provide(SqlitePersistenceMemory),
);
const layer = ModelChangeReactorLive.pipe(
Layer.provideMerge(orchestrationLayer),
Layer.provideMerge(ServerConfig.layerTest(process.cwd(), process.cwd())),
Layer.provideMerge(NodeServices.layer),
);
runtime = ManagedRuntime.make(layer);

const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
const reactor = await runtime.runPromise(Effect.service(ModelChangeReactor));
scope = await Effect.runPromise(Scope.make("sequential"));
await Effect.runPromise(reactor.start.pipe(Scope.provide(scope)));

const createdAt = new Date().toISOString();
await Effect.runPromise(
engine.dispatch({
type: "project.create",
commandId: CommandId.makeUnsafe("cmd-project-create"),
projectId: asProjectId("project-1"),
title: "Project",
workspaceRoot: "/tmp/project-1",
defaultModel: "gpt-5-codex",
createdAt,
}),
);
await Effect.runPromise(
engine.dispatch({
type: "thread.create",
commandId: CommandId.makeUnsafe("cmd-thread-create"),
threadId: asThreadId("thread-1"),
projectId: asProjectId("project-1"),
title: "Thread",
model: "gpt-5-codex",
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "full-access",
branch: null,
worktreePath: null,
createdAt,
}),
);

return { engine, reactor };
}

it("appends a user-triggered model change notice activity", async () => {
const { engine } = await createHarness();

await Effect.runPromise(
engine.dispatch({
type: "thread.model.set",
commandId: CommandId.makeUnsafe("cmd-model-set-user"),
threadId: asThreadId("thread-1"),
model: "gpt-5.4",
source: "client",
}),
);

const activity = await waitForActivity(
engine,
(entry) => entry.kind === "thread.model.changed" && entry.summary === "Model changed",
);

expect(activity.payload).toMatchObject({
fromModel: "gpt-5-codex",
toModel: "gpt-5.4",
source: "user",
});
});

it("appends a provider-reroute notice with reason", async () => {
const { engine } = await createHarness();

await Effect.runPromise(
engine.dispatch({
type: "thread.model.set",
commandId: CommandId.makeUnsafe("cmd-model-set-reroute"),
threadId: asThreadId("thread-1"),
model: "gpt-5.4-mini",
source: "provider-reroute",
reason: "capacity",
}),
);

const activity = await waitForActivity(
engine,
(entry) =>
entry.kind === "thread.model.changed" &&
typeof (entry.payload as { reason?: unknown }).reason === "string",
);

expect(activity.payload).toMatchObject({
fromModel: "gpt-5-codex",
toModel: "gpt-5.4-mini",
source: "provider-reroute",
reason: "capacity",
});
});

it("does not append a notice when the model does not change", async () => {
const { engine, reactor } = await createHarness();

await Effect.runPromise(
engine.dispatch({
type: "thread.model.set",
commandId: CommandId.makeUnsafe("cmd-model-set-noop"),
threadId: asThreadId("thread-1"),
model: "gpt-5-codex",
source: "client",
}),
);
await Effect.runPromise(reactor.drain);

const readModel = await Effect.runPromise(engine.getReadModel());
expect(readModel.threads.find((thread) => thread.id === "thread-1")?.activities ?? []).toEqual(
[],
);
});
});
68 changes: 68 additions & 0 deletions apps/server/src/orchestration/Layers/ModelChangeReactor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import {
CommandId,
EventId,
type OrchestrationEvent,
type ThreadModelChangedActivityPayload,
} from "@t3tools/contracts";
import { Effect, Layer, Stream } from "effect";
import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker";

import {
ModelChangeReactor,
type ModelChangeReactorShape,
} from "../Services/ModelChangeReactor.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";

type ModelSetDomainEvent = Extract<OrchestrationEvent, { type: "thread.model-set" }>;

const serverCommandId = (tag: string): CommandId =>
CommandId.makeUnsafe(`server:${tag}:${crypto.randomUUID()}`);

const make = Effect.gen(function* () {
const orchestrationEngine = yield* OrchestrationEngineService;

const appendModelChangeActivity = (event: ModelSetDomainEvent) => {
const payload: ThreadModelChangedActivityPayload = {
fromModel: event.payload.previousModel,
toModel: event.payload.model,
source: event.payload.source === "client" ? "user" : "provider-reroute",
...(event.payload.reason !== undefined ? { reason: event.payload.reason } : {}),
};

return orchestrationEngine.dispatch({
type: "thread.activity.append",
commandId: serverCommandId("thread-model-change-notice"),
threadId: event.payload.threadId,
activity: {
id: EventId.makeUnsafe(crypto.randomUUID()),
tone: "info",
kind: "thread.model.changed",
summary: "Model changed",
payload,
turnId: null,
createdAt: event.payload.updatedAt,
},
createdAt: event.payload.updatedAt,
});
};

const worker = yield* makeDrainableWorker((event: ModelSetDomainEvent) =>
appendModelChangeActivity(event),
);

const start: ModelChangeReactorShape["start"] = Effect.forkScoped(
Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => {
if (event.type !== "thread.model-set") {
return Effect.void;
}
return worker.enqueue(event);
}),
).pipe(Effect.asVoid);

return {
start,
drain: worker.drain,
} satisfies ModelChangeReactorShape;
});

export const ModelChangeReactorLive = Layer.effect(ModelChangeReactor, make);
64 changes: 64 additions & 0 deletions apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,70 @@ describe("OrchestrationEngine", () => {
await system.dispose();
});

it("emits thread.model-set when a thread model changes", async () => {
const system = await createOrchestrationSystem();
const { engine } = system;
const createdAt = now();

await system.run(
engine.dispatch({
type: "project.create",
commandId: CommandId.makeUnsafe("cmd-project-stream-model-create"),
projectId: asProjectId("project-stream-model"),
title: "Stream Model Project",
workspaceRoot: "/tmp/project-stream-model",
defaultModel: "gpt-5-codex",
createdAt,
}),
);

await system.run(
engine.dispatch({
type: "thread.create",
commandId: CommandId.makeUnsafe("cmd-stream-thread-model-create"),
threadId: ThreadId.makeUnsafe("thread-stream-model"),
projectId: asProjectId("project-stream-model"),
title: "domain-stream-model",
model: "gpt-5-codex",
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
branch: null,
worktreePath: null,
createdAt,
}),
);

const event = await system.run(
Effect.gen(function* () {
const eventQueue = yield* Queue.unbounded<OrchestrationEvent>();
yield* Effect.forkScoped(
Stream.take(engine.streamDomainEvents, 1).pipe(
Stream.runForEach((nextEvent) =>
Queue.offer(eventQueue, nextEvent).pipe(Effect.asVoid),
),
),
);
yield* Effect.sleep("10 millis");
yield* engine.dispatch({
type: "thread.model.set",
commandId: CommandId.makeUnsafe("cmd-stream-thread-model-set"),
threadId: ThreadId.makeUnsafe("thread-stream-model"),
model: "gpt-5.4",
source: "client",
});
return yield* Queue.take(eventQueue);
}).pipe(Effect.scoped),
);

expect(event.type).toBe("thread.model-set");
if (event.type === "thread.model-set") {
expect(event.payload.previousModel).toBe("gpt-5-codex");
expect(event.payload.model).toBe("gpt-5.4");
}

await system.dispose();
});

it("stores completed checkpoint summaries even when no files changed", async () => {
const system = await createOrchestrationSystem();
const { engine } = system;
Expand Down
22 changes: 19 additions & 3 deletions apps/server/src/orchestration/Layers/OrchestrationEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ const makeOrchestrationEngine = Effect.gen(function* () {
readModel,
});
const eventBases = Array.isArray(eventBase) ? eventBase : [eventBase];
const aggregateRef = commandToAggregateRef(envelope.command);
const committedCommand = yield* sql
.withTransaction(
Effect.gen(function* () {
Expand All @@ -122,10 +123,25 @@ const makeOrchestrationEngine = Effect.gen(function* () {

const lastSavedEvent = committedEvents.at(-1) ?? null;
if (lastSavedEvent === null) {
return yield* new OrchestrationCommandInvariantError({
commandType: envelope.command.type,
detail: "Command produced no events.",
// No-op: the decider intentionally produced zero events
// (e.g. idempotent model set where the model is already current).
// Record a "skipped" receipt so the command is not retried, and
// return the read-model unchanged.
yield* commandReceiptRepository.upsert({
commandId: envelope.command.commandId,
aggregateKind: aggregateRef.aggregateKind,
aggregateId: aggregateRef.aggregateId,
acceptedAt: new Date().toISOString(),
resultSequence: 0,
status: "accepted",
error: null,
});

return {
committedEvents: [],
lastSequence: 0,
nextReadModel,
} as const;
}

yield* commandReceiptRepository.upsert({
Expand Down
Loading