Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions frontend/src/hooks/useTaskSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,27 @@ export function useTaskSubscription({
const onPhaseRef = useRef(onPhase)
const onOutputRef = useRef(onOutput)
const esRef = useRef<EventSource | null>(null)
const pollIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null)
const pollTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const reconnectAttemptRef = useRef(0)
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const lastStatusRef = useRef<string | null>(null)
const cleanupRef = useRef(false)
const versionRef = useRef(0)

onStatusRef.current = onStatus
onPhaseRef.current = onPhase
onOutputRef.current = onOutput

const cleanupAll = useCallback(() => {
cleanupRef.current = true
versionRef.current += 1
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
if (pollIntervalRef.current) {
clearInterval(pollIntervalRef.current)
pollIntervalRef.current = null
if (pollTimerRef.current) {
clearTimeout(pollTimerRef.current)
pollTimerRef.current = null
}
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current)
Expand All @@ -62,28 +64,39 @@ export function useTaskSubscription({

const startPolling = useCallback(() => {
if (cleanupRef.current) return
const version = versionRef.current + 1
versionRef.current = version
setIsPolling(true)
setIsConnected(false)
pollIntervalRef.current = setInterval(async () => {
if (cleanupRef.current) return

const poll = async () => {
if (cleanupRef.current || versionRef.current !== version) return
try {
const data = await getTaskStatus(taskId) as { status?: string }
if (cleanupRef.current) return
if (cleanupRef.current || versionRef.current !== version) return
if (data.status && data.status !== lastStatusRef.current) {
lastStatusRef.current = data.status
onStatusRef.current?.(data.status)
}
if (data.status && ['completed', 'failed', 'cancelled'].includes(data.status)) {
cleanupAll()
setIsPolling(false)
return
}
} catch {
}
}, pollingInterval)
if (!cleanupRef.current && versionRef.current === version) {
pollTimerRef.current = setTimeout(poll, pollingInterval)
}
}

poll()
}, [taskId, pollingInterval, cleanupAll])

const connectSSE = useCallback(() => {
if (cleanupRef.current) return
const version = versionRef.current + 1
versionRef.current = version
if (esRef.current) {
esRef.current.close()
esRef.current = null
Expand All @@ -94,7 +107,7 @@ export function useTaskSubscription({
esRef.current = es

es.addEventListener('status', (e: MessageEvent) => {
if (cleanupRef.current) return
if (cleanupRef.current || versionRef.current !== version) return
try {
const data = JSON.parse(e.data) as { status: string; scan_phase?: string }
if (data.scan_phase) {
Expand All @@ -114,7 +127,7 @@ export function useTaskSubscription({
})

es.addEventListener('phase', (e: MessageEvent) => {
if (cleanupRef.current) return
if (cleanupRef.current || versionRef.current !== version) return
try {
const data = JSON.parse(e.data) as { scan_phase: string }
if (data.scan_phase) {
Expand All @@ -125,7 +138,7 @@ export function useTaskSubscription({
})

es.addEventListener('output', (e: MessageEvent) => {
if (cleanupRef.current) return
if (cleanupRef.current || versionRef.current !== version) return
try {
const data = JSON.parse(e.data) as { chunk: string }
if (data.chunk) {
Expand All @@ -136,7 +149,7 @@ export function useTaskSubscription({
})

es.onerror = () => {
if (cleanupRef.current) return
if (cleanupRef.current || versionRef.current !== version) return
es.close()
esRef.current = null
setIsConnected(false)
Expand All @@ -154,7 +167,7 @@ export function useTaskSubscription({
}

es.onopen = () => {
if (cleanupRef.current) return
if (cleanupRef.current || versionRef.current !== version) return
reconnectAttemptRef.current = 0
setIsConnected(true)
setIsPolling(false)
Expand Down
23 changes: 15 additions & 8 deletions frontend/src/pages/Scans.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,29 @@ export default function Scans() {
type: "warning",
});

// Ref so the visibilitychange handler always sees the current interval id
const intervalRef = useRef<ReturnType<typeof setInterval> | null>(null);
// Ref so the visibilitychange handler always sees the current timer id
const pollingTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const requestSeqRef = useRef(0);
const abortRef = useRef<AbortController | null>(null);

function scheduleNextPoll() {
pollingTimerRef.current = setTimeout(async () => {
await loadTasks();
if (!abortRef.current?.signal.aborted) {
scheduleNextPoll();
}
}, 5000);
}

function startPolling() {
stopPolling();
intervalRef.current = setInterval(() => {
loadTasks();
}, 5000);
scheduleNextPoll();
}

function stopPolling() {
if (intervalRef.current !== null) {
clearInterval(intervalRef.current);
intervalRef.current = null;
if (pollingTimerRef.current !== null) {
clearTimeout(pollingTimerRef.current);
pollingTimerRef.current = null;
}
}

Expand Down
7 changes: 4 additions & 3 deletions frontend/testing/unit/hooks/useTaskSubscription.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,13 @@ describe('useTaskSubscription', () => {

const es = getES()!
await act(() => { es.triggerError() })

// startPolling calls poll() immediately (chained setTimeout), so one call
// happens synchronously before the first interval elapses.
await tickTime(50)
expect(getTaskStatus).toHaveBeenCalledTimes(1)
expect(getTaskStatus).toHaveBeenCalledTimes(2) // initial (direct) + first timer

await tickTime(50)
expect(getTaskStatus).toHaveBeenCalledTimes(2)
expect(getTaskStatus).toHaveBeenCalledTimes(3) // initial + first + second timer
})

it('stops polling on terminal status', async () => {
Expand Down
Loading