From cff61862c44a74bbbe8ca311fc1c7561d36b204f Mon Sep 17 00:00:00 2001 From: jbiskur Date: Tue, 5 May 2026 15:31:37 +0100 Subject: [PATCH] fix(pump): preserve factory arity 2 so per-pumpGroup state managers are honored MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit createPostgresPumpStateManagerFactory returned a function with a default value on `pumpGroup`, making `Function.prototype.length === 1`. PathwayPump's arity check (`stateManagerFactoryArity <= 1`) treated the factory as legacy and called it with `flowType` only — so the defaulted `pumpGroup` collapsed every group onto a single shared state manager keyed `(flowType, "default")`. Production effect (verified in service-tenant-store-api@1.6.3): - _pathway_pump_state had ONE row for tenant.0 (the default group). - The default pump's stored event_id was an api-key.used.0 id, even though the default pump's eventTypes filter excludes that event type. - The hot pump (pumpGroup="hot", eventTypes=["api-key.used.0"]) never wrote state — both pumps shared the same manager, default pump's setState calls advanced the shared cursor past hot pump's events, hot pump fetched empty batches, never accumulated events to ack, never called setState. The isolation guarantee promised by 2.4.0 was silently broken. Fix: drop the default value on pumpGroup so .length === 2. PathwayPump now takes the non-legacy branch and forwards both (flowType, pumpGroup) to the factory. Each (flowType, pumpGroup) pair gets its own PostgresPumpStateManager instance and its own row in _pathway_pump_state. `pumpGroup ?? DEFAULT_PUMP_GROUP` inside the body keeps runtime safety for any external caller passing undefined — same observable behavior as before for legacy callers. Tests: - postgres-pump-state.test.ts — assert factory.length === 2 (the direct regression catch). - pathway-pump.test.ts — assert PathwayPump forwards (flowType, pumpGroup) to an arity-2 factory and creates one manager per (flowType, pumpGroup), with no legacy-arity warning. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/pathways/pump/state.ts | 10 ++++- tests/pathway-pump.test.ts | 65 +++++++++++++++++++++++++++++++ tests/postgres-pump-state.test.ts | 25 ++++++++++++ 3 files changed, 98 insertions(+), 2 deletions(-) diff --git a/src/pathways/pump/state.ts b/src/pathways/pump/state.ts index 4ab14a3..2e80b3a 100644 --- a/src/pathways/pump/state.ts +++ b/src/pathways/pump/state.ts @@ -112,7 +112,13 @@ export async function createPostgresPumpStateManagerFactory( const adapter = new PostgresJsAdapter(pgConfig) await adapter.connect() - return (flowType: string, pumpGroup: string = DEFAULT_PUMP_GROUP): PumpStateManager => { - return new PostgresPumpStateManager(adapter, flowType, pumpGroup, table) + // The returned function MUST declare `pumpGroup` without a default value so + // `Function.prototype.length === 2`. PathwayPump's arity check + // (`stateManagerFactoryArity <= 1`) treats single-arg factories as legacy and + // calls them with `flowType` only — collapsing every pumpGroup onto one + // shared state manager. The `?? DEFAULT_PUMP_GROUP` fallback inside the body + // preserves runtime safety for any external caller passing `undefined`. + return (flowType: string, pumpGroup: string): PumpStateManager => { + return new PostgresPumpStateManager(adapter, flowType, pumpGroup ?? DEFAULT_PUMP_GROUP, table) } } diff --git a/tests/pathway-pump.test.ts b/tests/pathway-pump.test.ts index 78ca765..1a0b52d 100644 --- a/tests/pathway-pump.test.ts +++ b/tests/pathway-pump.test.ts @@ -349,6 +349,71 @@ Deno.test({ assertEquals([...pump.registeredFlowTypes].sort(), ["orders", "users"]) }) + await t.step( + "arity-2 state factory receives both (flowType, pumpGroup) so each group gets its own manager", + async () => { + // Regression: a factory declared with a default value on `pumpGroup` + // (e.g. `(flowType, pumpGroup = "default") => …`) has `.length === 1`, + // is detected as legacy, and is called with `flowType` only — collapsing + // every group onto a shared state manager. The factory MUST declare + // arity 2 (no defaults) so per-(flowType, pumpGroup) isolation works. + const calls: Array<{ flowType: string; pumpGroup: string | undefined }> = [] + const stateByKey = new Map() + // Explicit arity 2 — `.length === 2`. + const factory = (flowType: string, pumpGroup: string): PumpStateManager => { + calls.push({ flowType, pumpGroup }) + const key = `${flowType}::${pumpGroup}` + if (!stateByKey.has(key)) { + stateByKey.set(key, new InMemoryPumpStateManager()) + } + return stateByKey.get(key)! + } + + assertEquals(factory.length, 2, "factory must declare arity 2 — sanity check") + + const warns: string[] = [] + const pump = new PathwayPump({ + stateManagerFactory: factory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + }, { + debug: () => {}, + info: () => {}, + warn: (msg: string) => warns.push(msg), + error: () => {}, + }) + + pump.configure({ + tenant: "t", + dataCore: "dc", + apiKey: "k", + baseUrl: "https://api.flowcore.io", + processEvent: async () => {}, + }) + + const internal = pump as unknown as InternalPump + internal.dataPumpConstructor = { + create: (_options: Record) => Promise.resolve({ start: async () => {} }), + } + + await internal.startPumpForGroup({ flowType: "orders", pumpGroup: "hot", eventTypes: ["placed.fast"] }) + await internal.startPumpForGroup({ flowType: "orders", pumpGroup: "default", eventTypes: ["placed"] }) + + // Both args forwarded — legacy fallback was NOT taken. + assertEquals(calls, [ + { flowType: "orders", pumpGroup: "hot" }, + { flowType: "orders", pumpGroup: "default" }, + ]) + // Two distinct state manager instances exist, one per (flowType, pumpGroup). + assertEquals(stateByKey.size, 2) + // No legacy-arity warning. + assertEquals( + warns.filter((m) => m.includes("legacy single-arg signature")).length, + 0, + "arity-2 factory must not trigger the legacy deprecation warning", + ) + }, + ) + await t.step("legacy single-arg state factory is accepted with a deprecation warning", async () => { const created = new Map() // Arity 1 — old factory signature. diff --git a/tests/postgres-pump-state.test.ts b/tests/postgres-pump-state.test.ts index 7c4a152..f064b86 100644 --- a/tests/postgres-pump-state.test.ts +++ b/tests/postgres-pump-state.test.ts @@ -18,6 +18,31 @@ Deno.test({ sanitizeResources: false, sanitizeOps: false, fn: async (t) => { + await t.step( + "factory function declares arity 2 — PathwayPump treats arity<=1 as legacy and would call it with flowType only", + async () => { + const factory = await createPostgresPumpStateManagerFactory({ ...config, tableName: TABLE_NEW }) + try { + // Function.prototype.length skips parameters with default values. + // PathwayPump (`stateManagerFactoryArity <= 1`) treats arity-1 + // factories as legacy and collapses every pumpGroup onto a single + // shared state manager. The factory MUST declare both parameters + // without defaults to keep .length at 2 — runtime safety for + // `pumpGroup === undefined` is handled inside the factory body. + assertEquals( + factory.length, + 2, + "createPostgresPumpStateManagerFactory must return a function with arity 2", + ) + } finally { + const adapter = new PostgresJsAdapter(config) + await adapter.connect() + await adapter.execute(`DROP TABLE IF EXISTS ${TABLE_NEW}`) + await adapter.disconnect() + } + }, + ) + await t.step("greenfield schema accepts composite (flow_type, pump_group)", async () => { const factory = await createPostgresPumpStateManagerFactory({ ...config, tableName: TABLE_NEW }) const adapter = new PostgresJsAdapter(config)