Split Live Activities into a dedicated pipeline lane #45
mithileshchellappan wants to merge 16 commits
Introduce LA_WORKER_COUNT, LA_SENDER_COUNT, and LA_QUEUE_SIZE so Live Activities get their own worker pool sizing, independent of the push notification lane.
Live Activity jobs previously shared the push notification job, task, and outcome pipelines, so a large LA fanout could block timed pushes (and vice versa). Give LAs their own pipelines, masters, senders, and outcome worker, route LA dispatches to the new lane in the server, and drain both lanes in parallel on shutdown via drainChain.
b4acb79 to 60d89a6
Important
The LA-specific worker settings need to be wired up or removed before merge because this PR currently documents a separate lane that runtime never uses.
Reviewed changes — This PR makes LA update/end token fanout scoped by default while preserving broad fanout until the corresponding start dispatch has finished establishing activity associations.
- Replace gated scoped fanout: `GetLATokenBatchForDispatch` now loads dispatch/job scope first and always applies the activity-association predicate for non-start dispatches after start is no longer pending.
- Handle pending start dispatches: `liveActivityDispatchFanoutScope.requiresActivityAssociation()` defers association filtering while a same-job start dispatch is `QUEUED`, `IN_PROGRESS`, or `DISPATCHED`.
- Add focused fanout tests: `internal/storage/live_activity_fanout_test.go` covers pending-start, completed-start unassociated, and completed-start associated token behavior with a fake SQL driver.
- Add LA worker configuration knobs: `.env.example` and `internal/config.Config` now expose `LA_WORKER_COUNT`, `LA_SENDER_COUNT`, and `LA_QUEUE_SIZE`.
ℹ️ No critical issues — one documentation/default mismatch inline.
Reviewed changes — This follow-up review covers the updated PR head, which now wires Live Activity dispatches through a dedicated pipeline lane instead of only loading LA-specific config.
- Add dedicated LA pipelines: `cmd/pushboy/main.go` creates separate LA job, task, and outcome pipelines sized by `LA_QUEUE_SIZE`.
- Start independent LA workers: The app starts LA masters from `LA_WORKER_COUNT`, LA senders from `LA_SENDER_COUNT`, and a separate LA outcome worker.
- Route LA dispatches to the LA lane: `Server.New` accepts both push and LA job pipelines, and immediate LA dispatch enqueue now submits to `laJobPipeline`.
- Drain both lanes on shutdown: `drainChain` shuts down each worker chain in order, and startup drains the push and LA chains before closing the database.
- Add lane and shutdown tests: New tests cover LA routing, push/LA isolation, and shutdown ordering/join behavior.
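The shutdown shape described in this review (each lane drained stage by stage, both lanes in parallel, database closed last) can be sketched roughly as follows. The `stage` type, `drainLane`, `drainAll`, and `newLane` are hypothetical names for illustration; the PR's real `drainChain` takes a context and its own `drainStage` type.

```go
package main

import (
	"fmt"
	"sync"
)

// stage is a hypothetical stand-in for one pipeline plus the workers reading it.
type stage struct {
	name  string
	close func() // stop accepting new items
	wait  func() // block until this stage's workers have exited
}

// drainLane shuts stages down upstream to downstream, recording the order.
func drainLane(stages []stage, record func(string)) {
	for _, s := range stages {
		s.close()
		s.wait()
		record(s.name)
	}
}

// drainAll drains every lane concurrently and returns once all have finished.
func drainAll(lanes [][]stage) []string {
	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	record := func(name string) {
		mu.Lock()
		defer mu.Unlock()
		order = append(order, name)
	}
	for _, lane := range lanes {
		wg.Add(1)
		go func(l []stage) {
			defer wg.Done()
			drainLane(l, record)
		}(lane)
	}
	wg.Wait()
	return order
}

// newLane builds a three-stage lane whose close and wait hooks do nothing.
func newLane(prefix string) []stage {
	noop := func() {}
	return []stage{
		{prefix + "-jobs", noop, noop},
		{prefix + "-tasks", noop, noop},
		{prefix + "-outcomes", noop, noop},
	}
}

func main() {
	order := drainAll([][]stage{newLane("push"), newLane("la")})
	fmt.Println(order) // the database would be closed only after this point
}
```

Stages within a lane drain in order, while the two lanes may interleave with each other, so a slow LA drain does not delay the push lane.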
Caution
The new commits leave the PR non-buildable and disable the Live Activity API/pipeline path that this PR is meant to introduce.
Reviewed changes — Reviewed the delta after prior Pullfrog review 60d89a6, which refactors worker fanout/dispatch generics and changes the current LA lane wiring.
- Genericized worker fanout and dispatch: `MasterWorker` and `SenderWorker` now accept generic job/task/outcome types with injected fanout/dispatch functions.
- Split LA task models: LA-specific job/task/outcome payloads moved from `model.JobItem` into `model.LAJobItem`, `model.LASendTask`, and `model.LASendOutcome`.
- Commented out LA runtime wiring: The new head comments out LA route/handler code, LA pipeline startup, LA workers, and LA shutdown drain wiring.
- Disabled scheduler startup: `cmd/pushboy/main.go` no longer starts or stops `scheduler`, so scheduled push processing is not active.
- Added fanout indexes: Migration `000013` adds indexes for topic/user and active token fanout lookup paths.
    if got.JobType != model.JobTypeLA {
        t.Fatalf("submitted JobType = %q, want %q", got.JobType, model.JobTypeLA)
    }
    if got.LAActivityID != "race-42" || got.LAJobID != "la-job-1" || got.LADispatchID == "" {
This test no longer compiles because model.JobItem no longer has LAActivityID, LAJobID, or LADispatchID after those fields moved to the LA-specific model. The server path and test need to agree on the restored LA pipeline item type; currently go test ./internal/server fails before executing the route assertions.
@codex review
💡 Codex Review
pushboy/internal/server/server_test.go
Line 613 in 34d6c26
The store interface now declares ApplyLAOutcomeBatch(ctx, []model.LASendOutcome), but this test stub still implements the old []model.SendOutcome signature. Any test package that uses serverStoreStub as a storage.Store fails to compile until the stub is updated to the new LA outcome type.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 34d6c265f9
    SELECT 1 FROM live_activity_dispatches newer
    WHERE newer.live_activity_job_id = d.live_activity_job_id
    AND newer.action = 'update'
    AND newer.created_at > d.created_at
Ignore failed newer dispatches when superseding
When a later update row exists but never actually gets sent (for example, enqueueImmediateLADispatch marks the newer dispatch FAILED after the LA queue is closed/full), this EXISTS still marks the older queued/in-progress update as SUPERSEDED because it only checks created_at. FanoutLATokens then returns without delivering the older update, so clients can receive neither update; restrict the newer row to deliverable/successfully enqueued statuses instead of any newer row.
    // ever submits into a closed pipeline.
    func drainChain(ctx context.Context, stages []drainStage) {
        for _, stage := range stages {
            if err := stage.pipe.Close(ctx); err != nil {
Drain queued items before closing each stage
In the SIGTERM path with buffered work, calling Close at the start of each stage does not drain MemoryPipeline: Close closes done, and Receive can return ErrClosed immediately even when the channel still contains jobs/tasks/outcomes. Because drainChain waits only after closing the pipe, the new LA lane (and the refactored push lane) can report graceful shutdown while dropping queued sends or unpersisted outcomes; the shutdown path needs to stop new submissions separately or make receivers drain buffered items after close.
Caution
This PR currently fails to compile in the changed server and worker test packages.
Reviewed changes — this review covers the split of Live Activity dispatching into its own worker lane while preserving push dispatch, scheduler startup, shutdown draining, and outcome processing semantics.
- Add a dedicated Live Activity lane: `cmd/pushboy/main.go` now creates separate LA job, task, and outcome pipelines with separate master, sender, and outcome workers.
- Route Live Activity HTTP dispatches to LA jobs: `internal/server/live_activity.go` enqueues `model.LAJobItem` through `s.laJobPipeline` while push jobs continue using `model.JobItem`.
- Refactor worker primitives to generic pipelines: `internal/workers` now has generic master, sender, dispatch, fanout, and outcome worker paths for push and LA-specific task types.
- Add LA superseding and query/index changes: storage adds `SupersedeLADispatchIfStale`, split LA token batch queries, and new fanout-oriented indexes.
- Update shutdown and tests: shutdown drains push and LA chains in parallel, with new `cmd/pushboy` tests for drain ordering and join behavior.
🚨 Changed packages do not compile
go test ./cmd/pushboy ./internal/server ./internal/workers passes cmd/pushboy, but both internal/server and internal/workers fail at compile time. The stale tests still instantiate the pre-refactor OutcomeWorker and mixed push/LA model.SendOutcome path after the production code split LA outcomes into model.LASendOutcome.
Technical details
# Changed packages do not compile
## Affected sites
- `internal/server/server_test.go:613` — `serverStoreStub.ApplyLAOutcomeBatch` still accepts `[]model.SendOutcome`, so `*serverStoreStub` no longer implements `storage.Store`.
- `internal/workers/outcome_test.go:32` — the test still uses the old non-generic `OutcomeWorker` type and the old mixed push/LA outcome behavior.
- `internal/workers/outcome_test.go:78` — `fakeOutcomeWriter.ApplyLAOutcomeBatch` still accepts `[]model.SendOutcome` instead of `[]model.LASendOutcome`.
## Required outcome
- `go test ./cmd/pushboy ./internal/server ./internal/workers` compiles and runs successfully.
- Tests exercise the new split push and LA outcome workers instead of the removed mixed-worker contract.
## Suggested approach
Update the stale stubs and outcome tests to instantiate `OutcomeWorker[model.SendOutcome]` and `OutcomeWorker[model.LASendOutcome]` through the new constructors, or rewrite the assertions around the public constructors rather than the removed mixed `processOutcome` contract.
Important
The new APNs pooling commit introduces a fresh package compile failure.
Reviewed changes — Reviewed the delta after prior Pullfrog review 34d6c26, which updates APNs delivery behavior.
- Added APNs client pooling: `internal/apns.Client` now maintains a four-client HTTP pool and round-robins sends through `httpClients`.
- Adjusted Live Activity timestamp fallback: `internal/apns/apns_la.go` now uses `LiveActivityRequest.CreatedAt` as the APNs timestamp when `options.timestamp` is omitted.
ℹ️ No new issues found in the new dispatcher split; prior unresolved review threads still stand.
Reviewed changes — Reviewed the delta since prior Pullfrog review 3a32311, which separates runtime dispatcher clients between push and Live Activity senders.
- Separated dispatcher maps: `cmd/pushboy/main.go` now builds distinct `dispatchers` and `laDispatchers` maps during APNs/FCM startup.
- Created LA-specific clients: startup now creates separate APNs and FCM client instances for the Live Activity lane.
- Routed LA senders through LA dispatchers: `DispatchLATask` now receives `laDispatchers`, while push senders continue using the push dispatcher map.
✅ No new issues found in the new test-fix commit.
Reviewed changes — Reviewed the delta since prior Pullfrog review 0faa704, which updates stale tests after the Live Activity lane split and APNs client pooling changes.
- Fixed APNs pooling tests: `internal/apns/apns_test.go` now constructs `Client` with `httpClients`, matching the pooled client implementation.
- Updated server LA lane tests: `internal/server/server_test.go` now uses a dedicated `model.LAJobItem` pipeline stub and verifies LA dispatches enqueue the LA-specific job shape.
- Updated storage and outcome tests: `internal/storage` and `internal/workers` tests now use `model.LASendOutcome` and the split push/LA outcome worker constructors.
- Adjusted fanout query assertions: `internal/storage/live_activity_fanout_test.go` now asserts the selected split token-query path instead of the removed association-flag argument.
Validation: go test ./cmd/pushboy ./internal/server ./internal/workers ./internal/apns ./internal/storage and go test ./... both pass.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fefec9b1d6
    Base: &http.Transport{
        ForceAttemptHTTP2: true,
        MaxIdleConns: 100,
        MaxIdleConnsPerHost: 100,
        IdleConnTimeout: 90 * time.Second,
Preserve proxy settings for FCM transports
In deployments that rely on HTTP_PROXY/HTTPS_PROXY, replacing oauth2.NewClient with this custom http.Transport makes FCM requests bypass the environment proxy because Proxy is left nil. The previous default transport honored http.ProxyFromEnvironment, so these environments will start failing to connect to FCM unless the custom Base transport carries that setting.
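A minimal sketch of the fix the comment points at: a custom `http.Transport` only honors `HTTP_PROXY`/`HTTPS_PROXY` when its `Proxy` field is set. The `newTransport` helper is a hypothetical name; the other field values are copied from the excerpt above. In the PR this transport would be the `Base` of the OAuth2 transport, which is left out here to keep the example to the standard library.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newTransport builds a pooled transport that still consults the proxy
// environment variables, as http.DefaultTransport does.
func newTransport() *http.Transport {
	return &http.Transport{
		Proxy:               http.ProxyFromEnvironment,
		ForceAttemptHTTP2:   true,
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 100,
		IdleConnTimeout:     90 * time.Second,
	}
}

func main() {
	tr := newTransport()
	fmt.Println("proxy configured:", tr.Proxy != nil)
}
```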
@codex review
Important
The new supersede error branch still lets Live Activity fanout proceed when the stale-update guard fails.
Reviewed changes — Reviewed the delta since prior Pullfrog review fefec9b, which adjusts dispatch and Live Activity fanout error handling.
- Removed redundant push dispatcher assertion: `DispatchPushTask` now calls the selected `dispatch.Dispatcher` directly after the platform map lookup.
- Corrected supersede-check error logging: `FanoutLATokens` now logs `SupersedeLADispatchIfStale` errors as errors instead of logging that the dispatch was superseded.
Validation: go test ./internal/workers passed.
    func FanoutLATokens(ctx context.Context, store storage.Store, job model.LAJobItem, batchSize int, emit func(context.Context, model.LASendTask) error) error {
        superseded, err := store.SupersedeLADispatchIfStale(ctx, job.DispatchID)
        if err != nil {
            log.Printf("Error checking supersede state for dispatch %s: %v", job.DispatchID, err)
When `SupersedeLADispatchIfStale` returns an error, the worker logs the failed guard, then continues and may enqueue a Live Activity update whose stale status was never established. Because this guard is the only thing preventing an older update from fanning out after a newer one exists, the error path should stop this dispatch instead of treating the stale check as best-effort.
Technical details
# Supersede-check errors continue fanout
## Affected sites
- `internal/workers/fanout_la.go:15-18` — the initial stale-update guard logs an error and continues before marking the dispatch `IN_PROGRESS`.
- `internal/workers/fanout_la.go:31-34` — the per-batch guard has the same fail-open behavior after some state may already have changed.
- `internal/storage/live_activity_postgres.go:725-743` — `SupersedeLADispatchIfStale` is an `UPDATE` guard; when it fails, the caller does not know whether the current dispatch should be suppressed.
## Required outcome
- A failed stale-update guard must not enqueue tasks as if the dispatch were current.
- The dispatch should end in an explicit retryable or failed state rather than silently continuing after the guard error.
## Suggested approach
Return the supersede-check error from `FanoutLATokens` before fanout starts. For the in-loop check, avoid leaving already-emitted work unaccounted for; this may share the same fix shape as the existing open thread about mid-fanout superseding.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d0bcb0e595
    WHERE lata.activity_id = $1
    AND lata.token_id > $2
    AND lat.invalidated_at IS NULL
    AND (lat.expires_at IS NULL OR lat.expires_at > NOW())
    AND lat.token_type = 'update'
Keep activity fanout constrained to the job scope
For update/end fanout after the start dispatch is no longer pending, GetLATokenBatchForDispatch uses this activity-only query and no longer constrains lat.user_id to the live activity job's user or topic scope. Because RegisterLAToken can associate an update token with a supplied activityId, any token from another user/topic that has the same activity association will be selected here and receive that activity's updates/end events; the previous query kept the user/topic predicate in addition to the association check.
…rain, proxy, enqueue-failure accounting
- SupersedeLADispatchIfStale ignores FAILED/SUPERSEDED newer dispatches and records the emitted task count on the superseded row
- LA fanout fails closed when the pre-fanout stale check errors
- Mid-fanout supersede flushes collected enqueue failures before returning
- MemoryPipeline drains buffered items after Close so graceful shutdown does not drop queued work
- Restore http.ProxyFromEnvironment on APNs and FCM transports
- Align .env.example LA_WORKER_COUNT with the code default
ℹ️ No new issues found in the new commits; one prior Pullfrog thread about the in-loop supersede-check error path remains open.
Reviewed changes — Reviewed the delta since prior Pullfrog review d0bcb0e, which addresses several Live Activity lane review items and shutdown/drain behavior.
- Aligned LA worker defaults: `.env.example` now documents `LA_WORKER_COUNT=5`, matching `config.Load`.
- Restored proxy-aware transports: APNs and FCM pooled HTTP transports now use `http.ProxyFromEnvironment`.
- Made the initial LA stale guard fail closed: `FanoutLATokens` now fails the dispatch before fanout when the initial `SupersedeLADispatchIfStale` call errors.
- Narrowed supersede semantics: `SupersedeLADispatchIfStale` no longer lets newer `FAILED` or `SUPERSEDED` update dispatches suppress an older update.
- Accounted for mid-fanout supersedes: LA fanout now records the emitted count on supersede and flushes enqueue failures before returning.
- Drained closed memory pipelines: `MemoryPipeline.Receive` now returns buffered items after close so graceful shutdown can drain queued work.
Validation: go test ./internal/pipeline ./internal/workers ./internal/apns ./internal/fcm ./internal/storage passed.
Live Activity jobs previously shared the push notification job, task, and outcome pipelines, so a large LA fanout could block timed pushes (and vice versa — head-of-line blocking under backpressure).
Changes
- `LA_WORKER_COUNT`, `LA_SENDER_COUNT`, `LA_QUEUE_SIZE` config for sizing the LA lane independently
- `drainChain` helper; DB closes only after both lanes finish
- `cmd/pushboy`, lane-routing and push/LA isolation in `internal/server`