diff --git a/src/commands/data-pathways/assignment.heartbeat.ts b/src/commands/data-pathways/assignment.heartbeat.ts index 15dc26c..3a74d7c 100644 --- a/src/commands/data-pathways/assignment.heartbeat.ts +++ b/src/commands/data-pathways/assignment.heartbeat.ts @@ -21,9 +21,11 @@ export interface DataPathwayAssignmentHeartbeatInput { totalFailed: number lastDeliveryAgeMs: number | null healthy: boolean - flowTypes: Record< + sources: Record< string, { + flowType: string + name?: string eventsPerSecond: number successRate: number avgDurationMs: number diff --git a/src/commands/data-pathways/index.ts b/src/commands/data-pathways/index.ts index f9d63ec..30db299 100644 --- a/src/commands/data-pathways/index.ts +++ b/src/commands/data-pathways/index.ts @@ -50,6 +50,8 @@ export * from "./quota.list.ts" // Pump State export * from "./pump-state.fetch.ts" export * from "./pump-state.save.ts" +export * from "./pump-state.fetch-by-source.ts" +export * from "./pump-state.save-by-source.ts" // Delivery Log export * from "./delivery-log.list.ts" diff --git a/src/commands/data-pathways/pump-state.fetch-by-source.ts b/src/commands/data-pathways/pump-state.fetch-by-source.ts new file mode 100644 index 0000000..21e90f9 --- /dev/null +++ b/src/commands/data-pathways/pump-state.fetch-by-source.ts @@ -0,0 +1,41 @@ +import { Command } from "../../common/command.ts" +import { type DataPathwayPumpStateBySource, DataPathwayPumpStateBySourceSchema } from "../../contracts/data-pathways.ts" +import type { ClientError } from "../../exceptions/client-error.ts" +import { NotFoundException } from "../../exceptions/not-found.ts" +import { parseResponseHelper } from "../../utils/parse-response-helper.ts" + +export interface DataPathwayPumpStateFetchBySourceInput { + pathwayId: string + sourceId: string +} + +export class DataPathwayPumpStateFetchBySourceCommand + extends Command { + protected override allowedModes: ("apiKey" | "bearer")[] = ["apiKey"] + + protected override getMethod(): string { + return "GET" + } + + protected override getBaseUrl(): string { + return "https://data-pathways.api.flowcore.io" + } + + protected override getPath(): string { + return `/api/v1/pump-states/${this.input.pathwayId}/sources/${this.input.sourceId}` + } + + protected override parseResponse(rawResponse: unknown): DataPathwayPumpStateBySource { + return parseResponseHelper(DataPathwayPumpStateBySourceSchema, rawResponse) + } + + protected override handleClientError(error: ClientError): void { + if (error.status === 404) { + throw new NotFoundException("DataPathwayPumpState", { + pathwayId: this.input.pathwayId, + sourceId: this.input.sourceId, + }) + } + throw error + } +} diff --git a/src/commands/data-pathways/pump-state.save-by-source.ts b/src/commands/data-pathways/pump-state.save-by-source.ts new file mode 100644 index 0000000..7c08eb8 --- /dev/null +++ b/src/commands/data-pathways/pump-state.save-by-source.ts @@ -0,0 +1,41 @@ +import { Command } from "../../common/command.ts" +import { + type DataPathwayPumpStateSaveResponse, + DataPathwayPumpStateSaveResponseSchema, +} from "../../contracts/data-pathways.ts" +import { parseResponseHelper } from "../../utils/parse-response-helper.ts" + +export interface DataPathwayPumpStateSaveBySourceInput { + pathwayId: string + sourceId: string + state: { + timeBucket: string + eventId?: string + } +} + +export class DataPathwayPumpStateSaveBySourceCommand + extends Command { + protected override retryOnFailure: boolean = false + protected override allowedModes: ("apiKey" | "bearer")[] = ["apiKey"] + + protected override getMethod(): string { + return "PUT" + } + + protected override getBaseUrl(): string { + return "https://data-pathways.api.flowcore.io" + } + + protected override getPath(): string { + return `/api/v1/pump-states/${this.input.pathwayId}/sources/${this.input.sourceId}` + } + + protected override getBody(): Record { + return { state: this.input.state } + } + + protected override parseResponse(rawResponse: unknown): DataPathwayPumpStateSaveResponse { + return parseResponseHelper(DataPathwayPumpStateSaveResponseSchema, rawResponse) + } +} diff --git a/src/contracts/data-pathways.ts b/src/contracts/data-pathways.ts index e40c895..36d667d 100644 --- a/src/contracts/data-pathways.ts +++ b/src/contracts/data-pathways.ts @@ -513,6 +513,19 @@ export const DataPathwayPumpStateSchema: TObject<{ }) export type DataPathwayPumpState = Static +export const DataPathwayPumpStateBySourceSchema: TObject<{ + pathwayId: TString + sourceId: TString + flowType: TUnion<[TString, TNull]> + state: TUnion<[TPumpStateValue, TNull]> +}> = Type.Object({ + pathwayId: Type.String(), + sourceId: Type.String(), + flowType: Type.Union([Type.String(), Type.Null()]), + state: Type.Union([PumpStateValueSchema, Type.Null()]), +}) +export type DataPathwayPumpStateBySource = Static + export const DataPathwayPumpStateSaveResponseSchema: TObject<{ status: TString }> = Type.Object({ @@ -617,7 +630,9 @@ const ThroughputRecentResultSchema: TThroughputRecentResult = Type.Object({ type TNullableNumber = TUnion<[TNumber, TNull]> -type TThroughputFlowType = TObject<{ +type TThroughputSource = TObject<{ + flowType: TString + name: TOptional eventsPerSecond: TNumber successRate: TNumber avgDurationMs: TNumber @@ -627,7 +642,9 @@ type TThroughputFlowType = TObject<{ healthy: TBoolean recentResults: TArray }> -const ThroughputFlowTypeSchema: TThroughputFlowType = Type.Object({ +const ThroughputSourceSchema: TThroughputSource = Type.Object({ + flowType: Type.String(), + name: Type.Optional(Type.String()), eventsPerSecond: Type.Number(), successRate: Type.Number(), avgDurationMs: Type.Number(), @@ -645,7 +662,7 @@ type TThroughputEndpoint = TObject<{ totalFailed: TNumber lastDeliveryAgeMs: TNullableNumber healthy: TBoolean - flowTypes: TRecord + sources: TRecord }> const ThroughputEndpointSchema: TThroughputEndpoint = Type.Object({ eventsPerSecond: Type.Number(), @@ -654,7 +671,7 @@ const ThroughputEndpointSchema: TThroughputEndpoint = Type.Object({ totalFailed: Type.Number(), lastDeliveryAgeMs: Type.Union([Type.Number(), Type.Null()]), healthy: Type.Boolean(), - flowTypes: Type.Record(Type.String(), ThroughputFlowTypeSchema), + sources: Type.Record(Type.String(), ThroughputSourceSchema), }) type TThroughputGlobal = TObject<{ diff --git a/test/tests/commands/data-pathways.test.ts b/test/tests/commands/data-pathways.test.ts index 123e217..4f6f354 100644 --- a/test/tests/commands/data-pathways.test.ts +++ b/test/tests/commands/data-pathways.test.ts @@ -22,7 +22,9 @@ import { DataPathwayFetchByNameCommand, DataPathwayFetchCommand, DataPathwayListCommand, + DataPathwayPumpStateFetchBySourceCommand, DataPathwayPumpStateFetchCommand, + DataPathwayPumpStateSaveBySourceCommand, DataPathwayPumpStateSaveCommand, DataPathwayQuotaFetchCommand, DataPathwayQuotaListCommand, @@ -625,6 +627,43 @@ describe("DataPathways", () => { assertEquals(result, response) }) + it("should fetch pump state by pathway + sourceId", async () => { + const pathwayId = crypto.randomUUID() + const sourceId = crypto.randomUUID() + const response = { + pathwayId, + sourceId, + flowType: "data.0", + state: { timeBucket: "2025-01-01T00:00:00Z", eventId: "evt-1" }, + } + + base.get(`/api/v1/pump-states/${pathwayId}/sources/${sourceId}`).respondWith(200, response) + + const result = await apiKeyClient.execute( + new DataPathwayPumpStateFetchBySourceCommand({ pathwayId, sourceId }), + ) + assertEquals(result, response) + }) + + it("should save pump state by pathway + sourceId", async () => { + const pathwayId = crypto.randomUUID() + const sourceId = crypto.randomUUID() + const response = { status: "ok" } + + base.put(`/api/v1/pump-states/${pathwayId}/sources/${sourceId}`) + .matchBody({ state: { timeBucket: "2025-01-01T00:00:00Z" } }) + .respondWith(200, response) + + const result = await apiKeyClient.execute( + new DataPathwayPumpStateSaveBySourceCommand({ + pathwayId, + sourceId, + state: { timeBucket: "2025-01-01T00:00:00Z" }, + }), + ) + assertEquals(result, response) + }) + // ── Command Fetch (GET /api/v1/commands/:commandId) ── it("should fetch a command by id", async () => {