From bbd1585954510386e700c655e3354dcd57fc95cb Mon Sep 17 00:00:00 2001 From: jbiskur Date: Thu, 23 Apr 2026 15:59:03 +0100 Subject: [PATCH 1/2] feat(data-pathways)!: id-keyed pump-state commands + sources metrics shape MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors data-pathways v3.0.0: - New commands: DataPathwayPumpStateFetchBySourceCommand + DataPathwayPumpStateSaveBySourceCommand hit /api/v1/pump-states/:pathwayId/sources/:sourceId (GET + PUT). Legacy flowType-based commands remain for the deprecated route. - New contract DataPathwayPumpStateBySourceSchema carries sourceId, flowType (nullable — may be unknown when the source has been removed from the pathway config), and the state union. - Throughput/metrics contract: endpoints[url].flowTypes → endpoints[url].sources (Record). Updated both the exported TThroughputSource shape used in DataPathwayMetricsSchema and the inline heartbeat input shape on DataPathwayAssignmentHeartbeatCommand. - Test coverage: new fetch + save cases for the sourceId-keyed commands. BREAKING CHANGE: the metrics throughput shape renames endpoints[url].flowTypes → endpoints[url].sources keyed by sourceId. Consumers reading the pathway metrics endpoint or sending heartbeats must adopt the new shape. Data-pathways v3.0.0 only accepts the new shape. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../data-pathways/assignment.heartbeat.ts | 4 +- src/commands/data-pathways/index.ts | 2 + .../pump-state.fetch-by-source.ts | 44 +++++++++++++++++++ .../pump-state.save-by-source.ts | 41 +++++++++++++++++ src/contracts/data-pathways.ts | 25 +++++++++-- test/tests/commands/data-pathways.test.ts | 39 ++++++++++++++++ 6 files changed, 150 insertions(+), 5 deletions(-) create mode 100644 src/commands/data-pathways/pump-state.fetch-by-source.ts create mode 100644 src/commands/data-pathways/pump-state.save-by-source.ts diff --git a/src/commands/data-pathways/assignment.heartbeat.ts b/src/commands/data-pathways/assignment.heartbeat.ts index 15dc26c..3a74d7c 100644 --- a/src/commands/data-pathways/assignment.heartbeat.ts +++ b/src/commands/data-pathways/assignment.heartbeat.ts @@ -21,9 +21,11 @@ export interface DataPathwayAssignmentHeartbeatInput { totalFailed: number lastDeliveryAgeMs: number | null healthy: boolean - flowTypes: Record< + sources: Record< string, { + flowType: string + name?: string eventsPerSecond: number successRate: number avgDurationMs: number diff --git a/src/commands/data-pathways/index.ts b/src/commands/data-pathways/index.ts index f9d63ec..30db299 100644 --- a/src/commands/data-pathways/index.ts +++ b/src/commands/data-pathways/index.ts @@ -50,6 +50,8 @@ export * from "./quota.list.ts" // Pump State export * from "./pump-state.fetch.ts" export * from "./pump-state.save.ts" +export * from "./pump-state.fetch-by-source.ts" +export * from "./pump-state.save-by-source.ts" // Delivery Log export * from "./delivery-log.list.ts" diff --git a/src/commands/data-pathways/pump-state.fetch-by-source.ts b/src/commands/data-pathways/pump-state.fetch-by-source.ts new file mode 100644 index 0000000..4307f91 --- /dev/null +++ b/src/commands/data-pathways/pump-state.fetch-by-source.ts @@ -0,0 +1,44 @@ +import { Command } from "../../common/command.ts" +import { + type DataPathwayPumpStateBySource, + DataPathwayPumpStateBySourceSchema, +} from "../../contracts/data-pathways.ts" +import type { ClientError } from "../../exceptions/client-error.ts" +import { NotFoundException } from "../../exceptions/not-found.ts" +import { parseResponseHelper } from "../../utils/parse-response-helper.ts" + +export interface DataPathwayPumpStateFetchBySourceInput { + pathwayId: string + sourceId: string +} + +export class DataPathwayPumpStateFetchBySourceCommand + extends Command { + protected override allowedModes: ("apiKey" | "bearer")[] = ["apiKey"] + + protected override getMethod(): string { + return "GET" + } + + protected override getBaseUrl(): string { + return "https://data-pathways.api.flowcore.io" + } + + protected override getPath(): string { + return `/api/v1/pump-states/${this.input.pathwayId}/sources/${this.input.sourceId}` + } + + protected override parseResponse(rawResponse: unknown): DataPathwayPumpStateBySource { + return parseResponseHelper(DataPathwayPumpStateBySourceSchema, rawResponse) + } + + protected override handleClientError(error: ClientError): void { + if (error.status === 404) { + throw new NotFoundException("DataPathwayPumpState", { + pathwayId: this.input.pathwayId, + sourceId: this.input.sourceId, + }) + } + throw error + } +} diff --git a/src/commands/data-pathways/pump-state.save-by-source.ts b/src/commands/data-pathways/pump-state.save-by-source.ts new file mode 100644 index 0000000..7c08eb8 --- /dev/null +++ b/src/commands/data-pathways/pump-state.save-by-source.ts @@ -0,0 +1,41 @@ +import { Command } from "../../common/command.ts" +import { + type DataPathwayPumpStateSaveResponse, + DataPathwayPumpStateSaveResponseSchema, +} from "../../contracts/data-pathways.ts" +import { parseResponseHelper } from "../../utils/parse-response-helper.ts" + +export interface DataPathwayPumpStateSaveBySourceInput { + pathwayId: string + sourceId: string + state: { + timeBucket: string + eventId?: string + } +} + +export class DataPathwayPumpStateSaveBySourceCommand + extends Command { + protected override retryOnFailure: boolean = false + protected override allowedModes: ("apiKey" | "bearer")[] = ["apiKey"] + + protected override getMethod(): string { + return "PUT" + } + + protected override getBaseUrl(): string { + return "https://data-pathways.api.flowcore.io" + } + + protected override getPath(): string { + return `/api/v1/pump-states/${this.input.pathwayId}/sources/${this.input.sourceId}` + } + + protected override getBody(): Record { + return { state: this.input.state } + } + + protected override parseResponse(rawResponse: unknown): DataPathwayPumpStateSaveResponse { + return parseResponseHelper(DataPathwayPumpStateSaveResponseSchema, rawResponse) + } +} diff --git a/src/contracts/data-pathways.ts b/src/contracts/data-pathways.ts index e40c895..36d667d 100644 --- a/src/contracts/data-pathways.ts +++ b/src/contracts/data-pathways.ts @@ -513,6 +513,19 @@ export const DataPathwayPumpStateSchema: TObject<{ }) export type DataPathwayPumpState = Static +export const DataPathwayPumpStateBySourceSchema: TObject<{ + pathwayId: TString + sourceId: TString + flowType: TUnion<[TString, TNull]> + state: TUnion<[TPumpStateValue, TNull]> +}> = Type.Object({ + pathwayId: Type.String(), + sourceId: Type.String(), + flowType: Type.Union([Type.String(), Type.Null()]), + state: Type.Union([PumpStateValueSchema, Type.Null()]), +}) +export type DataPathwayPumpStateBySource = Static + export const DataPathwayPumpStateSaveResponseSchema: TObject<{ status: TString }> = Type.Object({ @@ -617,7 +630,9 @@ const ThroughputRecentResultSchema: TThroughputRecentResult = Type.Object({ type TNullableNumber = TUnion<[TNumber, TNull]> -type TThroughputFlowType = TObject<{ +type TThroughputSource = TObject<{ + flowType: TString + name: TOptional eventsPerSecond: TNumber successRate: TNumber avgDurationMs: TNumber @@ -627,7 +642,9 @@ type TThroughputFlowType = TObject<{ healthy: TBoolean recentResults: TArray }> -const ThroughputFlowTypeSchema: TThroughputFlowType = Type.Object({ +const ThroughputSourceSchema: TThroughputSource = Type.Object({ + flowType: Type.String(), + name: Type.Optional(Type.String()), eventsPerSecond: Type.Number(), successRate: Type.Number(), avgDurationMs: Type.Number(), @@ -645,7 +662,7 @@ type TThroughputEndpoint = TObject<{ totalFailed: TNumber lastDeliveryAgeMs: TNullableNumber healthy: TBoolean - flowTypes: TRecord + sources: TRecord }> const ThroughputEndpointSchema: TThroughputEndpoint = Type.Object({ eventsPerSecond: Type.Number(), @@ -654,7 +671,7 @@ const ThroughputEndpointSchema: TThroughputEndpoint = Type.Object({ totalFailed: Type.Number(), lastDeliveryAgeMs: Type.Union([Type.Number(), Type.Null()]), healthy: Type.Boolean(), - flowTypes: Type.Record(Type.String(), ThroughputFlowTypeSchema), + sources: Type.Record(Type.String(), ThroughputSourceSchema), }) type TThroughputGlobal = TObject<{ diff --git a/test/tests/commands/data-pathways.test.ts b/test/tests/commands/data-pathways.test.ts index 123e217..4f6f354 100644 --- a/test/tests/commands/data-pathways.test.ts +++ b/test/tests/commands/data-pathways.test.ts @@ -22,7 +22,9 @@ import { DataPathwayFetchByNameCommand, DataPathwayFetchCommand, DataPathwayListCommand, + DataPathwayPumpStateFetchBySourceCommand, DataPathwayPumpStateFetchCommand, + DataPathwayPumpStateSaveBySourceCommand, DataPathwayPumpStateSaveCommand, DataPathwayQuotaFetchCommand, DataPathwayQuotaListCommand, @@ -625,6 +627,43 @@ describe("DataPathways", () => { assertEquals(result, response) }) + it("should fetch pump state by pathway + sourceId", async () => { + const pathwayId = crypto.randomUUID() + const sourceId = crypto.randomUUID() + const response = { + pathwayId, + sourceId, + flowType: "data.0", + state: { timeBucket: "2025-01-01T00:00:00Z", eventId: "evt-1" }, + } + + base.get(`/api/v1/pump-states/${pathwayId}/sources/${sourceId}`).respondWith(200, response) + + const result = await apiKeyClient.execute( + new DataPathwayPumpStateFetchBySourceCommand({ pathwayId, sourceId }), + ) + assertEquals(result, response) + }) + + it("should save pump state by pathway + sourceId", async () => { + const pathwayId = crypto.randomUUID() + const sourceId = crypto.randomUUID() + const response = { status: "ok" } + + base.put(`/api/v1/pump-states/${pathwayId}/sources/${sourceId}`) + .matchBody({ state: { timeBucket: "2025-01-01T00:00:00Z" } }) + .respondWith(200, response) + + const result = await apiKeyClient.execute( + new DataPathwayPumpStateSaveBySourceCommand({ + pathwayId, + sourceId, + state: { timeBucket: "2025-01-01T00:00:00Z" }, + }), + ) + assertEquals(result, response) + }) + // ── Command Fetch (GET /api/v1/commands/:commandId) ── it("should fetch a command by id", async () => { From f1ac7d357b78549d0c1b4936974fe4ff5ef9a92d Mon Sep 17 00:00:00 2001 From: jbiskur Date: Thu, 23 Apr 2026 16:57:07 +0100 Subject: [PATCH 2/2] style(fmt): single-line import in pump-state.fetch-by-source --- src/commands/data-pathways/pump-state.fetch-by-source.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/commands/data-pathways/pump-state.fetch-by-source.ts b/src/commands/data-pathways/pump-state.fetch-by-source.ts index 4307f91..21e90f9 100644 --- a/src/commands/data-pathways/pump-state.fetch-by-source.ts +++ b/src/commands/data-pathways/pump-state.fetch-by-source.ts @@ -1,8 +1,5 @@ import { Command } from "../../common/command.ts" -import { - type DataPathwayPumpStateBySource, - DataPathwayPumpStateBySourceSchema, -} from "../../contracts/data-pathways.ts" +import { type DataPathwayPumpStateBySource, DataPathwayPumpStateBySourceSchema } from "../../contracts/data-pathways.ts" import type { ClientError } from "../../exceptions/client-error.ts" import { NotFoundException } from "../../exceptions/not-found.ts" import { parseResponseHelper } from "../../utils/parse-response-helper.ts"