diff --git a/internal/processor/processor.go b/internal/processor/processor.go
index 027881f..ef8e91c 100644
--- a/internal/processor/processor.go
+++ b/internal/processor/processor.go
@@ -533,7 +533,8 @@ func (p *Processor) processFile(ctx context.Context, msg *goqite.Message, job *q
 
 func (p *Processor) handleProcessingError(ctx context.Context, msg *goqite.Message, job *queue.FileJob, jobID string, err error) error {
 	slog.ErrorContext(ctx, "Error processing file",
-		"error", err,
+		"error", err.Error(),
+		"errorType", fmt.Sprintf("%T", err),
 		"path", job.Path,
 		"retryCount", job.RetryCount,
 		"maxRetries", maxRetries,
@@ -558,16 +559,27 @@ func (p *Processor) handleProcessingError(ctx context.Context, msg *goqite.Messa
 		if markErr := p.queue.MarkAsError(ctx, msg.ID, job, err.Error()); markErr != nil {
 			slog.ErrorContext(ctx, "Failed to mark job as error", "error", markErr, "path", job.Path)
 
-			// Re-add to queue as a fallback
+			// Re-add to queue as a fallback. Re-add first and clear the old
+			// in_progress row only after the new entry exists, so a failed
+			// re-add leaves the job tracked instead of dropping it entirely.
 			if readdErr := p.queue.ReaddJob(ctx, job); readdErr != nil {
 				slog.ErrorContext(ctx, "Failed to re-add job to queue", "error", readdErr, "path", job.Path)
+			} else if clearErr := p.queue.ClearInProgress(ctx, msg.ID); clearErr != nil {
+				slog.WarnContext(ctx, "Failed to clear in-progress row after re-add", "error", clearErr, "path", job.Path)
 			}
 		}
-	} else {
-		// Re-add the job to the queue for retry
-		if readdErr := p.queue.ReaddJob(ctx, job); readdErr != nil {
-			slog.ErrorContext(ctx, "Failed to re-add job to queue for retry", "error", readdErr, "path", job.Path)
-		}
+		return nil
+	}
+
+	// Retry path: re-add the job first, then clear the in_progress tracking row
+	// keyed by the old msg.ID. Clearing avoids the same path showing up as two
+	// entries (new pending goqite row + stale in_progress row); doing it only
+	// after a successful re-add keeps the path visible throughout and never
+	// leaves the job in neither table.
+	if readdErr := p.queue.ReaddJob(ctx, job); readdErr != nil {
+		slog.ErrorContext(ctx, "Failed to re-add job to queue for retry", "error", readdErr, "path", job.Path)
+	} else if clearErr := p.queue.ClearInProgress(ctx, msg.ID); clearErr != nil {
+		slog.WarnContext(ctx, "Failed to clear in-progress row after retry", "error", clearErr, "path", job.Path)
 	}
 
 	return nil
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index b96bebd..bcabf30 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -1606,6 +1606,20 @@ func (q *Queue) ReaddJob(ctx context.Context, job *FileJob) error {
 	})
 }
 
+// ClearInProgress removes the in-progress tracking row for the given message ID.
+// Call this once a retry has been re-queued under a new ID, otherwise the
+// old in_progress row leaks and the same path appears as duplicate UI entries
+// across the pending goqite row and the stale in_progress row.
+// The affected-row count is not inspected, so a missing row is not an error;
+// a failing DELETE is returned wrapped with the message ID.
+func (q *Queue) ClearInProgress(ctx context.Context, msgID goqite.ID) error {
+	_, err := q.db.ExecContext(ctx, "DELETE FROM in_progress_items WHERE id = ?", string(msgID))
+	if err != nil {
+		return fmt.Errorf("failed to clear in-progress item %s: %w", string(msgID), err)
+	}
+	return nil
+}
+
 // SetQueueItemPriority updates the priority of a pending queue item by id
 func (q *Queue) SetQueueItemPriority(id string, priority int) error {
 	// Get the job body for the given id
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
new file mode 100644
index 0000000..668288f
--- /dev/null
+++ b/internal/queue/queue_test.go
@@ -0,0 +1,137 @@
+package queue
+
+import (
+	"context"
+	"path/filepath"
+	"testing"
+
+	"github.com/javi11/postie/internal/config"
+	"github.com/javi11/postie/internal/database"
+)
+
+// newTestQueue creates an isolated Queue backed by a temp sqlite DB with all
+// migrations applied. The DB is removed automatically when the test ends.
+func newTestQueue(t *testing.T) *Queue {
+	t.Helper()
+	dir := t.TempDir()
+	dbPath := filepath.Join(dir, "test.db")
+
+	ctx := context.Background()
+	db, err := database.New(ctx, config.DatabaseConfig{
+		DatabaseType: "sqlite",
+		DatabasePath: dbPath,
+	})
+	if err != nil {
+		t.Fatalf("database.New: %v", err)
+	}
+	t.Cleanup(func() { _ = db.Close() })
+
+	if err := db.GetMigrationRunner().MigrateUp(); err != nil {
+		t.Fatalf("MigrateUp: %v", err)
+	}
+
+	q, err := New(ctx, db)
+	if err != nil {
+		t.Fatalf("queue.New: %v", err)
+	}
+	t.Cleanup(func() { _ = q.Close() })
+	return q
+}
+
+// countRows returns how many rows of table satisfy the where clause; an empty
+// where counts every row. args are bound to the clause's placeholders. table
+// and where are concatenated into the SQL text, so pass trusted literals only.
+// A query or scan failure aborts the test.
+func countRows(t *testing.T, q *Queue, table, where string, args ...any) int {
+	t.Helper()
+	var n int
+	query := "SELECT COUNT(*) FROM " + table
+	if where != "" {
+		query += " WHERE " + where
+	}
+	if err := q.db.QueryRow(query, args...).Scan(&n); err != nil {
+		t.Fatalf("count %s: %v", table, err)
+	}
+	return n
+}
+
+// TestRetryDoesNotLeakInProgressRows reproduces the duplicate-entries bug.
+// Without a ClearInProgress call alongside ReaddJob, each retry leaks an
+// in_progress row keyed by the previous goqite message ID, causing the same
+// path to appear in two tables simultaneously (pending goqite + stale in_progress).
+func TestRetryDoesNotLeakInProgressRows(t *testing.T) {
+	q := newTestQueue(t)
+	ctx := context.Background()
+
+	const path = "/tmp/example.bin"
+	if err := q.AddFile(ctx, path, 1234); err != nil {
+		t.Fatalf("AddFile: %v", err)
+	}
+
+	// Simulate 3 receive→fail→retry cycles, mirroring processor.handleProcessingError.
+	for i := range 3 {
+		msg, job, err := q.ReceiveFile(ctx)
+		if err != nil {
+			t.Fatalf("ReceiveFile #%d: %v", i, err)
+		}
+		if msg == nil || job == nil {
+			t.Fatalf("ReceiveFile #%d returned nil", i)
+		}
+
+		// Invariant: while processing, exactly one in_progress row for this path.
+		if got := countRows(t, q, "in_progress_items", "path = ?", path); got != 1 {
+			t.Fatalf("cycle %d: in_progress rows = %d, want 1", i, got)
+		}
+
+		// Mirror the retry path in handleProcessingError: re-add first, then
+		// clear the in_progress row keyed by the old message ID.
+		job.RetryCount++
+		if err := q.ReaddJob(ctx, job); err != nil {
+			t.Fatalf("ReaddJob: %v", err)
+		}
+		if err := q.ClearInProgress(ctx, msg.ID); err != nil {
+			t.Fatalf("ClearInProgress: %v", err)
+		}
+
+		// After readd+clear: exactly one pending goqite row, zero in_progress rows.
+		if got := countRows(t, q, "in_progress_items", "path = ?", path); got != 0 {
+			t.Fatalf("cycle %d: in_progress leak after ClearInProgress: %d rows", i, got)
+		}
+		if got := countRows(t, q, "goqite", "queue = 'file_jobs' AND json_extract(body, '$.path') = ?", path); got != 1 {
+			t.Fatalf("cycle %d: pending goqite rows = %d, want 1", i, got)
+		}
+	}
+}
+
+// TestIsPathInQueueDuringReceive verifies the path is always visible to
+// IsPathInQueue during ReceiveFile (insert-then-delete ordering).
+func TestIsPathInQueueDuringReceive(t *testing.T) {
+	q := newTestQueue(t)
+	ctx := context.Background()
+
+	const path = "/tmp/race.bin"
+	if err := q.AddFile(ctx, path, 100); err != nil {
+		t.Fatalf("AddFile: %v", err)
+	}
+
+	if ok, err := q.IsPathInQueue(path); err != nil || !ok {
+		t.Fatalf("before receive: IsPathInQueue=%v err=%v, want true,nil", ok, err)
+	}
+
+	msg, _, err := q.ReceiveFile(ctx)
+	if err != nil || msg == nil {
+		t.Fatalf("ReceiveFile: msg=%v err=%v", msg, err)
+	}
+
+	if ok, err := q.IsPathInQueue(path); err != nil || !ok {
+		t.Fatalf("after receive (in_progress): IsPathInQueue=%v err=%v, want true,nil", ok, err)
+	}
+
+	if err := q.ClearInProgress(ctx, msg.ID); err != nil {
+		t.Fatalf("ClearInProgress: %v", err)
+	}
+
+	if ok, err := q.IsPathInQueue(path); err != nil || ok {
+		t.Fatalf("after clear: IsPathInQueue=%v err=%v, want false,nil", ok, err)
+	}
+}