Make upload-slice sending resilient to WebSocket reconnects#108

Merged
DaxServer merged 6 commits into main from feat/upload-slice-resend-resilience on Jul 4, 2026

@DaxServer

Owner

The frontend now sends all upload slices for a batch upfront instead of waiting for each ack before sending the next, and resends any slice whose ack wasn't received after a WebSocket reconnect (useCollections.ts, useSocket.ts now exposes a reactive connected ref).
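The send-everything-then-resend-unacked flow can be sketched roughly as follows. This is a minimal illustration, not the code in useCollections.ts: the `SliceSender` class, the `Slice` shape, and the callback names are assumptions made for the example; only `ackedSliceIds`, `sendUnackedSlices`, and `onUploadSliceAck` echo names that appear in this PR.

```typescript
// Hypothetical slice shape; the real UPLOAD_SLICE payload may differ.
interface Slice {
  sliceid: number;
  items: string[];
}

// Illustrative helper (not from the PR): tracks acks in a Set so that
// completion does not depend on the order in which acks arrive.
class SliceSender {
  private readonly ackedSliceIds = new Set<number>();
  private readonly slices: Slice[];
  private readonly send: (slice: Slice) => void;
  private readonly onAllAcked: () => void;

  constructor(slices: Slice[], send: (slice: Slice) => void, onAllAcked: () => void) {
    this.slices = slices;
    this.send = send;
    this.onAllAcked = onAllAcked;
  }

  // On first connect this sends the whole batch; after a reconnect it
  // sends only the slices that have not been acked yet.
  sendUnackedSlices(): void {
    for (const slice of this.slices) {
      if (!this.ackedSliceIds.has(slice.sliceid)) this.send(slice);
    }
  }

  // Ignores acks for unknown slices and repeated acks for the same slice.
  onUploadSliceAck(sliceid: number): void {
    const known = this.slices.some((s) => s.sliceid === sliceid);
    if (!known || this.ackedSliceIds.has(sliceid)) return;
    this.ackedSliceIds.add(sliceid);
    if (this.ackedSliceIds.size === this.slices.length) this.onAllAcked();
  }
}

const sent: number[] = [];
const state = { allAcked: false };
const sender = new SliceSender(
  [0, 1, 2].map((sliceid) => ({ sliceid, items: [`file-${sliceid}`] })),
  (slice) => sent.push(slice.sliceid),
  () => { state.allAcked = true; },
);

sender.sendUnackedSlices(); // first connect: sends slices 0, 1, 2
sender.onUploadSliceAck(0); // only slice 0 is acked before the drop
sender.sendUnackedSlices(); // reconnect: resends slices 1 and 2 only
sender.onUploadSliceAck(2); // acks may arrive out of order
sender.onUploadSliceAck(1); // the last outstanding ack completes the batch
```

In the real composable the resend is triggered by a watcher on the `connected` ref rather than by a direct call; the sketch leaves that wiring out.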

Resending a slice requires the backend to be idempotent: createUploadRequestsForBatch now upserts against a new unique index on (batchid, key) instead of plain-inserting, and returns each row's real current status plus an isNew flag so the handler only runs rate-limiting/enqueueing for genuinely new rows (resent rows are skipped, avoiding duplicate BullMQ jobs and unnecessary rate-limit consumption).
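To make the idempotency contract concrete, here is a small in-memory model of it. It is only a sketch under stated assumptions: a `Map` keyed by `batchid:key` stands in for the unique index, the `InMemoryUploads` class and `setStatus` helper are hypothetical, and the status values other than `queued` are invented for illustration. The real function talks to MySQL through Drizzle.

```typescript
// Status values other than "queued" are assumptions for this sketch.
type Status = "queued" | "in_progress" | "completed" | "failed";

interface UploadRow {
  id: number;
  batchid: number;
  key: string;
  status: Status;
}

interface UpsertResult extends UploadRow {
  isNew: boolean;
}

// In-memory stand-in for the table; the Map key plays the role of the
// unique index on (batchid, key).
class InMemoryUploads {
  private readonly rows = new Map<string, UploadRow>();
  private nextId = 1;

  createUploadRequestsForBatch(batchid: number, keys: string[]): UpsertResult[] {
    return keys.map((key) => {
      const indexKey = `${batchid}:${key}`;
      const existing = this.rows.get(indexKey);
      // Resent row: leave it untouched and report its real current status.
      if (existing) return { ...existing, isNew: false };
      const row: UploadRow = { id: this.nextId++, batchid, key, status: "queued" };
      this.rows.set(indexKey, row);
      return { ...row, isNew: true };
    });
  }

  // Hypothetical helper standing in for the worker updating a row.
  setStatus(batchid: number, key: string, status: Status): void {
    const row = this.rows.get(`${batchid}:${key}`);
    if (row) row.status = status;
  }
}

const uploads = new InMemoryUploads();
const first = uploads.createUploadRequestsForBatch(7, ["a.jpg", "b.jpg"]);
uploads.setStatus(7, "a.jpg", "completed");

// A resend after reconnect, here mixed with one genuinely new key.
const resent = uploads.createUploadRequestsForBatch(7, ["a.jpg", "b.jpg", "c.jpg"]);

// The handler would rate-limit and enqueue only these rows.
const toEnqueue = resent.filter((row) => row.isNew).map((row) => row.key);
```

Here `toEnqueue` contains only `c.jpg`, and the resent `a.jpg` row comes back with status `completed` rather than being reset to `queued`.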

Also bundled: per-user rate limits are now cached in Redis for an hour instead of being recomputed from the MediaWiki API on every upload-slice call.
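The caching described here is a plain cache-aside pattern. A minimal sketch, assuming a numeric rate limit, a made-up cache key format, and a tiny `RedisLike` interface in place of the real client (the `FakeRedis` class exists only so the example runs without a server):

```typescript
// Minimal subset of a Redis client; an assumption for this sketch.
interface RedisLike {
  get(key: string): Promise<string | null>;
  set(key: string, value: string, mode: "EX", ttlSeconds: number): Promise<unknown>;
}

const RATE_LIMIT_TTL_SECONDS = 3600; // one hour, as described above

// In-memory fake that records the TTL it was given but never expires keys.
class FakeRedis implements RedisLike {
  readonly store = new Map<string, string>();
  readonly ttls = new Map<string, number>();

  async get(key: string): Promise<string | null> {
    return this.store.get(key) ?? null;
  }

  async set(key: string, value: string, _mode: "EX", ttlSeconds: number): Promise<unknown> {
    this.store.set(key, value);
    this.ttls.set(key, ttlSeconds);
    return "OK";
  }
}

// Hypothetical helper: return the cached limit, or compute and cache it.
async function getCachedRateLimit(
  redis: RedisLike,
  userId: string,
  compute: () => Promise<number>,
): Promise<number> {
  const cacheKey = `ratelimit:${userId}`; // key format is an assumption
  const cached = await redis.get(cacheKey);
  if (cached !== null) return Number(cached);

  const limit = await compute(); // e.g. the MediaWiki API lookup
  await redis.set(cacheKey, String(limit), "EX", RATE_LIMIT_TTL_SECONDS);
  return limit;
}
```

Calling `getCachedRateLimit` twice for the same user within the hour invokes `compute` only once; the second call is served from the cache.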

Fixed a pre-existing test-isolation bug surfaced by this work — upload.worker.test.ts and upload.worker.duplicate.test.ts globally mocked @backend/db/dal/uploads via mock.module(), which silently broke uploads.dal.test.ts's real import of UploadService when the full suite ran. Replaced with dependency injection (WorkerDeps.uploads, typed via Pick<UploadService, ...>) so the worker tests no longer need to mock the whole module.
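The shape of that dependency injection is roughly the following. This is a simplified sketch, not the worker's actual code: the `UploadService` stand-in, the `updateStatus` method, and the `processUpload` function are assumptions for illustration; `WorkerDeps.uploads` and the `Pick<UploadService, ...>` typing follow the description above.

```typescript
// Stand-in for the real service; its methods would normally hit the database.
class UploadService {
  async getById(_id: number): Promise<{ id: number; status: string } | undefined> {
    throw new Error("would hit the database");
  }

  // Hypothetical method, included only to give the worker something to call.
  async updateStatus(_id: number, _status: string): Promise<void> {
    throw new Error("would hit the database");
  }
}

// The worker declares only the methods it actually uses.
type WorkerUploads = Pick<UploadService, "getById" | "updateStatus">;

interface WorkerDeps {
  uploads?: WorkerUploads;
}

// Falls back to the real service when no dependency is injected.
async function processUpload(id: number, deps: WorkerDeps = {}): Promise<string> {
  const uploads = deps.uploads ?? new UploadService();
  const row = await uploads.getById(id);
  if (!row) return "missing";
  await uploads.updateStatus(id, "completed");
  return "completed";
}

// A test passes a plain object instead of mocking the whole module, so
// other test files keep importing the real UploadService untouched.
const statusUpdates: Array<[number, string]> = [];
const mockUploads: WorkerUploads = {
  getById: async (id) => (id === 1 ? { id, status: "queued" } : undefined),
  updateStatus: async (id, status) => { statusUpdates.push([id, status]); },
};
```

Because the stub is scoped to one call, it cannot leak into another test file the way a global `mock.module()` replacement can.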

— Claude Sonnet 5

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
@greptile-apps

greptile-apps Bot commented Jul 4, 2026

Contributor

Confidence Score: 4/5

Safe to merge once the missing Drizzle migration for the unique index is generated and committed — without it, the upsert on reconnect silently inserts a duplicate row instead of no-oping.

The reconnect logic, idempotent upsert, Redis caching, and test-isolation fix are all correctly implemented and well-tested. The one outstanding gap is that schema.ts declares upload_requests_batchid_key_uidx but no migration file was generated and committed. MySQL's ON DUPLICATE KEY UPDATE path in createUploadRequestsForBatch relies on that constraint existing in the live database; without the DDL applied, each reconnect-triggered resend inserts a fresh row, leaving phantom queued rows that never get a BullMQ job and permanently block batch completion queries.

backend/src/db/schema.ts and backend/drizzle/ — the unique index needs a generated migration before deploying.

Important Files Changed

| Filename | Overview |
| --- | --- |
| frontend/src/composables/useCollections.ts | Core reconnect logic rewrite: replaces sequential slice index with Set-based ack tracking, adds sendSlice/sendUnackedSlices helpers, and wires a sync watcher on connected to trigger reconnect resend. Logic is clean, guards are correct. |
| frontend/src/composables/useSocket.ts | Promotes _connected boolean to a reactive ref and exports it; all prior callers updated. Pending queue behavior unchanged for non-slice messages. |
| frontend/src/stores/collections.store.ts | Replaces uploadSliceIndex with ackedSliceIds (Set) in state and reset path; straightforward rename with no logic changes. |
| backend/src/db/dal/uploads.ts | createUploadRequestsForBatch now does a pre-check SELECT, upserts with onDuplicateKeyUpdate, and returns isNew flag. Correct logic, but it depends on the unique index existing in the live database; the migration is still absent from the PR. |
| backend/src/db/schema.ts | Adds uniqueIndex on (batchid, key) required for idempotent upsert. The DDL declaration is correct but a Drizzle migration has not been generated and committed alongside this schema change. |
| backend/src/core/rateLimiter.ts | getRateLimitForBatch now accepts a Redis instance and caches computed rate limits with a 1-hour TTL; cache-miss path is unchanged. |
| backend/src/core/handler.ts | Correctly threads this.redis to both getRateLimitForBatch call sites, and filters enqueue/rate-limit logic to only newlyCreated rows. |
| backend/src/workers/upload.worker.ts | Adds optional uploads dep injection (WorkerDeps.uploads) with Pick type, falling back to real UploadService. Clean DI pattern. |
| backend/src/tests/uploads.dal.test.ts | New test file covering empty list, fresh insert, upsert idempotency, resent-row detection, and mixed new/resent batches. All cases are meaningful and well-structured. |
| backend/src/tests/ws.handler.test.ts | Adds isNew field to mock return type and a new test verifying resent rows skip enqueueing. Correct assertions on call counts and ACK response shape. |
| backend/src/tests/rateLimiter.test.ts | Three new tests covering cache hit, cache miss with TTL assertion, and idempotency within the cache window. Correctly validates the 3600s EX argument. |
| backend/src/tests/upload.worker.test.ts | Removes mock.module() for uploads DAL; injects mockUploads via WorkerDeps. Fixes test-isolation bug with real UploadService imports in parallel test files. |
| backend/src/tests/upload.worker.duplicate.test.ts | Same mock.module → WorkerDeps.uploads migration as upload.worker.test.ts; adds proper Return type annotation to mockGetById. |
| frontend/src/composables/tests/useCollections.test.ts | Comprehensive test updates: adds effectScope teardown (fixes watcher leak between tests), connected mock ref, and new tests for offline no-send, out-of-order ACK completion, reconnect resend, and stale-ACK null-batchId guard. |
| frontend/src/composables/tests/useSocket.test.ts | Three new tests verifying connected ref lifecycle: false before open, true on open event, false on close event. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant FE as Frontend (useCollections)
    participant WS as WebSocket (useSocket)
    participant BE as Backend (Handler)
    participant DB as MySQL
    participant BQ as BullMQ

    Note over FE,WS: Initial batch upload
    FE->>WS: open()
    WS-->>FE: "onOpen → connected=true (sync watcher fires)"
    FE->>WS: sendUnackedSlices() → UPLOAD_SLICE[0..N]
    WS->>BE: "UPLOAD_SLICE (sliceid=0, items=[...])"
    BE->>DB: SELECT pre-existing keys
    BE->>DB: INSERT … ON DUPLICATE KEY UPDATE
    BE->>DB: SELECT current rows (id, key, status)
    DB-->>BE: rows with isNew flag
    BE->>BQ: "enqueueUpload (only isNew=true rows)"
    BE-->>WS: "UPLOAD_SLICE_ACK (sliceid=0, all row statuses)"
    WS-->>FE: onUploadSliceAck → ackedSliceIds.add(0)

    Note over FE,WS: WebSocket drop & reconnect
    WS-->>FE: "onClose → connected=false"
    Note over FE: Slices 1..N not yet acked
    WS-->>FE: "onOpen → connected=true (sync watcher fires)"
    FE->>FE: onSocketReconnect() → sendUnackedSlices()
    FE->>WS: UPLOAD_SLICE[1..N] (unacked only)
    WS->>BE: "UPLOAD_SLICE (sliceid=1, same items)"
    BE->>DB: "SELECT pre-existing keys → all found (isNew=false)"
    BE->>DB: INSERT … ON DUPLICATE KEY UPDATE (no-op)
    DB-->>BE: "rows (isNew=false, real current status)"
    Note over BE,BQ: Skips rate-limit and enqueue for resent rows
    BE-->>WS: "UPLOAD_SLICE_ACK (sliceid=1, current statuses)"
    WS-->>FE: onUploadSliceAck → ackedSliceIds.add(1)

    Note over FE: All slices acked → sendSubscribeBatch
    FE->>WS: SUBSCRIBE_BATCH(batchId)

Reviews (4): Last reviewed commit: "chore: increase slice size to 100"

Comment thread frontend/src/composables/useCollections.ts
Comment thread backend/src/db/dal/uploads.ts
Comment thread frontend/src/composables/useCollections.ts
…ncurrent dispatch

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
Comment thread frontend/src/composables/useCollections.ts
@DaxServer DaxServer merged commit 5abe3ef into main Jul 4, 2026
4 of 5 checks passed
@DaxServer DaxServer deleted the feat/upload-slice-resend-resilience branch July 4, 2026 12:06