From bac33ec9352bbd56efe4578a09a56306b8a0b616 Mon Sep 17 00:00:00 2001 From: DaxServer Date: Sun, 28 Jun 2026 15:53:46 +0200 Subject: [PATCH 1/4] feat: redesign UPLOADS_UPDATE to send only changed items (partial updates) Co-Authored-By: Claude Sonnet 4.6 --- backend/src/core/handler.ts | 41 ++++++++++------------ backend/src/db/dal/uploads.ts | 37 +++++++++++++++++++ backend/src/types/ws.ts | 1 + frontend/src/App.vue | 1 - frontend/src/composables/useCollections.ts | 8 +++-- 5 files changed, 62 insertions(+), 26 deletions(-) diff --git a/backend/src/core/handler.ts b/backend/src/core/handler.ts index 2d0ad8e..5b550e6 100644 --- a/backend/src/core/handler.ts +++ b/backend/src/core/handler.ts @@ -24,14 +24,6 @@ import type { Redis } from 'ioredis' const BATCH_UPLOADS_PAGE_SIZE = 100 -const UPLOAD_DONE_STATUSES = new Set([ - 'completed', - 'failed', - 'duplicate', - 'duplicated_sdc_updated', - 'duplicated_sdc_not_updated', -]) - const BATCH_RETRIEVAL_CHUNK_SIZE = 100 type SessionUserWithAuth = SessionUser & { @@ -644,23 +636,28 @@ export class Handler { } private startUploadStream(batchid: number): ReturnType { - let lastSerialized: string | null = null + let lastUpdateTime: Date | null = null const poll = async () => { try { - const items = await this.getAllUploadsForBatch(batchid) - const updateItems = items.map(toUploadUpdateItem) - const serialized = JSON.stringify(updateItems) - if (serialized !== lastSerialized) { - this.sender.send({ - type: 'UPLOADS_UPDATE', - data: updateItems, - nonce: nonce(), - }) - lastSerialized = serialized + const current = await this.services.uploads.getLatestUploadUpdateTime(batchid) + if (current && (!lastUpdateTime || current > lastUpdateTime)) { + const since = lastUpdateTime ?? new Date(0) + const changed = await this.services.uploads.getUploadsByBatchChangedSince(batchid, since) + if (changed.length > 0) { + this.sender.send({ + type: 'UPLOADS_UPDATE', + data: changed.map(toUploadUpdateItem), + partial: true, + nonce: nonce(), + }) + } + lastUpdateTime = current } - const total = await this.services.batches.countUploadsInBatch(batchid) - const completed = items.filter((i) => UPLOAD_DONE_STATUSES.has(i.status)).length - if (total > 0 && completed >= total) { + const [total, active] = await Promise.all([ + this.services.batches.countUploadsInBatch(batchid), + this.services.uploads.countActiveUploadsInBatch(batchid), + ]) + if (total > 0 && active === 0) { this.sender.send({ type: 'UPLOADS_COMPLETE', data: batchid, diff --git a/backend/src/db/dal/uploads.ts b/backend/src/db/dal/uploads.ts index 9687ae3..0c28d8c 100644 --- a/backend/src/db/dal/uploads.ts +++ b/backend/src/db/dal/uploads.ts @@ -14,6 +14,7 @@ import { inArray, like, lt, + max, or, sql, } from 'drizzle-orm' @@ -529,4 +530,40 @@ export class UploadService { return { items, total } } + + async getLatestUploadUpdateTime(batchId: number): Promise { + const [row] = await this.db + .select({ t: max(uploadRequests.updated_at) }) + .from(uploadRequests) + .where(eq(uploadRequests.batchid, batchId)) + return row?.t ?? null + } + + async getUploadsByBatchChangedSince(batchId: number, since: Date): Promise { + const { + access_token: _, + collection: __, + copyright_override: ___, + celery_task_id: ____, + ...cols + } = getTableColumns(uploadRequests) + return this.db + .select(cols) + .from(uploadRequests) + .where(and(eq(uploadRequests.batchid, batchId), gt(uploadRequests.updated_at, since))) + .orderBy(asc(uploadRequests.id)) + } + + async countActiveUploadsInBatch(batchId: number): Promise { + const [row] = await this.db + .select({ n: count(uploadRequests.id) }) + .from(uploadRequests) + .where( + and( + eq(uploadRequests.batchid, batchId), + inArray(uploadRequests.status, ['queued', 'in_progress']), + ), + ) + return row?.n ?? 0 + } } diff --git a/backend/src/types/ws.ts b/backend/src/types/ws.ts index 0a2dc3a..80f9934 100644 --- a/backend/src/types/ws.ts +++ b/backend/src/types/ws.ts @@ -386,6 +386,7 @@ export const SubscribedSchema = t.Object({ export const UploadsUpdateSchema = t.Object({ type: t.Literal('UPLOADS_UPDATE'), data: t.Array(UploadUpdateItemSchema), + partial: t.Boolean(), nonce: t.String(), }) diff --git a/frontend/src/App.vue b/frontend/src/App.vue index e6cb5d7..87568c4 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -19,7 +19,6 @@ initCollectionsListeners()
-