Skip to content

feat(event-process): event-process consumer + wildcard subscribe for events.received.* (EVO-1208)#28

Merged
dpaes merged 4 commits into
developfrom
feat/EVO-1208
Jun 9, 2026
Merged

feat(event-process): event-process consumer + wildcard subscribe for events.received.* (EVO-1208)#28
dpaes merged 4 commits into
developfrom
feat/EVO-1208

Conversation

@nickoliveira23

Copy link
Copy Markdown

Summary

Wires RUN_MODE=event-process (story 3.3): the consumer end of the webhook event pipeline that subscribes to the whole events.received.<platform> topic family, propagates correlationId, and runs each envelope through a validating stub handler (stories 3.4–3.7 fill in the real pipeline).

Resolves the wildcard-subscribe gap the family needs:

  • New IMessageBroker.subscribePattern(prefix, handler)Kafka subscribes with a native RegExp (^events\.received\.[^.]+$) under groupId ${runMode}-${prefix}, no topic creation for the pattern; RabbitMQ binds a durable queue to a shared events.received topic exchange with the events.received.# binding.
  • Aligns the RabbitMQ topic→exchange mapping for the events.received family only (shared exchange + routing key per platform), per the EVO-1195 contract (EVENTS_RECEIVED_RABBITMQ_BINDING). campaigns.* keeps the per-topic model — proven unchanged by the existing contract scenarios.
  • Adds a parametrized wildcard fan-in scenario to the contract suite, closing the EVO-1199 coverage gap.

Stacked PR

