Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/commands/data-pathways/assignment.heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ export interface DataPathwayAssignmentHeartbeatInput {
totalFailed: number
lastDeliveryAgeMs: number | null
healthy: boolean
flowTypes: Record<
sources: Record<
string,
{
flowType: string
name?: string
eventsPerSecond: number
successRate: number
avgDurationMs: number
Expand Down
2 changes: 2 additions & 0 deletions src/commands/data-pathways/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ export * from "./quota.list.ts"
// Pump State
export * from "./pump-state.fetch.ts"
export * from "./pump-state.save.ts"
export * from "./pump-state.fetch-by-source.ts"
export * from "./pump-state.save-by-source.ts"

// Delivery Log
export * from "./delivery-log.list.ts"
Expand Down
41 changes: 41 additions & 0 deletions src/commands/data-pathways/pump-state.fetch-by-source.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Command } from "../../common/command.ts"
import { type DataPathwayPumpStateBySource, DataPathwayPumpStateBySourceSchema } from "../../contracts/data-pathways.ts"
import type { ClientError } from "../../exceptions/client-error.ts"
import { NotFoundException } from "../../exceptions/not-found.ts"
import { parseResponseHelper } from "../../utils/parse-response-helper.ts"

export interface DataPathwayPumpStateFetchBySourceInput {
pathwayId: string
sourceId: string
}

export class DataPathwayPumpStateFetchBySourceCommand
extends Command<DataPathwayPumpStateFetchBySourceInput, DataPathwayPumpStateBySource> {
protected override allowedModes: ("apiKey" | "bearer")[] = ["apiKey"]

protected override getMethod(): string {
return "GET"
}

protected override getBaseUrl(): string {
return "https://data-pathways.api.flowcore.io"
}

protected override getPath(): string {
return `/api/v1/pump-states/${this.input.pathwayId}/sources/${this.input.sourceId}`
}
Comment on lines +24 to +26
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Encode path parameters before building the source route.

Raw sourceId interpolation will misroute IDs containing path or query delimiters.

🐛 Proposed fix
   protected override getPath(): string {
-    return `/api/v1/pump-states/${this.input.pathwayId}/sources/${this.input.sourceId}`
+    const pathwayId = encodeURIComponent(this.input.pathwayId)
+    const sourceId = encodeURIComponent(this.input.sourceId)
+    return `/api/v1/pump-states/${pathwayId}/sources/${sourceId}`
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/commands/data-pathways/pump-state.fetch-by-source.ts` around lines 27 -
29, The getPath method builds a URL with raw path params which can break for IDs
containing slashes or query chars; update protected override getPath() to encode
this.input.pathwayId and this.input.sourceId (e.g., via encodeURIComponent)
before interpolating them into
`/api/v1/pump-states/{pathwayId}/sources/{sourceId}` so the route is safe for
all ID values and preserves the exact parameter values.


protected override parseResponse(rawResponse: unknown): DataPathwayPumpStateBySource {
return parseResponseHelper(DataPathwayPumpStateBySourceSchema, rawResponse)
}

protected override handleClientError(error: ClientError): void {
if (error.status === 404) {
throw new NotFoundException("DataPathwayPumpState", {
pathwayId: this.input.pathwayId,
sourceId: this.input.sourceId,
})
}
throw error
}
}
41 changes: 41 additions & 0 deletions src/commands/data-pathways/pump-state.save-by-source.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Command } from "../../common/command.ts"
import {
type DataPathwayPumpStateSaveResponse,
DataPathwayPumpStateSaveResponseSchema,
} from "../../contracts/data-pathways.ts"
import { parseResponseHelper } from "../../utils/parse-response-helper.ts"

export interface DataPathwayPumpStateSaveBySourceInput {
pathwayId: string
sourceId: string
state: {
timeBucket: string
eventId?: string
}
}

export class DataPathwayPumpStateSaveBySourceCommand
extends Command<DataPathwayPumpStateSaveBySourceInput, DataPathwayPumpStateSaveResponse> {
protected override retryOnFailure: boolean = false
protected override allowedModes: ("apiKey" | "bearer")[] = ["apiKey"]

protected override getMethod(): string {
return "PUT"
}

protected override getBaseUrl(): string {
return "https://data-pathways.api.flowcore.io"
}

protected override getPath(): string {
return `/api/v1/pump-states/${this.input.pathwayId}/sources/${this.input.sourceId}`
}
Comment on lines +30 to +32
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Encode path parameters before building the source route.

sourceId is a public string input; reserved characters can change the path/query and target the wrong endpoint.

🐛 Proposed fix
   protected override getPath(): string {
-    return `/api/v1/pump-states/${this.input.pathwayId}/sources/${this.input.sourceId}`
+    const pathwayId = encodeURIComponent(this.input.pathwayId)
+    const sourceId = encodeURIComponent(this.input.sourceId)
+    return `/api/v1/pump-states/${pathwayId}/sources/${sourceId}`
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
protected override getPath(): string {
return `/api/v1/pump-states/${this.input.pathwayId}/sources/${this.input.sourceId}`
}
protected override getPath(): string {
const pathwayId = encodeURIComponent(this.input.pathwayId)
const sourceId = encodeURIComponent(this.input.sourceId)
return `/api/v1/pump-states/${pathwayId}/sources/${sourceId}`
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/commands/data-pathways/pump-state.save-by-source.ts` around lines 30 -
32, The getPath() builder currently interpolates raw user input into the route,
so special/reserved characters in input.sourceId (and pathwayId) can break the
path; update the getPath() method to URL-encode those path parameters (e.g.,
apply encodeURIComponent to this.input.sourceId and this.input.pathwayId) before
inserting them into the template string so the generated route is safe and
cannot be manipulated by reserved characters.


protected override getBody(): Record<string, unknown> {
return { state: this.input.state }
}

protected override parseResponse(rawResponse: unknown): DataPathwayPumpStateSaveResponse {
return parseResponseHelper(DataPathwayPumpStateSaveResponseSchema, rawResponse)
}
}
25 changes: 21 additions & 4 deletions src/contracts/data-pathways.ts
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,19 @@ export const DataPathwayPumpStateSchema: TObject<{
})
export type DataPathwayPumpState = Static<typeof DataPathwayPumpStateSchema>

export const DataPathwayPumpStateBySourceSchema: TObject<{
pathwayId: TString
sourceId: TString
flowType: TUnion<[TString, TNull]>
state: TUnion<[TPumpStateValue, TNull]>
}> = Type.Object({
pathwayId: Type.String(),
sourceId: Type.String(),
flowType: Type.Union([Type.String(), Type.Null()]),
state: Type.Union([PumpStateValueSchema, Type.Null()]),
})
export type DataPathwayPumpStateBySource = Static<typeof DataPathwayPumpStateBySourceSchema>

export const DataPathwayPumpStateSaveResponseSchema: TObject<{
status: TString
}> = Type.Object({
Expand Down Expand Up @@ -617,7 +630,9 @@ const ThroughputRecentResultSchema: TThroughputRecentResult = Type.Object({

type TNullableNumber = TUnion<[TNumber, TNull]>

type TThroughputFlowType = TObject<{
type TThroughputSource = TObject<{
flowType: TString
name: TOptional<TString>
eventsPerSecond: TNumber
successRate: TNumber
avgDurationMs: TNumber
Expand All @@ -627,7 +642,9 @@ type TThroughputFlowType = TObject<{
healthy: TBoolean
recentResults: TArray<TThroughputRecentResult>
}>
const ThroughputFlowTypeSchema: TThroughputFlowType = Type.Object({
const ThroughputSourceSchema: TThroughputSource = Type.Object({
flowType: Type.String(),
name: Type.Optional(Type.String()),
eventsPerSecond: Type.Number(),
successRate: Type.Number(),
avgDurationMs: Type.Number(),
Expand All @@ -645,7 +662,7 @@ type TThroughputEndpoint = TObject<{
totalFailed: TNumber
lastDeliveryAgeMs: TNullableNumber
healthy: TBoolean
flowTypes: TRecord<TString, TThroughputFlowType>
sources: TRecord<TString, TThroughputSource>
}>
const ThroughputEndpointSchema: TThroughputEndpoint = Type.Object({
eventsPerSecond: Type.Number(),
Expand All @@ -654,7 +671,7 @@ const ThroughputEndpointSchema: TThroughputEndpoint = Type.Object({
totalFailed: Type.Number(),
lastDeliveryAgeMs: Type.Union([Type.Number(), Type.Null()]),
healthy: Type.Boolean(),
flowTypes: Type.Record(Type.String(), ThroughputFlowTypeSchema),
sources: Type.Record(Type.String(), ThroughputSourceSchema),
})

type TThroughputGlobal = TObject<{
Expand Down
39 changes: 39 additions & 0 deletions test/tests/commands/data-pathways.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import {
DataPathwayFetchByNameCommand,
DataPathwayFetchCommand,
DataPathwayListCommand,
DataPathwayPumpStateFetchBySourceCommand,
DataPathwayPumpStateFetchCommand,
DataPathwayPumpStateSaveBySourceCommand,
DataPathwayPumpStateSaveCommand,
DataPathwayQuotaFetchCommand,
DataPathwayQuotaListCommand,
Expand Down Expand Up @@ -625,6 +627,43 @@ describe("DataPathways", () => {
assertEquals(result, response)
})

it("should fetch pump state by pathway + sourceId", async () => {
const pathwayId = crypto.randomUUID()
const sourceId = crypto.randomUUID()
const response = {
pathwayId,
sourceId,
flowType: "data.0",
state: { timeBucket: "2025-01-01T00:00:00Z", eventId: "evt-1" },
}

base.get(`/api/v1/pump-states/${pathwayId}/sources/${sourceId}`).respondWith(200, response)

const result = await apiKeyClient.execute(
new DataPathwayPumpStateFetchBySourceCommand({ pathwayId, sourceId }),
)
assertEquals(result, response)
})

it("should save pump state by pathway + sourceId", async () => {
const pathwayId = crypto.randomUUID()
const sourceId = crypto.randomUUID()
const response = { status: "ok" }

base.put(`/api/v1/pump-states/${pathwayId}/sources/${sourceId}`)
.matchBody({ state: { timeBucket: "2025-01-01T00:00:00Z" } })
.respondWith(200, response)

const result = await apiKeyClient.execute(
new DataPathwayPumpStateSaveBySourceCommand({
pathwayId,
sourceId,
state: { timeBucket: "2025-01-01T00:00:00Z" },
}),
)
assertEquals(result, response)
})

// ── Command Fetch (GET /api/v1/commands/:commandId) ──

it("should fetch a command by id", async () => {
Expand Down
Loading