diff --git a/backend/src/__tests__/rateLimiter.test.ts b/backend/src/__tests__/rateLimiter.test.ts index a3a5974..7e797e6 100644 --- a/backend/src/__tests__/rateLimiter.test.ts +++ b/backend/src/__tests__/rateLimiter.test.ts @@ -1,6 +1,74 @@ import { makeRedisMock } from '@backend/__tests__/helpers' -import { getNextUploadDelay } from '@backend/core/rateLimiter' -import { describe, expect, it } from 'bun:test' +import { + getNextUploadDelay, + getRateLimitForBatch, + type RateLimitInfo, +} from '@backend/core/rateLimiter' +import type { MediaWikiClient } from '@backend/mediawiki/client' +import { describe, expect, it, mock } from 'bun:test' + +const makeMwClient = ( + overrides: Partial>> = {}, +) => + ({ + getUserRateLimits: mock(async () => ({ + ratelimits: {}, + rights: [], + ...overrides, + })), + }) as unknown as MediaWikiClient + +describe('getRateLimitForBatch', () => { + it('returns cached value without calling MediaWiki on cache hit', async () => { + const cached: RateLimitInfo = { uploadsPerPeriod: 20, periodSeconds: 30 } + const { redis, getMock } = makeRedisMock(JSON.stringify(cached)) + const client = makeMwClient() + + const result = await getRateLimitForBatch('u1', client, redis) + + expect(result).toEqual(cached) + expect(client.getUserRateLimits as ReturnType).not.toHaveBeenCalled() + expect(getMock).toHaveBeenCalledWith(expect.stringContaining('u1')) + }) + + it('calls MediaWiki and caches result with 1-hour TTL on cache miss', async () => { + const { redis, setMock } = makeRedisMock(null) + const client = makeMwClient({ + ratelimits: { upload: { user: { hits: 8, seconds: 60 } } }, + rights: [], + }) + + const result = await getRateLimitForBatch('u1', client, redis) + + expect(client.getUserRateLimits as ReturnType).toHaveBeenCalledTimes(1) + expect(setMock).toHaveBeenCalledTimes(1) + const [key, value, ex, ttl] = ( + setMock.mock.calls as unknown as [string, string, string, number][] + )[0] + expect(key).toContain('u1') + expect(JSON.parse(value)).toEqual(result) + expect(ex).toBe('EX') + expect(ttl).toBe(3600) + }) + + it('does not call MediaWiki again within the cache window', async () => { + let stored: string | null = null + const redis = { + get: mock(async () => stored), + set: mock(async (_k: string, v: string) => { + stored = v + return 'OK' as const + }), + del: mock(async () => 1), + } as unknown as import('ioredis').Redis + const client = makeMwClient() + + await getRateLimitForBatch('u1', client, redis) + await getRateLimitForBatch('u1', client, redis) + + expect(client.getUserRateLimits as ReturnType).toHaveBeenCalledTimes(1) + }) +}) describe('getNextUploadDelay', () => { it('returns the existing delay without updating Redis when uploadsPerPeriod is 0', async () => { diff --git a/backend/src/__tests__/upload.worker.duplicate.test.ts b/backend/src/__tests__/upload.worker.duplicate.test.ts index 5c2ae02..ac5ef03 100644 --- a/backend/src/__tests__/upload.worker.duplicate.test.ts +++ b/backend/src/__tests__/upload.worker.duplicate.test.ts @@ -6,11 +6,13 @@ // // mock.module() is intentionally avoided for @backend/core/crypto and // @backend/mediawiki/client (both imported directly by other test files). -// Instead, WorkerDeps injection supplies mock implementations inline. +// @backend/db/dal/uploads is a real value import in uploads.dal.test.ts, so it is +// injected via WorkerDeps.uploads instead of mock.module() here. // @backend/handlers/mapillary is module-mocked here because no other test // file imports it directly. import { DuplicateUploadError } from '@backend/core/errors' +import type { UploadService } from '@backend/db/dal/uploads' import type { MediaWikiClient } from '@backend/mediawiki/client' import { buildStatementsFromMapillaryImage } from '@backend/mediawiki/sdc' import type { MediaImage } from '@backend/types/ws' @@ -34,15 +36,15 @@ mock.module('@backend/db/client', () => ({ lazyDb: { client: {} } })) const mockUpdateStatus = mock(async () => {}) const mockClearToken = mock(async () => {}) -const mockGetById = mock(async (_id: number) => null as unknown) - -mock.module('@backend/db/dal/uploads', () => ({ - UploadService: class { - updateUploadStatus = mockUpdateStatus - clearUploadAccessToken = mockClearToken - getUploadById = mockGetById - }, -})) +const mockGetById = mock( + async (_id: number): Promise>> => null, +) + +const mockUploads = { + updateUploadStatus: mockUpdateStatus, + clearUploadAccessToken: mockClearToken, + getUploadById: mockGetById, +} // MapillaryHandler is NOT imported by any other test file — safe to mock. const FAKE_IMAGE: MediaImage = { @@ -148,9 +150,13 @@ function setupWorker(clientStub: MediaWikiClient, uploadOverrides: Record makeUpload(uploadOverrides)) + mockGetById.mockImplementation( + async () => + makeUpload(uploadOverrides) as unknown as Awaited>, + ) createUploadWorker({} as Redis, { + uploads: mockUploads, decryptToken: () => ['tok-key', 'tok-secret'], makeClient: () => clientStub, }) diff --git a/backend/src/__tests__/upload.worker.test.ts b/backend/src/__tests__/upload.worker.test.ts index a34659b..402a607 100644 --- a/backend/src/__tests__/upload.worker.test.ts +++ b/backend/src/__tests__/upload.worker.test.ts @@ -1,4 +1,5 @@ import { HashLockError, SourceCdnError, StorageError } from '@backend/core/errors' +import type { UploadService } from '@backend/db/dal/uploads' import { beforeEach, describe, expect, it, mock } from 'bun:test' import type { Redis } from 'ioredis' @@ -7,7 +8,8 @@ import type { Redis } from 'ioredis' // Only modules that no other test file imports directly are safe to mock here. // - bullmq: not imported by any other test ✓ // - @backend/db/client: not imported by any other test ✓ -// - @backend/db/dal/uploads: only type-imported (erased at runtime) by ws.handler.test.ts ✓ +// @backend/db/dal/uploads is a real value import in uploads.dal.test.ts, so it is +// injected via WorkerDeps.uploads instead of mock.module() here. // Capture the processor and event handlers registered by createUploadWorker. let capturedProcessor: ((job: unknown) => Promise) | undefined @@ -28,15 +30,15 @@ mock.module('@backend/db/client', () => ({ lazyDb: { client: {} } })) const mockUpdateStatus = mock(async () => {}) const mockClearToken = mock(async () => {}) -const mockGetById = mock(async (_id: number) => null as unknown) - -mock.module('@backend/db/dal/uploads', () => ({ - UploadService: class { - updateUploadStatus = mockUpdateStatus - clearUploadAccessToken = mockClearToken - getUploadById = mockGetById - }, -})) +const mockGetById = mock( + async (_id: number): Promise>> => null, +) + +const mockUploads = { + updateUploadStatus: mockUpdateStatus, + clearUploadAccessToken: mockClearToken, + getUploadById: mockGetById, +} import { createUploadWorker } from '@backend/workers/upload.worker' @@ -66,7 +68,7 @@ beforeEach(() => { mockUpdateStatus.mockClear() mockClearToken.mockClear() mockGetById.mockClear() - createUploadWorker(mockRedis) + createUploadWorker(mockRedis, { uploads: mockUploads }) }) // === Processor rethrows retryable errors without touching the DB === @@ -157,6 +159,7 @@ describe('upload worker — permanent BullMQ failure marks upload as failed in D const mockGetDelay = mock(async () => 1500) const mockRemoveJob = mock(async () => {}) createUploadWorker(mockRedis, { + uploads: mockUploads, enqueueUpload: mockEnqueue, getNextUploadDelay: mockGetDelay, removeUploadJob: mockRemoveJob, @@ -178,7 +181,11 @@ describe('upload worker — permanent BullMQ failure marks upload as failed in D it('StorageError: permanently fails when requeueCount reaches the limit', async () => { const mockEnqueue = mock(async () => 'new-job-id') const mockGetDelay = mock(async () => 1500) - createUploadWorker(mockRedis, { enqueueUpload: mockEnqueue, getNextUploadDelay: mockGetDelay }) + createUploadWorker(mockRedis, { + uploads: mockUploads, + enqueueUpload: mockEnqueue, + getNextUploadDelay: mockGetDelay, + }) const failedHandler = capturedHandlers.get('failed') expect(failedHandler).toBeDefined() diff --git a/backend/src/__tests__/uploads.dal.test.ts b/backend/src/__tests__/uploads.dal.test.ts new file mode 100644 index 0000000..13d3713 --- /dev/null +++ b/backend/src/__tests__/uploads.dal.test.ts @@ -0,0 +1,129 @@ +import type { DB } from '@backend/db/client' +import { UploadService } from '@backend/db/dal/uploads' +import type { Handler, UploadItem } from '@backend/types/ws' +import { describe, expect, it, mock } from 'bun:test' + +const makeItem = (id: string): UploadItem => ({ + id, + input: 'col-1', + title: `${id}.jpg`, + wikitext: '== wikitext ==', + labels: null, + copyright_override: false, +}) + +const BASE_PARAMS = { + userid: 'u1', + username: 'alice', + batchid: 42, + handler: 'mapillary' as Handler, + encryptedAccessToken: 'tok', +} + +const makeDb = ({ + preExisting, + existing, +}: { + preExisting: { key: string }[] + existing: { id: number; key: string; status: string }[] +}) => { + const insertChain = { + values: mock(() => insertChain), + onDuplicateKeyUpdate: mock(() => Promise.resolve()), + } + const preExistingChain = { + from: mock(() => preExistingChain), + where: mock(() => Promise.resolve(preExisting)), + } + const existingChain = { + from: mock(() => existingChain), + where: mock(() => existingChain), + orderBy: mock(() => Promise.resolve(existing)), + } + let selectCallCount = 0 + const db = { + insert: mock(() => insertChain), + select: mock(() => { + selectCallCount += 1 + return selectCallCount % 2 === 1 ? preExistingChain : existingChain + }), + } as unknown as DB + return { db, insertChain, preExistingChain, existingChain } +} + +describe('createUploadRequestsForBatch', () => { + it('returns empty array and skips DB when items list is empty', async () => { + const { db } = makeDb({ preExisting: [], existing: [] }) + const service = new UploadService(db) + const result = await service.createUploadRequestsForBatch({ ...BASE_PARAMS, items: [] }) + expect(result).toEqual([]) + expect(db.insert).not.toHaveBeenCalled() + expect(db.select).not.toHaveBeenCalled() + }) + + it('marks freshly inserted rows as new with queued status', async () => { + const { db } = makeDb({ + preExisting: [], + existing: [ + { id: 1, key: 'img-1', status: 'queued' }, + { id: 2, key: 'img-2', status: 'queued' }, + ], + }) + const service = new UploadService(db) + const result = await service.createUploadRequestsForBatch({ + ...BASE_PARAMS, + items: [makeItem('img-1'), makeItem('img-2')], + }) + expect(result).toEqual([ + { id: 1, key: 'img-1', status: 'queued', isNew: true }, + { id: 2, key: 'img-2', status: 'queued', isNew: true }, + ]) + }) + + it('uses onDuplicateKeyUpdate so the same slice can be resent on reconnect', async () => { + const { db, insertChain } = makeDb({ + preExisting: [], + existing: [{ id: 1, key: 'img-1', status: 'queued' }], + }) + const service = new UploadService(db) + const params = { ...BASE_PARAMS, items: [makeItem('img-1')] } + + await service.createUploadRequestsForBatch(params) + await service.createUploadRequestsForBatch(params) + + expect(db.insert).toHaveBeenCalledTimes(2) + expect(insertChain.onDuplicateKeyUpdate).toHaveBeenCalledTimes(2) + }) + + it('marks a resent row as not new and reports its real current status', async () => { + const { db } = makeDb({ + preExisting: [{ key: 'img-1' }], + existing: [{ id: 1, key: 'img-1', status: 'in_progress' }], + }) + const service = new UploadService(db) + const params = { ...BASE_PARAMS, items: [makeItem('img-1')] } + + const result = await service.createUploadRequestsForBatch(params) + + expect(result).toEqual([{ id: 1, key: 'img-1', status: 'in_progress', isNew: false }]) + }) + + it('reports a mix of new and resent rows independently within the same slice', async () => { + const { db } = makeDb({ + preExisting: [{ key: 'img-1' }], + existing: [ + { id: 1, key: 'img-1', status: 'completed' }, + { id: 2, key: 'img-2', status: 'queued' }, + ], + }) + const service = new UploadService(db) + const params = { ...BASE_PARAMS, items: [makeItem('img-1'), makeItem('img-2')] } + + const result = await service.createUploadRequestsForBatch(params) + + expect(result).toEqual([ + { id: 1, key: 'img-1', status: 'completed', isNew: false }, + { id: 2, key: 'img-2', status: 'queued', isNew: true }, + ]) + }) +}) diff --git a/backend/src/__tests__/ws.handler.test.ts b/backend/src/__tests__/ws.handler.test.ts index 028e3da..d98d590 100644 --- a/backend/src/__tests__/ws.handler.test.ts +++ b/backend/src/__tests__/ws.handler.test.ts @@ -51,7 +51,7 @@ const mockRetrySelectedUploadsToNewBatch = mock(async () => ({ newBatchId: 0, })) const mockCreateUploadRequestsForBatch = mock( - async () => [] as { id: number; key: string; status: string }[], + async () => [] as { id: number; key: string; status: string; isNew: boolean }[], ) const mockCancelBatchDal = mock(async () => new Map()) const mockUpdateJobTaskId = mock(async () => undefined) @@ -679,8 +679,8 @@ describe('Handler.uploadSlice (batch found, items created)', () => { const { handler, sender } = makeHandler() mockGetBatch.mockImplementation(async () => fakeBatchItem({ id: 3, edit_group_id: 'eg-abc' })) mockCreateUploadRequestsForBatch.mockImplementation(async () => [ - { id: 101, key: 'img-1', status: 'queued' }, - { id: 102, key: 'img-2', status: 'queued' }, + { id: 101, key: 'img-1', status: 'queued', isNew: true }, + { id: 102, key: 'img-2', status: 'queued', isNew: true }, ]) mockGetRateLimitForBatch.mockImplementation(async () => ({ uploadsPerPeriod: 10, @@ -712,6 +712,49 @@ describe('Handler.uploadSlice (batch found, items created)', () => { expect(msg!.data[0]!.id).toBe('img-1') expect(mockEnqueueUpload).toHaveBeenCalledTimes(2) }) + + it('does not re-enqueue or re-consume rate limit for rows that already existed (resent slice)', async () => { + const { handler, sender } = makeHandler() + mockGetBatch.mockImplementation(async () => fakeBatchItem({ id: 3, edit_group_id: 'eg-abc' })) + mockCreateUploadRequestsForBatch.mockImplementation(async () => [ + { id: 101, key: 'img-1', status: 'in_progress', isNew: false }, + { id: 102, key: 'img-2', status: 'queued', isNew: true }, + ]) + mockGetRateLimitForBatch.mockImplementation(async () => ({ + uploadsPerPeriod: 10, + periodSeconds: 60, + })) + mockGetNextUploadDelay.mockImplementation(async () => 0) + mockEnqueueUpload.mockImplementation(async () => 'job-1') + mockUpdateJobTaskId.mockImplementation(async () => undefined) + + await handler.uploadSlice({ + batchid: 3, + sliceid: 1, + items: [ + { id: 'img-1', input: 'seq-1', title: 'File1.jpg', wikitext: '...' }, + { id: 'img-2', input: 'seq-1', title: 'File2.jpg', wikitext: '...' }, + ], + }) + + expect(mockGetNextUploadDelay).toHaveBeenCalledTimes(1) + expect(mockEnqueueUpload).toHaveBeenCalledTimes(1) + expect(mockUpdateJobTaskId).toHaveBeenCalledTimes(1) + expect(mockUpdateJobTaskId).toHaveBeenCalledWith(102, 'job-1') + + const msg = sender.messages.find((m) => m.type === 'UPLOAD_SLICE_ACK') as + | { + type: 'UPLOAD_SLICE_ACK' + data: { id: string; status: string }[] + sliceid: number + } + | undefined + expect(msg).toBeDefined() + expect(msg!.data).toEqual([ + { id: 'img-1', status: 'in_progress' }, + { id: 'img-2', status: 'queued' }, + ]) + }) }) describe('Handler.uploadSlice (batch not found)', () => { diff --git a/backend/src/core/handler.ts b/backend/src/core/handler.ts index c266361..76a2533 100644 --- a/backend/src/core/handler.ts +++ b/backend/src/core/handler.ts @@ -231,7 +231,11 @@ export class Handler { return } const mwRetry = new MediaWikiClient(this.user.access_token) - const rateLimitRetry = await this.rateLimiter.getRateLimitForBatch(this.userid, mwRetry) + const rateLimitRetry = await this.rateLimiter.getRateLimitForBatch( + this.userid, + mwRetry, + this.redis, + ) for (const uploadId of newUploadIds) { const delayMs = await this.rateLimiter.getNextUploadDelay( this.userid, @@ -520,10 +524,11 @@ export class Handler { handler: handlerName, encryptedAccessToken, }) - if (created.length > 0) { + const newlyCreated = created.filter((c) => c.isNew) + if (newlyCreated.length > 0) { const mw = new MediaWikiClient(this.user.access_token) - const rateLimit = await this.rateLimiter.getRateLimitForBatch(this.userid, mw) - for (const c of created) { + const rateLimit = await this.rateLimiter.getRateLimitForBatch(this.userid, mw, this.redis) + for (const c of newlyCreated) { const delayMs = await this.rateLimiter.getNextUploadDelay( this.userid, rateLimit, diff --git a/backend/src/core/rateLimiter.ts b/backend/src/core/rateLimiter.ts index 38a8dbd..4ca70c8 100644 --- a/backend/src/core/rateLimiter.ts +++ b/backend/src/core/rateLimiter.ts @@ -9,6 +9,9 @@ export interface RateLimitInfo { const RATE_LIMIT_DEFAULT_NORMAL = 10 const RATE_LIMIT_DEFAULT_PERIOD = 60 const NEXT_AVAILABLE_KEY = 'ratelimit:{userid}:next_available' +const RATE_LIMIT_INFO_KEY = 'ratelimit:{userid}:limits' +const RATE_LIMIT_CACHE_TTL = 3600 + const NO_RATE_LIMIT: RateLimitInfo = { uploadsPerPeriod: 9999, periodSeconds: 60 } export const RATE_LIMIT_DEFAULT: RateLimitInfo = { @@ -41,39 +44,42 @@ function moreRestrictive( } export async function getRateLimitForBatch( - _userid: string, + userid: string, client: MediaWikiClient, + redis: Redis, ): Promise { + const cacheKey = RATE_LIMIT_INFO_KEY.replace('{userid}', userid) + const cached = await redis.get(cacheKey) + if (cached) return JSON.parse(cached) as RateLimitInfo + const { ratelimits, rights } = await client.getUserRateLimits() - if (rights.includes('noratelimit')) { - return NO_RATE_LIMIT - } + let result: RateLimitInfo - const uploadLimits = ratelimits.upload ?? {} - const editLimits = ratelimits.edit ?? {} + if (rights.includes('noratelimit')) { + result = NO_RATE_LIMIT + } else { + const uploadLimits = ratelimits.upload ?? {} + const editLimits = ratelimits.edit ?? {} - const bestUpload = mostPermissive(uploadLimits) - const bestEdit = mostPermissive(editLimits) + const bestUpload = mostPermissive(uploadLimits) + const bestEdit = mostPermissive(editLimits) - const adjustedEdit = - bestEdit !== null - ? ([Math.max(1, Math.floor(bestEdit[0] / 2)), bestEdit[1]] as [number, number]) - : null + const adjustedEdit = + bestEdit !== null + ? ([Math.max(1, Math.floor(bestEdit[0] / 2)), bestEdit[1]] as [number, number]) + : null - const effective = moreRestrictive(bestUpload, adjustedEdit) + const effective = moreRestrictive(bestUpload, adjustedEdit) - if (effective === null) { - return { - uploadsPerPeriod: RATE_LIMIT_DEFAULT_NORMAL, - periodSeconds: RATE_LIMIT_DEFAULT_PERIOD, - } + result = + effective === null + ? { uploadsPerPeriod: RATE_LIMIT_DEFAULT_NORMAL, periodSeconds: RATE_LIMIT_DEFAULT_PERIOD } + : { uploadsPerPeriod: effective[0], periodSeconds: effective[1] } } - return { - uploadsPerPeriod: effective[0], - periodSeconds: effective[1], - } + await redis.set(cacheKey, JSON.stringify(result), 'EX', RATE_LIMIT_CACHE_TTL) + return result } export async function getNextUploadDelay( diff --git a/backend/src/db/dal/uploads.ts b/backend/src/db/dal/uploads.ts index 028db6e..533cd04 100644 --- a/backend/src/db/dal/uploads.ts +++ b/backend/src/db/dal/uploads.ts @@ -328,8 +328,15 @@ export class UploadService { items: UploadItem[] handler: Handler encryptedAccessToken: string - }): Promise<{ id: number; key: string; status: UploadStatus }[]> { + }): Promise<{ id: number; key: string; status: UploadStatus; isNew: boolean }[]> { if (items.length === 0) return [] + const keys = items.map((it) => it.id) + const preExisting = await this.db + .select({ key: uploadRequests.key }) + .from(uploadRequests) + .where(and(eq(uploadRequests.batchid, batchid), inArray(uploadRequests.key, keys))) + const preExistingKeys = new Set(preExisting.map((r) => r.key)) + const rows = items.map((it) => ({ batchid, userid, @@ -349,14 +356,21 @@ export class UploadService { created_at: sql`CURRENT_TIMESTAMP`, updated_at: sql`CURRENT_TIMESTAMP`, })) - await this.db.insert(uploadRequests).values(rows) - const keys = rows.map((r) => r.key) - const inserted = await this.db - .select({ id: uploadRequests.id, key: uploadRequests.key }) + await this.db + .insert(uploadRequests) + .values(rows) + .onDuplicateKeyUpdate({ set: { id: sql`${uploadRequests.id}` } }) + const existing = await this.db + .select({ id: uploadRequests.id, key: uploadRequests.key, status: uploadRequests.status }) .from(uploadRequests) .where(and(eq(uploadRequests.batchid, batchid), inArray(uploadRequests.key, keys))) .orderBy(asc(uploadRequests.id)) - return inserted.map((r) => ({ id: r.id, key: r.key, status: 'queued' as const })) + return existing.map((r) => ({ + id: r.id, + key: r.key, + status: r.status as UploadStatus, + isNew: !preExistingKeys.has(r.key), + })) } async markUploadsExpired(ids: number[]): Promise { diff --git a/backend/src/db/schema.ts b/backend/src/db/schema.ts index 3fbf87f..2992c06 100644 --- a/backend/src/db/schema.ts +++ b/backend/src/db/schema.ts @@ -7,6 +7,7 @@ import { mysqlTable, text, timestamp, + uniqueIndex, varchar, } from 'drizzle-orm/mysql-core' @@ -116,6 +117,7 @@ export const uploadRequests = mysqlTable( .onUpdateNow(), }, (t) => [ + uniqueIndex('upload_requests_batchid_key_uidx').on(t.batchid, t.key), index('upload_requests_batchid_idx').on(t.batchid), index('upload_requests_userid_idx').on(t.userid), index('upload_requests_status_idx').on(t.status), diff --git a/backend/src/workers/upload.worker.ts b/backend/src/workers/upload.worker.ts index 3d74c70..529b4f4 100644 --- a/backend/src/workers/upload.worker.ts +++ b/backend/src/workers/upload.worker.ts @@ -39,10 +39,11 @@ export type WorkerDeps = { enqueueUpload?: (data: UploadJobData, delayMs: number) => Promise removeUploadJob?: (jobId: string) => Promise getNextUploadDelay?: (userid: string, rateLimit: RateLimitInfo, redis: Redis) => Promise + uploads?: Pick } export function createUploadWorker(redis: Redis, deps?: WorkerDeps): Worker { - const uploads = new UploadService(lazyDb.client) + const uploads = deps?.uploads ?? new UploadService(lazyDb.client) const makeClient = deps?.makeClient ?? ((token) => new MediaWikiClient(token)) const makeHandler = deps?.makeHandler ?? (() => new MapillaryHandler()) const decryptTokenFn = deps?.decryptToken ?? decryptAccessToken diff --git a/frontend/src/composables/__tests__/useCollections.test.ts b/frontend/src/composables/__tests__/useCollections.test.ts index 4e36577..f43170c 100644 --- a/frontend/src/composables/__tests__/useCollections.test.ts +++ b/frontend/src/composables/__tests__/useCollections.test.ts @@ -7,8 +7,8 @@ import type { UploadUpdateItem, } from '@backend/types/ws' import { type Image, type Item, UPLOAD_STATUS } from '@frontend/types/image' -import { type Mock, beforeAll, beforeEach, describe, expect, it, mock } from 'bun:test' -import { ref } from 'vue' +import { type Mock, afterEach, beforeAll, beforeEach, describe, expect, it, mock } from 'bun:test' +import { type EffectScope, effectScope, ref } from 'vue' type BatchesListData = Extract['data'] type BatchUploadsListData = Extract['data'] @@ -19,11 +19,13 @@ import { UPLOAD_SLICE_SIZE } from '@frontend/composables/useCollections' // Mock useSocket export const mockSocketData = ref(null) export const mockSend = mock(() => {}) +export const mockSocketConnected = ref(false) const mockSocketImpl = () => ({ useSocket: { data: mockSocketData, send: mockSend, + connected: mockSocketConnected, }, }) @@ -73,6 +75,7 @@ describe('useCollections Listeners', () => { let initCollectionsListeners: typeof InitCollectionsListenersType let listeners: ReturnType let store: ReturnType + let scope: EffectScope beforeAll(async () => { const mod = await import('../useCollections') @@ -83,7 +86,13 @@ describe('useCollections Listeners', () => { setActivePinia(createTestingPinia({ stubActions: false })) store = useCollectionsStore() mockSend.mockClear() - listeners = initCollectionsListeners() + mockSocketConnected.value = false + scope = effectScope() + listeners = scope.run(() => initCollectionsListeners())! + }) + + afterEach(() => { + scope.stop() }) describe('onUploadsUpdate', () => { @@ -1087,6 +1096,7 @@ describe('useCollections Listeners', () => { describe('onBatchCreated', () => { it('should set batchId and send first slice', () => { + mockSocketConnected.value = true // Mock selectedItems logic via items const newItems: Record = {} for (let i = 0; i < 15; i++) { @@ -1107,7 +1117,7 @@ describe('useCollections Listeners', () => { listeners.onBatchCreated(100) expect(store.batchId).toBe(100) - expect(store.uploadSliceIndex).toBe(0) + expect(store.ackedSliceIds.size).toBe(0) expect(mockSend).toHaveBeenCalled() const calls = (mockSend as Mock<(data: unknown) => void>).mock.calls @@ -1157,7 +1167,32 @@ describe('useCollections Listeners', () => { expect(store.batchId).toBe(0) }) + it('does not send slices while offline, relying on the connected watcher to flush them later', () => { + mockSocketConnected.value = false + const newItems: Record = {} + for (let i = 0; i < 15; i++) { + const id = `img${i}` + newItems[id] = createMockItem({ + id, + meta: { + selected: true, + license: '', + description: { value: '', language: 'en' }, + categories: '', + }, + image: createMockImage({ id }), + }) + } + store.replaceItems(newItems) + + listeners.onBatchCreated(100) + + expect(store.batchId).toBe(100) + expect(mockSend).not.toHaveBeenCalled() + }) + it(`should handle exactly ${UPLOAD_SLICE_SIZE} selected items (one full slice)`, () => { + mockSocketConnected.value = true const newItems: Record = {} for (let i = 0; i < UPLOAD_SLICE_SIZE; i++) { const id = `img${i}` @@ -1177,7 +1212,7 @@ describe('useCollections Listeners', () => { listeners.onBatchCreated(100) expect(store.batchId).toBe(100) - expect(store.uploadSliceIndex).toBe(0) + expect(store.ackedSliceIds.size).toBe(0) expect(store.isBatchCreated).toBe(false) // Not created yet, need to wait for ACK expect(mockSend).toHaveBeenCalled() @@ -1197,7 +1232,8 @@ describe('useCollections Listeners', () => { expect(store.isBatchCreated).toBe(true) }) - it(`should handle exactly ${UPLOAD_SLICE_SIZE * 2} selected items (two full slices)`, () => { + it(`should send all slices immediately for ${UPLOAD_SLICE_SIZE * 2} selected items`, () => { + mockSocketConnected.value = true const newItems: Record = {} for (let i = 0; i < UPLOAD_SLICE_SIZE * 2; i++) { const id = `img${i}` @@ -1216,18 +1252,19 @@ describe('useCollections Listeners', () => { listeners.onBatchCreated(100) - expect(mockSend).toHaveBeenCalled() const calls = (mockSend as Mock<(data: unknown) => void>).mock.calls - const arg = calls[0]![0] - const sentMsg = arg as ClientMessage - expect((sentMsg as UploadSliceMsg).data.items).toHaveLength(UPLOAD_SLICE_SIZE) + const sliceCalls = calls.filter((c) => (c[0] as ClientMessage).type === 'UPLOAD_SLICE') + expect(sliceCalls).toHaveLength(2) + expect((sliceCalls[0]![0] as UploadSliceMsg).data.sliceid).toBe(0) + expect((sliceCalls[1]![0] as UploadSliceMsg).data.sliceid).toBe(1) + expect((sliceCalls[0]![0] as UploadSliceMsg).data.items).toHaveLength(UPLOAD_SLICE_SIZE) + expect((sliceCalls[1]![0] as UploadSliceMsg).data.items).toHaveLength(UPLOAD_SLICE_SIZE) }) }) describe('onUploadSliceAck', () => { - it('should send next slice if index matches', () => { + it('adds the slice id to ackedSliceIds and does not send anything else', () => { store.batchId = 100 - store.uploadSliceIndex = 0 const newItems: Record = {} for (let i = 0; i < UPLOAD_SLICE_SIZE + 5; i++) { @@ -1247,33 +1284,58 @@ describe('useCollections Listeners', () => { listeners.onUploadSliceAck(0, []) - expect(store.uploadSliceIndex).toBe(1) - expect(mockSend).toHaveBeenCalled() - const calls = (mockSend as Mock<(data: unknown) => void>).mock.calls - expect(calls.length).toBeGreaterThan(0) - const arg = calls[0]![0] - const sentMsg = arg as ClientMessage - expect(sentMsg).toMatchObject({ - type: 'UPLOAD_SLICE', - data: { - sliceid: 1, - }, - }) + expect(store.ackedSliceIds.has(0)).toBe(true) + expect(mockSend).not.toHaveBeenCalled() + }) + + it('ignores a stale ack when no batch is in progress, without permanently locking status checking', () => { + store.batchId = null + store.ackedSliceIds = new Set() + + listeners.onUploadSliceAck(0, []) + + expect(mockSend).not.toHaveBeenCalled() + expect(store.isStatusChecking).toBe(false) }) - it('should not send next slice if index does not match', () => { + it('ignores a duplicate ack for an already-acked slice id', () => { store.batchId = 100 - store.uploadSliceIndex = 1 + store.ackedSliceIds = new Set([0]) listeners.onUploadSliceAck(0, []) - expect(store.uploadSliceIndex).toBe(1) + expect(store.ackedSliceIds.size).toBe(1) expect(mockSend).not.toHaveBeenCalled() }) + it('acks arriving out of order still complete the batch once all are seen', () => { + store.batchId = 100 + const newItems: Record = {} + for (let i = 0; i < UPLOAD_SLICE_SIZE * 3; i++) { + const id = `img${i}` + newItems[id] = createMockItem({ + id, + meta: { + selected: true, + license: '', + description: { value: '', language: 'en' }, + categories: '', + }, + image: createMockImage({ id }), + }) + } + store.replaceItems(newItems) + + listeners.onUploadSliceAck(0, []) + listeners.onUploadSliceAck(2, []) + listeners.onUploadSliceAck(1, []) + + expect(store.ackedSliceIds.size).toBe(3) + expect(store.isBatchCreated).toBe(true) + }) + it('should update item statuses from ACK response', () => { store.batchId = 100 - store.uploadSliceIndex = 0 const newItems: Record = {} for (let i = 0; i < 5; i++) { @@ -1300,7 +1362,7 @@ describe('useCollections Listeners', () => { listeners.onUploadSliceAck(0, ackItems) - expect(store.uploadSliceIndex).toBe(1) + expect(store.ackedSliceIds.has(0)).toBe(true) expect(store.items.img0!.meta.status).toBe(UPLOAD_STATUS.Queued) expect(store.items.img1!.meta.status).toBe(UPLOAD_STATUS.Queued) expect(store.items.img2!.meta.status).toBe(UPLOAD_STATUS.InProgress) @@ -1308,9 +1370,8 @@ describe('useCollections Listeners', () => { expect(store.items.img4!.meta.status).toBe(UPLOAD_STATUS.Queued) // Unchanged }) - it('should send slice payload with batchid, handler, and up to UPLOAD_SLICE_SIZE items', () => { - store.batchId = 100 - store.uploadSliceIndex = 0 + it('each sent slice has correct batchid, handler, and up to UPLOAD_SLICE_SIZE items', () => { + mockSocketConnected.value = true store.input = 'input' store.globalLicense = '' @@ -1330,88 +1391,63 @@ describe('useCollections Listeners', () => { } store.replaceItems(newItems) - listeners.onUploadSliceAck(0, []) + listeners.onBatchCreated(100) - expect(mockSend).toHaveBeenCalled() const calls = (mockSend as Mock<(data: unknown) => void>).mock.calls - expect(calls.length).toBeGreaterThan(0) - const arg = calls[0]![0] - const sentMsg = arg as ClientMessage - expect((sentMsg as UploadSliceMsg).data.batchid).toBe(100) - expect((sentMsg as UploadSliceMsg).data.handler).toBe('mapillary') - expect((sentMsg as UploadSliceMsg).data.items).toHaveLength(UPLOAD_SLICE_SIZE) - expect((sentMsg as UploadSliceMsg).data.items[0]).toMatchObject({ + const sliceCalls = calls.filter((c) => (c[0] as ClientMessage).type === 'UPLOAD_SLICE') + expect(sliceCalls.length).toBeGreaterThan(0) + const sentMsg = sliceCalls[0]![0] as UploadSliceMsg + expect(sentMsg.data.batchid).toBe(100) + expect(sentMsg.data.handler).toBe('mapillary') + expect(sentMsg.data.items).toHaveLength(UPLOAD_SLICE_SIZE) + expect(sentMsg.data.items[0]).toMatchObject({ input: 'input', labels: { value: 'd', language: 'en' }, copyright_override: false, }) - expect(typeof (sentMsg as UploadSliceMsg).data.items[0]!.title).toBe('string') - expect(typeof (sentMsg as UploadSliceMsg).data.items[0]!.wikitext).toBe('string') + expect(typeof sentMsg.data.items[0]!.title).toBe('string') + expect(typeof sentMsg.data.items[0]!.wikitext).toBe('string') }) it('should handle undefined slice ID', () => { store.batchId = 100 - store.uploadSliceIndex = 0 listeners.onUploadSliceAck(undefined as unknown as number, []) - expect(store.uploadSliceIndex).toBe(0) + expect(store.ackedSliceIds.size).toBe(0) expect(mockSend).not.toHaveBeenCalled() }) it('should handle null slice ID', () => { store.batchId = 100 - store.uploadSliceIndex = 0 listeners.onUploadSliceAck(null as unknown as number, []) - expect(store.uploadSliceIndex).toBe(0) + expect(store.ackedSliceIds.size).toBe(0) expect(mockSend).not.toHaveBeenCalled() }) it('should handle NaN slice ID', () => { store.batchId = 100 - store.uploadSliceIndex = 0 listeners.onUploadSliceAck(NaN, []) - expect(store.uploadSliceIndex).toBe(0) + expect(store.ackedSliceIds.size).toBe(0) expect(mockSend).not.toHaveBeenCalled() }) it('should handle negative slice ID', () => { store.batchId = 100 - store.uploadSliceIndex = 0 listeners.onUploadSliceAck(-1, []) - expect(store.uploadSliceIndex).toBe(0) - expect(mockSend).not.toHaveBeenCalled() - }) - - it('should handle slice ID when no batch ID is set', () => { - store.batchId = null - store.uploadSliceIndex = 0 - - listeners.onUploadSliceAck(0, []) - - expect(store.uploadSliceIndex).toBe(1) // Index gets incremented even when batchId is null - expect(mockSend).not.toHaveBeenCalled() - }) - - it('should handle slice ID when uploadSliceIndex is negative', () => { - store.batchId = 100 - store.uploadSliceIndex = -1 - - listeners.onUploadSliceAck(0, []) - - expect(store.uploadSliceIndex).toBe(-1) + expect(store.ackedSliceIds.size).toBe(0) expect(mockSend).not.toHaveBeenCalled() }) it('should complete all slices and subscribe to batch', () => { store.batchId = 100 - store.uploadSliceIndex = 1 + store.ackedSliceIds = new Set([0]) store.isLoading = true const newItems: Record = {} @@ -1432,24 +1468,17 @@ describe('useCollections Listeners', () => { listeners.onUploadSliceAck(1, []) - expect(store.uploadSliceIndex).toBe(2) + expect(store.ackedSliceIds.size).toBe(2) expect(store.isBatchCreated).toBe(true) expect(mockSend).toHaveBeenCalled() const calls = (mockSend as Mock<(data: unknown) => void>).mock.calls - expect(calls.length).toBeGreaterThan(0) const arg = calls[calls.length - 1]![0] const sentMsg = arg as ClientMessage - - // With UPLOAD_SLICE_SIZE + 5 items, slice index 2 means start at UPLOAD_SLICE_SIZE * 2, - // which is >= UPLOAD_SLICE_SIZE + 5, so it should complete - if (sentMsg.type === 'UPLOAD_SLICE') { - expect(sentMsg.data.items).toHaveLength(0) // Empty slice, should trigger subscription - } + expect(sentMsg).toMatchObject({ type: 'SUBSCRIBE_BATCH', data: 100 }) }) - it('should handle copyright_override when license is set on item', () => { - store.batchId = 100 - store.uploadSliceIndex = 0 + it('sets copyright_override true when item license is set', () => { + mockSocketConnected.value = true store.input = 'input' store.globalLicense = '' @@ -1469,24 +1498,18 @@ describe('useCollections Listeners', () => { } store.replaceItems(newItems) - listeners.onUploadSliceAck(0, []) + listeners.onBatchCreated(100) - expect(mockSend).toHaveBeenCalled() const calls = (mockSend as Mock<(data: unknown) => void>).mock.calls - // Get the first UPLOAD_SLICE message - const uploadSliceCall = calls.find((call) => { - const msg = call[0] as ClientMessage - return msg.type === 'UPLOAD_SLICE' - }) + const uploadSliceCall = calls.find((c) => (c[0] as ClientMessage).type === 'UPLOAD_SLICE') expect(uploadSliceCall).toBeDefined() const sentMsg = uploadSliceCall![0] as UploadSliceMsg expect(sentMsg.data.items.length).toBeGreaterThan(0) expect(sentMsg.data.items[0]!.copyright_override).toBe(true) }) - it('should handle copyright_override when globalLicense is set', () => { - store.batchId = 100 - store.uploadSliceIndex = 0 + it('sets copyright_override true when globalLicense is set', () => { + mockSocketConnected.value = true store.input = 'input' store.globalLicense = 'CC-BY-4.0' @@ -1506,19 +1529,96 @@ describe('useCollections Listeners', () => { } store.replaceItems(newItems) - listeners.onUploadSliceAck(0, []) + listeners.onBatchCreated(100) - expect(mockSend).toHaveBeenCalled() const calls = (mockSend as Mock<(data: unknown) => void>).mock.calls - // Get the first UPLOAD_SLICE message - const uploadSliceCall = calls.find((call) => { - const msg = call[0] as ClientMessage - return msg.type === 'UPLOAD_SLICE' - }) + const uploadSliceCall = calls.find((c) => (c[0] as ClientMessage).type === 'UPLOAD_SLICE') expect(uploadSliceCall).toBeDefined() const sentMsg = uploadSliceCall![0] as UploadSliceMsg expect(sentMsg.data.items.length).toBeGreaterThan(0) expect(sentMsg.data.items[0]!.copyright_override).toBe(true) }) }) + + describe('onSocketReconnect', () => { + const makeItems = (count: number): Record => { + const items: Record = {} + for (let i = 0; i < count; i++) { + const id = `img${i}` + items[id] = createMockItem({ + id, + meta: { + selected: true, + license: '', + description: { value: '', language: 'en' }, + categories: '', + }, + image: createMockImage({ id }), + }) + } + return items + } + + it('resends unACKed slices when reconnected during an active batch upload', () => { + mockSocketConnected.value = true + store.replaceItems(makeItems(UPLOAD_SLICE_SIZE * 3)) + store.batchId = 100 + store.ackedSliceIds = new Set([0]) + store.isBatchCreated = false + + listeners.onSocketReconnect() + + const calls = (mockSend as Mock<(data: unknown) => void>).mock.calls + const sliceCalls = calls.filter((c) => (c[0] as ClientMessage).type === 'UPLOAD_SLICE') + expect(sliceCalls).toHaveLength(2) + expect((sliceCalls[0]![0] as UploadSliceMsg).data.sliceid).toBe(1) + expect((sliceCalls[1]![0] as UploadSliceMsg).data.sliceid).toBe(2) + }) + + it('does not send anything while still offline', () => { + mockSocketConnected.value = false + store.replaceItems(makeItems(UPLOAD_SLICE_SIZE * 3)) + store.batchId = 100 + store.ackedSliceIds = new Set([0]) + store.isBatchCreated = false + + listeners.onSocketReconnect() + + expect(mockSend).not.toHaveBeenCalled() + }) + + it('does nothing when no batch is in progress', () => { + mockSocketConnected.value = true + store.replaceItems(makeItems(UPLOAD_SLICE_SIZE * 2)) + store.batchId = null + store.isBatchCreated = false + + listeners.onSocketReconnect() + + expect(mockSend).not.toHaveBeenCalled() + }) + + it('does nothing when batch upload is already complete', () => { + mockSocketConnected.value = true + store.replaceItems(makeItems(UPLOAD_SLICE_SIZE * 2)) + store.batchId = 100 + store.isBatchCreated = true + + listeners.onSocketReconnect() + + expect(mockSend).not.toHaveBeenCalled() + }) + + it('does nothing when all slices have been ACKed', () => { + mockSocketConnected.value = true + store.replaceItems(makeItems(UPLOAD_SLICE_SIZE)) + store.batchId = 100 + store.ackedSliceIds = new Set([0]) + store.isBatchCreated = false + + listeners.onSocketReconnect() + + expect(mockSend).not.toHaveBeenCalled() + }) + }) }) diff --git a/frontend/src/composables/__tests__/useSocket.test.ts b/frontend/src/composables/__tests__/useSocket.test.ts index e4678f9..9f8007c 100644 --- a/frontend/src/composables/__tests__/useSocket.test.ts +++ b/frontend/src/composables/__tests__/useSocket.test.ts @@ -209,3 +209,34 @@ describe('useSocket auto-reconnect', () => { expect(doubledDelay).toBe((baseDelay ?? 0) * 2) }) }) + +describe('useSocket connected ref', () => { + beforeEach(() => { + currentWs = null + pendingReconnect = null + pendingDelay = null + socket = createSocketModule(mockTreaty, mockTimer) + }) + + afterEach(() => { + socket.close() + }) + + it('is false before the WebSocket opens', () => { + socket.open() + expect(socket.connected.value).toBe(false) + }) + + it('becomes true when the WebSocket open event fires', () => { + socket.open() + currentWs!.trigger('open') + expect(socket.connected.value).toBe(true) + }) + + it('becomes false when the WebSocket close event fires', () => { + socket.open() + currentWs!.trigger('open') + currentWs!.trigger('close') + expect(socket.connected.value).toBe(false) + }) +}) diff --git a/frontend/src/composables/useCollections.ts b/frontend/src/composables/useCollections.ts index b2a8674..0ebee02 100644 --- a/frontend/src/composables/useCollections.ts +++ b/frontend/src/composables/useCollections.ts @@ -19,7 +19,7 @@ import type { Image, Item } from '@frontend/types/image' import { UPLOAD_STATUS } from '@frontend/types/image' import { markRaw, watch } from 'vue' -export const UPLOAD_SLICE_SIZE = 18 +export const UPLOAD_SLICE_SIZE = 100 const toImage = (mediaImage: MediaImage): Image => ({ ...mediaImage, @@ -83,7 +83,7 @@ export const initCollectionsListeners = () => { const store = useCollectionsStore() const { buildDescription, getEffectiveTitle, wikitext } = useCommons() const { isDuplicateStatus } = useUploadStatus() - const { data, send } = useSocket + const { data, send, connected } = useSocket const sendSubscribeBatch = (batchId: number) => { if (store.isStatusChecking) return @@ -284,22 +284,83 @@ export const initCollectionsListeners = () => { } } + const sendSlice = (sliceIndex: number) => { + if (!store.batchId || !connected.value) return + const start = sliceIndex * UPLOAD_SLICE_SIZE + const end = Math.min(start + UPLOAD_SLICE_SIZE, store.selectedItems.length) + const sliceItems = store.selectedItems.slice(start, end).map((item) => ({ + id: item.id, + input: store.input, + title: getEffectiveTitle(item), + wikitext: wikitext(item), + labels: item.meta.description, + copyright_override: (item.meta.license?.trim() || store.globalLicense.trim()) !== '', + })) + send({ + type: 'UPLOAD_SLICE', + data: { + batchid: store.batchId, + sliceid: sliceIndex, + handler: store.handler, + items: sliceItems, + }, + }) + } + + // Sends every slice not yet acked. Slices skipped here because the socket was + // disconnected are picked up by the same call the next time `connected` flips + // true (see the watch() below) — sendSlice() never queues, so there is only + // ever one delivery path per slice, avoiding double-sends on reconnect. + const sendUnackedSlices = () => { + if (!store.batchId) return + const totalSlices = Math.ceil(store.selectedItems.length / UPLOAD_SLICE_SIZE) + if (totalSlices === 0) { + store.isLoading = false + store.isBatchCreated = true + sendSubscribeBatch(store.batchId) + return + } + for (let i = 0; i < totalSlices; i++) { + if (!store.ackedSliceIds.has(i)) sendSlice(i) + } + } + const onBatchCreated = (batchId: number) => { store.batchId = batchId - store.uploadSliceIndex = 0 - sendNextSlice() + store.ackedSliceIds = new Set() + if (!batchId) return + sendUnackedSlices() } const onUploadSliceAck = (sliceId: number, items: UploadSliceAckItem[]) => { - if (sliceId === store.uploadSliceIndex) { - store.uploadSliceIndex += 1 - items.forEach(({ id, status }) => { - store.updateItem(id, 'status', status as UploadStatus) - }) - sendNextSlice() + if (!store.batchId) return + if (!Number.isInteger(sliceId) || sliceId < 0) return + if (store.ackedSliceIds.has(sliceId)) return + store.ackedSliceIds.add(sliceId) + items.forEach(({ id, status }) => { + store.updateItem(id, 'status', status as UploadStatus) + }) + const totalSlices = Math.ceil(store.selectedItems.length / UPLOAD_SLICE_SIZE) + if (store.ackedSliceIds.size >= totalSlices) { + store.isLoading = false + store.isBatchCreated = true + sendSubscribeBatch(store.batchId!) } } + const onSocketReconnect = () => { + if (!store.batchId || store.isBatchCreated) return + sendUnackedSlices() + } + + watch( + connected, + (isConnected, wasConnected) => { + if (isConnected && wasConnected === false) onSocketReconnect() + }, + { flush: 'sync' }, + ) + const onRetryUploadsResponse = (newBatchId: number) => { store.setRetryNewBatchId(newBatchId) } @@ -362,40 +423,6 @@ export const initCollectionsListeners = () => { } }) - const sendNextSlice = () => { - if (!store.batchId) return - - const totalItems = store.selectedItems.length - const start = store.uploadSliceIndex * UPLOAD_SLICE_SIZE - - if (start >= totalItems) { - store.isLoading = false - store.isBatchCreated = true - sendSubscribeBatch(store.batchId) - return - } - - const end = Math.min(start + UPLOAD_SLICE_SIZE, totalItems) - const sliceItems = store.selectedItems.slice(start, end).map((item) => ({ - id: item.id, - input: store.input, - title: getEffectiveTitle(item), - wikitext: wikitext(item), - labels: item.meta.description, - copyright_override: (item.meta.license?.trim() || store.globalLicense.trim()) !== '', - })) - - send({ - type: 'UPLOAD_SLICE', - data: { - batchid: store.batchId, - sliceid: store.uploadSliceIndex, - handler: store.handler, - items: sliceItems, - }, - }) - } - return { onUploadsUpdate, onUploadsComplete, @@ -410,6 +437,7 @@ export const initCollectionsListeners = () => { onPartialCollectionImages, onBatchCreated, onUploadSliceAck, + onSocketReconnect, onRetryUploadsResponse, onPresetsList, } diff --git a/frontend/src/composables/useSocket.ts b/frontend/src/composables/useSocket.ts index 4577156..80aed1e 100644 --- a/frontend/src/composables/useSocket.ts +++ b/frontend/src/composables/useSocket.ts @@ -25,22 +25,22 @@ export const createSocketModule = ( timer = defaultTimer, ) => { const data = ref(null) + const connected = ref(false) let _ws: WsInstance | null = null let _reconnectDelay = RECONNECT_BASE_DELAY let _reconnectTimer: TimerHandle | null = null let _active = false - let _connected = false let _pendingQueue: ClientMessage[] = [] const _onOpen = () => { - _connected = true + connected.value = true _reconnectDelay = RECONNECT_BASE_DELAY for (const msg of _pendingQueue) _ws?.send(msg) _pendingQueue = [] } const _onClose = () => { - _connected = false + connected.value = false if (_active) _scheduleReconnect() } @@ -55,7 +55,7 @@ export const createSocketModule = ( const open = (isReconnect = false) => { _active = true - _connected = false + connected.value = false if (!isReconnect) { _pendingQueue = [] _reconnectDelay = RECONNECT_BASE_DELAY @@ -75,7 +75,7 @@ export const createSocketModule = ( } const send = (msg: ClientMessage) => { - if (!_connected) { + if (!connected.value) { _pendingQueue.push(msg) return } @@ -93,7 +93,7 @@ export const createSocketModule = ( _ws = null } - return { data, open, send, close } + return { data, connected, open, send, close } } export const useSocket = createSocketModule() diff --git a/frontend/src/stores/collections.store.ts b/frontend/src/stores/collections.store.ts index f89455d..9b430b6 100644 --- a/frontend/src/stores/collections.store.ts +++ b/frontend/src/stores/collections.store.ts @@ -31,7 +31,7 @@ export const useCollectionsStore = defineStore('collections', () => { const isBatchLoading = ref(false) const batchLoadingStatus = ref(null) const totalImageIds = ref([]) - const uploadSliceIndex = ref(0) + const ackedSliceIds = ref>(new Set()) const isBatchCreated = ref(false) const showSelectedOnly = ref(true) const viewMode = ref('list') @@ -318,7 +318,7 @@ export const useCollectionsStore = defineStore('collections', () => { isBatchLoading.value = false batchLoadingStatus.value = null totalImageIds.value = [] - uploadSliceIndex.value = 0 + ackedSliceIds.value = new Set() isBatchCreated.value = false retryNewBatchId.value = null // Don't clear presets - they are user-specific, not collection-specific @@ -387,7 +387,7 @@ export const useCollectionsStore = defineStore('collections', () => { batchLoadingStatus, totalImageIds, loadedCount, - uploadSliceIndex, + ackedSliceIds, isBatchCreated, presets, currentPresetId,