Based on feat/EVO-1199 (PR #27, in review) because it edits broker.contract.spec.ts, which is not yet on develop. Retarget the base to develop once #27 merges. The EVO-1208 delta is the 13 files below.

Security

  • A schema-invalid envelope is dropped (terminal nack + terminal_failures metric) instead of requeued — prevents an infinite redelivery loop on a poison message. Only transient errors requeue.
  • No secrets; broker creds live only in the CI/dev compose.

Test plan

  • evo-flow: npm run typecheck / npm run lint — clean
  • evo-flow: npm test — 62 broker + event-process unit specs pass (incl. subscribePattern, terminal-drop, bootstrap subscription). One unrelated pre-existing failure (campaigns.controller.spec, confirmed on base).
  • evo-flow: BROKER_CONTRACT=1 npm run test:contract — 14/14 deterministic scenarios incl. wildcard fan-in, both adapters, against the CI broker stack
  • evo-flow: RUN_MODE=event-process npm run dev — boot smoke: EventProcessModule loads, broker boots, consumer subscribes on onApplicationBootstrap, no crash on cold start (zero topics)

Changed Files

  • src/shared/broker/interfaces/message-broker.interface.ts
  • src/shared/broker/adapters/{kafka,rabbitmq}-broker.adapter.ts (+ specs)
  • src/shared/broker/broker.contract.spec.ts
  • src/runners/event-process/event-process.module.ts
  • src/runners/event-process/services/{event-process.service,events-received.consumer}.ts (+ specs)
  • src/main.ts, src/app.module.ts, src/bootstrap/bootstrap.service.ts

Notes (out of scope, not blocking)

  • Kafka regex consumers only pick up brand-new topics on a metadata refresh (~metadataMaxAge) — a cold event-process may lag a first-ever platform topic by minutes. Mitigated once EVO-1200 [1.7] pre-provisions the topics.
  • Pattern-subscription reconnect reuses the (tested) single-topic attachConsumer path; no dedicated reconnect test.

Linked Issue

  • EVO-1208

🤖 Generated with Claude Code

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @nickoliveira23, you have reached your weekly rate limit of 500000 diff characters.

Please try again later or upgrade to continue using Sourcery

@dpaes dpaes changed the base branch from feat/EVO-1199 to develop June 8, 2026 19:40
@dpaes

dpaes commented Jun 8, 2026

Copy link
Copy Markdown

Code reviewed (positive) — merge blocked on a rebase after #27 landed

The review itself is done and positive:

  • EventsReceivedConsumer subscribes the events.received.* family, runs each envelope in the correlation context, acks. Poison-message handling correct: InvalidEnvelopeErrornack(false) (terminal drop), transient → nack(true) (requeue). Subscription in onApplicationBootstrap so the broker adapter is active first.
  • subscribePattern: Kafka anchored RegExp ^events\.received\.[^.]+$; RabbitMQ shared topic exchange scoped to events.received (campaigns.* untouched), publish + subscribe agreeing via resolveExchange + events.received.# binding.
  • EventProcessService is the validating stub per story 3.3.

Why it can't merge yet: #27 (EVO-1199) is merged to develop. I retargeted this PR's base from feat/EVO-1199 to develop, and GitHub now reports it CONFLICTING — the stacked-squash effect: #27 landed as one squash commit, but this branch still carries #27's original commits in its history, so the three-dot merge conflicts.

To unblock (dev): rebase feat/EVO-1208 onto the latest develop (drop the now-squashed #27 commits) and force-push. After the rebase:

  • the broker-contract gate will finally run here (it didn't before — the base was a feature branch, not develop), and
  • I'll confirm it green, then approve + merge.

Not approving yet — this is purely a branch-state rebase, not a code change. Holding until rebase + green gate.

@dpaes

dpaes commented Jun 9, 2026

Copy link
Copy Markdown

Review — code looks sound; merge blocked on a rebase

Reviewed the consumer, both adapters' subscribePattern/ack/nack, the events.received contract, and the wiring. The three risk areas are correctly handled: invalid envelope → typed InvalidEnvelopeError → terminal drop (nack requeue=false) while transient errors requeue; the wildcard is anchored/scoped so it can't swallow campaigns.*; subscription runs in onApplicationBootstrap after the adapter is active.

One LOW — a contract follow-up, not this PR: the consumer terminally drops any envelope failing the strict correlationId: z.uuidv4() schema, while the EVO-1209 producer (webhook-intake) deliberately publishes envelopes with a non-UUID SAFE_CORRELATION_ID token ("dropping is worse than publishing"). External providers don't send X-Correlation-Id, so the common path mints a UUID and passes; the divergence only bites an internal/propagated non-UUID token, and the drop is observable (terminalFailures metric + log). Fix belongs in the contract/producer layer (relax correlationId to SAFE_CORRELATION_ID, or mint a dedicated envelope UUID) — NOT by weakening this consumer's terminal-drop, which would reintroduce the round-1 poison-requeue. Tracking as a correlation/contract follow-up; does not block this PR.

Merge blocked on a rebase: this PR now conflicts with develop on kafka-broker.adapter.ts + rabbitmq-broker.adapter.ts — EVO-1200's provisionTopic landed at the same anchor as this PR's subscribePattern (additive — keep both methods). app.module.ts/main.ts auto-merge cleanly. Rebase onto origin/develop and I'll merge. Card stays In Review.

nickoliveira23 and others added 4 commits June 9, 2026 10:50
…ange (EVO-1208)

Adds IMessageBroker.subscribePattern(prefix, handler): Kafka subscribes with a
native RegExp under groupId ${runMode}-${prefix} (no topic creation for the
pattern); RabbitMQ binds a durable queue to a shared <prefix> topic exchange
with the <prefix>.# binding. Aligns the RabbitMQ topic->exchange mapping for
the events.received family only (shared exchange + routing key per platform),
per the EVO-1195 contract — campaigns.* keeps the per-topic model. Closes the
EVO-1199 coverage gap with a parametrized wildcard fan-in scenario.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…received.* (EVO-1208)

EventProcessModule + EventsReceivedConsumer subscribe the events.received
family via subscribePattern, run each envelope under the correlation context,
and ack (nack+requeue on failure); EventProcessService is the validating stub
for stories 3.4-3.7. Removes EVENT_PROCESS from STUB_RUN_MODES and conditionally
imports the module (AppFactory.shouldStartEventProcess()).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…otstrap (EVO-1208)

Addresses self-review findings:
- HIGH: a schema-invalid envelope threw and was nacked with requeue, looping
  forever (tight redelivery, DoS-class). Validation failures now raise a typed
  InvalidEnvelopeError and are dropped (terminal nack); only transient errors
  requeue.
- MEDIUM: move the broker subscription from onModuleInit to
  onApplicationBootstrap so the broker adapter is guaranteed active first
  (hook order across modules is otherwise undefined).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…-1208)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@dpaes dpaes left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review — Approved ✅

Re-rebase confirmed clean: the 4 EVO-1208 commits replay onto current develop (0 EVO-1199 commits), and the additive conflicts were resolved correctly — provisionTopic (EVO-1200) + subscribePattern (EVO-1208) side-by-side in both adapters; app.module.ts keeps both the CampaignPackerModule (EVO-1215) and event-process registrations. Scope is unchanged from the version I reviewed (541+/11−, 14 files). The EVO-1199 contract-suite merge gate passed (deterministic contract scenarios + tenant-db-context green); only the explicitly non-blocking broker-restart reconnect scenario is still pending.

Code was already reviewed and is sound: invalid envelope → typed InvalidEnvelopeErrorterminal drop while transient errors requeue (closes the round-1 infinite-redelivery HIGH); the wildcard is anchored/scoped so it can't swallow campaigns.*; subscription runs in onApplicationBootstrap after the adapter is active.

Known LOW (contract follow-up, not a blocker): the consumer terminally drops an envelope whose correlationId fails the strict z.uuidv4() schema, while the EVO-1209 producer deliberately publishes non-UUID SAFE_CORRELATION_ID tokens. External providers don't send X-Correlation-Id (common path mints a UUID and passes); fix belongs in the contract/correlation story (relax the contract to SAFE_CORRELATION_ID), NOT by weakening this consumer's terminal-drop.

Merging to develop.

@dpaes dpaes merged commit 277e8b5 into develop Jun 9, 2026
4 checks passed
@dpaes dpaes deleted the feat/EVO-1208 branch June 9, 2026 14:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants