Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions backend/src/core/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ import type { Redis } from 'ioredis'

const BATCH_UPLOADS_PAGE_SIZE = 100

const UPLOAD_DONE_STATUSES = new Set([
'completed',
'failed',
'duplicate',
'duplicated_sdc_updated',
'duplicated_sdc_not_updated',
])

const BATCH_RETRIEVAL_CHUNK_SIZE = 100

type SessionUserWithAuth = SessionUser & {
Expand Down Expand Up @@ -644,23 +636,40 @@ export class Handler {
}

private startUploadStream(batchid: number): ReturnType<typeof setTimeout> {
let lastSerialized: string | null = null
let cursorTime: Date = new Date(0)
// Track last-sent state per id: catches same-second re-updates that id-ordering misses
const sentState = new Map<number, string>()
const poll = async () => {
try {
const items = await this.getAllUploadsForBatch(batchid)
const updateItems = items.map(toUploadUpdateItem)
const serialized = JSON.stringify(updateItems)
if (serialized !== lastSerialized) {
// minId=0 makes this equivalent to updated_at >= cursorTime; sentState deduplicates
const candidates = await this.services.uploads.getUploadsByBatchChangedSinceWithIdCursor(
batchid,
cursorTime,
0,
)
const toSend = candidates.filter((r) => {
const key = `${r.status}|${JSON.stringify(r.error)}|${r.success ?? ''}`
if (sentState.get(r.id) === key) return false
sentState.set(r.id, key)
return true
})
if (toSend.length > 0) {
this.sender.send({
type: 'UPLOADS_UPDATE',
data: updateItems,
data: toSend.map(toUploadUpdateItem),
partial: true,
nonce: nonce(),
})
lastSerialized = serialized
}
const total = await this.services.batches.countUploadsInBatch(batchid)
const completed = items.filter((i) => UPLOAD_DONE_STATUSES.has(i.status)).length
if (total > 0 && completed >= total) {
if (candidates.length > 0) {
const maxTime = candidates[candidates.length - 1]!.updated_at
if (maxTime > cursorTime) cursorTime = maxTime
}
const [total, active] = await Promise.all([
this.services.batches.countUploadsInBatch(batchid),
this.services.uploads.countActiveUploadsInBatch(batchid),
])
if (total > 0 && active === 0) {
this.sender.send({
type: 'UPLOADS_COMPLETE',
data: batchid,
Expand Down
40 changes: 40 additions & 0 deletions backend/src/db/dal/uploads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,4 +529,44 @@ export class UploadService {

return { items, total }
}

async getUploadsByBatchChangedSinceWithIdCursor(
batchId: number,
since: Date,
minId: number,
): Promise<SafeUploadRow[]> {
const {
access_token: _,
collection: __,
copyright_override: ___,
celery_task_id: ____,
...cols
} = getTableColumns(uploadRequests)
return this.db
.select(cols)
.from(uploadRequests)
.where(
and(
eq(uploadRequests.batchid, batchId),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Cursor Skips Earlier Rows

The (updated_at, id) cursor still misses later changes to a row that was already behind the current id boundary. For example, after the stream sends rows through (T, id=10), a later status change to id=5 can still have updated_at = T when timestamps share the same second. This predicate then requires id > 10 for rows at T, so the changed upload is excluded from every later poll and the UI can stay on the old status.

Fix in Claude Code

or(
gt(uploadRequests.updated_at, since),
and(eq(uploadRequests.updated_at, since), gt(uploadRequests.id, minId)),
),
),
)
.orderBy(asc(uploadRequests.updated_at), asc(uploadRequests.id))
}

async countActiveUploadsInBatch(batchId: number): Promise<number> {
const [row] = await this.db
.select({ n: count(uploadRequests.id) })
.from(uploadRequests)
.where(
and(
eq(uploadRequests.batchid, batchId),
inArray(uploadRequests.status, ['queued', 'in_progress']),
),
)
return row?.n ?? 0
}
}
1 change: 1 addition & 0 deletions backend/src/types/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ export const SubscribedSchema = t.Object({
export const UploadsUpdateSchema = t.Object({
type: t.Literal('UPLOADS_UPDATE'),
data: t.Array(UploadUpdateItemSchema),
partial: t.Boolean(),
nonce: t.String(),
})

Expand Down
1 change: 0 additions & 1 deletion frontend/src/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ initCollectionsListeners()
<Header />
<DevAuthBanner v-if="isDev" />
<MaintenanceBanner />
<BetaBanner />

<template v-if="auth.isAuthenticated">
<div
Expand Down
8 changes: 5 additions & 3 deletions frontend/src/composables/useCollections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export const initCollectionsListeners = () => {
}

// Update batch uploads list if present
const index = newBatchUploads.findIndex((u) => u.key === update.key)
const index = newBatchUploads.findIndex((u) => u.id === update.id)
if (index !== -1) {
batchUploadsChanged = true
const upload = { ...newBatchUploads[index] } as BatchUploadItem
Expand All @@ -141,11 +141,13 @@ export const initCollectionsListeners = () => {
stats: {
...store.batch.stats,
queued: newBatchUploads.filter((u) => u.status === UPLOAD_STATUS.Queued).length,
in_progress: newBatchUploads.filter((u) => u.status === UPLOAD_STATUS.InProgress).length,
in_progress: newBatchUploads.filter((u) => u.status === UPLOAD_STATUS.InProgress)
.length,
completed: newBatchUploads.filter((u) => u.status === UPLOAD_STATUS.Completed).length,
failed: newBatchUploads.filter((u) => u.status === UPLOAD_STATUS.Failed).length,
cancelled: newBatchUploads.filter((u) => u.status === UPLOAD_STATUS.Cancelled).length,
duplicate: newBatchUploads.filter((u) => isDuplicateStatus(u.status as UploadStatus)).length,
duplicate: newBatchUploads.filter((u) => isDuplicateStatus(u.status as UploadStatus))
.length,
},
}
}
Expand Down