diff --git a/backend/src/core/handler.ts b/backend/src/core/handler.ts index 2d0ad8e..c266361 100644 --- a/backend/src/core/handler.ts +++ b/backend/src/core/handler.ts @@ -24,14 +24,6 @@ import type { Redis } from 'ioredis' const BATCH_UPLOADS_PAGE_SIZE = 100 -const UPLOAD_DONE_STATUSES = new Set([ - 'completed', - 'failed', - 'duplicate', - 'duplicated_sdc_updated', - 'duplicated_sdc_not_updated', -]) - const BATCH_RETRIEVAL_CHUNK_SIZE = 100 type SessionUserWithAuth = SessionUser & { @@ -644,23 +636,40 @@ export class Handler { } private startUploadStream(batchid: number): ReturnType { - let lastSerialized: string | null = null + let cursorTime: Date = new Date(0) + // Track last-sent state per id: catches same-second re-updates that id-ordering misses + const sentState = new Map() const poll = async () => { try { - const items = await this.getAllUploadsForBatch(batchid) - const updateItems = items.map(toUploadUpdateItem) - const serialized = JSON.stringify(updateItems) - if (serialized !== lastSerialized) { + // minId=0 makes this equivalent to updated_at >= cursorTime; sentState deduplicates + const candidates = await this.services.uploads.getUploadsByBatchChangedSinceWithIdCursor( + batchid, + cursorTime, + 0, + ) + const toSend = candidates.filter((r) => { + const key = `${r.status}|${JSON.stringify(r.error)}|${r.success ?? ''}` + if (sentState.get(r.id) === key) return false + sentState.set(r.id, key) + return true + }) + if (toSend.length > 0) { this.sender.send({ type: 'UPLOADS_UPDATE', - data: updateItems, + data: toSend.map(toUploadUpdateItem), + partial: true, nonce: nonce(), }) - lastSerialized = serialized } - const total = await this.services.batches.countUploadsInBatch(batchid) - const completed = items.filter((i) => UPLOAD_DONE_STATUSES.has(i.status)).length - if (total > 0 && completed >= total) { + if (candidates.length > 0) { + const maxTime = candidates[candidates.length - 1]!.updated_at + if (maxTime > cursorTime) cursorTime = maxTime + } + const [total, active] = await Promise.all([ + this.services.batches.countUploadsInBatch(batchid), + this.services.uploads.countActiveUploadsInBatch(batchid), + ]) + if (total > 0 && active === 0) { this.sender.send({ type: 'UPLOADS_COMPLETE', data: batchid, diff --git a/backend/src/db/dal/uploads.ts b/backend/src/db/dal/uploads.ts index 9687ae3..028db6e 100644 --- a/backend/src/db/dal/uploads.ts +++ b/backend/src/db/dal/uploads.ts @@ -529,4 +529,44 @@ export class UploadService { return { items, total } } + + async getUploadsByBatchChangedSinceWithIdCursor( + batchId: number, + since: Date, + minId: number, + ): Promise { + const { + access_token: _, + collection: __, + copyright_override: ___, + celery_task_id: ____, + ...cols + } = getTableColumns(uploadRequests) + return this.db + .select(cols) + .from(uploadRequests) + .where( + and( + eq(uploadRequests.batchid, batchId), + or( + gt(uploadRequests.updated_at, since), + and(eq(uploadRequests.updated_at, since), gt(uploadRequests.id, minId)), + ), + ), + ) + .orderBy(asc(uploadRequests.updated_at), asc(uploadRequests.id)) + } + + async countActiveUploadsInBatch(batchId: number): Promise { + const [row] = await this.db + .select({ n: count(uploadRequests.id) }) + .from(uploadRequests) + .where( + and( + eq(uploadRequests.batchid, batchId), + inArray(uploadRequests.status, ['queued', 'in_progress']), + ), + ) + return row?.n ?? 0 + } } diff --git a/backend/src/types/ws.ts b/backend/src/types/ws.ts index 0a2dc3a..80f9934 100644 --- a/backend/src/types/ws.ts +++ b/backend/src/types/ws.ts @@ -386,6 +386,7 @@ export const SubscribedSchema = t.Object({ export const UploadsUpdateSchema = t.Object({ type: t.Literal('UPLOADS_UPDATE'), data: t.Array(UploadUpdateItemSchema), + partial: t.Boolean(), nonce: t.String(), }) diff --git a/frontend/src/App.vue b/frontend/src/App.vue index e6cb5d7..87568c4 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -19,7 +19,6 @@ initCollectionsListeners()
-