Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions internal/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ func (p *Processor) processFile(ctx context.Context, msg *goqite.Message, job *q

func (p *Processor) handleProcessingError(ctx context.Context, msg *goqite.Message, job *queue.FileJob, jobID string, err error) error {
slog.ErrorContext(ctx, "Error processing file",
"error", err,
"error", err.Error(),
"errorType", fmt.Sprintf("%T", err),
"path", job.Path,
"retryCount", job.RetryCount,
"maxRetries", maxRetries,
Expand All @@ -558,16 +559,26 @@ func (p *Processor) handleProcessingError(ctx context.Context, msg *goqite.Messa

if markErr := p.queue.MarkAsError(ctx, msg.ID, job, err.Error()); markErr != nil {
slog.ErrorContext(ctx, "Failed to mark job as error", "error", markErr, "path", job.Path)
// Re-add to queue as a fallback
// Re-add to queue as a fallback. Clear the in_progress row first so the
// new goqite entry doesn't appear alongside a stale tracking row.
if clearErr := p.queue.ClearInProgress(ctx, msg.ID); clearErr != nil {
slog.WarnContext(ctx, "Failed to clear in-progress row before re-add", "error", clearErr, "path", job.Path)
}
if readdErr := p.queue.ReaddJob(ctx, job); readdErr != nil {
slog.ErrorContext(ctx, "Failed to re-add job to queue", "error", readdErr, "path", job.Path)
}
}
} else {
// Re-add the job to the queue for retry
if readdErr := p.queue.ReaddJob(ctx, job); readdErr != nil {
slog.ErrorContext(ctx, "Failed to re-add job to queue for retry", "error", readdErr, "path", job.Path)
}
return nil
}

// Retry path: clear the in_progress tracking row for the old msg.ID before
// re-adding, otherwise the same path is visible as two entries (the new
// pending goqite row + the stale in_progress row keyed by the old ID).
if clearErr := p.queue.ClearInProgress(ctx, msg.ID); clearErr != nil {
slog.WarnContext(ctx, "Failed to clear in-progress row before retry", "error", clearErr, "path", job.Path)
}
if readdErr := p.queue.ReaddJob(ctx, job); readdErr != nil {
slog.ErrorContext(ctx, "Failed to re-add job to queue for retry", "error", readdErr, "path", job.Path)
}

return nil
Expand Down
12 changes: 12 additions & 0 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,18 @@ func (q *Queue) ReaddJob(ctx context.Context, job *FileJob) error {
})
}

// ClearInProgress removes the in-progress tracking row for the given message ID.
// Call this when a retry is about to be re-queued under a new ID, otherwise the
// old in_progress row leaks and the same path appears as duplicate UI entries
// across the pending goqite row and the stale in_progress row.
func (q *Queue) ClearInProgress(ctx context.Context, msgID goqite.ID) error {
_, err := q.db.ExecContext(ctx, "DELETE FROM in_progress_items WHERE id = ?", string(msgID))
if err != nil {
return fmt.Errorf("failed to clear in-progress item %s: %w", string(msgID), err)
}
return nil
}

// SetQueueItemPriority updates the priority of a pending queue item by id
func (q *Queue) SetQueueItemPriority(id string, priority int) error {
// Get the job body for the given id
Expand Down
132 changes: 132 additions & 0 deletions internal/queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package queue

import (
"context"
"path/filepath"
"testing"

"github.com/javi11/postie/internal/config"
"github.com/javi11/postie/internal/database"
)

// newTestQueue creates an isolated Queue backed by a temp sqlite DB with all
// migrations applied. The DB is removed automatically when the test ends.
func newTestQueue(t *testing.T) *Queue {
t.Helper()
dir := t.TempDir()
dbPath := filepath.Join(dir, "test.db")

ctx := context.Background()
db, err := database.New(ctx, config.DatabaseConfig{
DatabaseType: "sqlite",
DatabasePath: dbPath,
})
if err != nil {
t.Fatalf("database.New: %v", err)
}
t.Cleanup(func() { _ = db.Close() })

if err := db.GetMigrationRunner().MigrateUp(); err != nil {
t.Fatalf("MigrateUp: %v", err)
}

q, err := New(ctx, db)
if err != nil {
t.Fatalf("queue.New: %v", err)
}
t.Cleanup(func() { _ = q.Close() })
return q
}

func countRows(t *testing.T, q *Queue, table, where string, args ...any) int {
t.Helper()
var n int
query := "SELECT COUNT(*) FROM " + table
if where != "" {
query += " WHERE " + where
}
if err := q.db.QueryRow(query, args...).Scan(&n); err != nil {
t.Fatalf("count %s: %v", table, err)
}
return n
}

// TestRetryDoesNotLeakInProgressRows reproduces the duplicate-entries bug.
// Without ClearInProgress before ReaddJob, each retry leaks an in_progress
// row keyed by the previous goqite message ID, causing the same path to
// appear in two tables simultaneously (pending goqite + stale in_progress).
func TestRetryDoesNotLeakInProgressRows(t *testing.T) {
q := newTestQueue(t)
ctx := context.Background()

const path = "/tmp/example.bin"
if err := q.AddFile(ctx, path, 1234); err != nil {
t.Fatalf("AddFile: %v", err)
}

// Simulate 3 receive→fail→retry cycles, mirroring processor.handleProcessingError.
for i := range 3 {
msg, job, err := q.ReceiveFile(ctx)
if err != nil {
t.Fatalf("ReceiveFile #%d: %v", i, err)
}
if msg == nil || job == nil {
t.Fatalf("ReceiveFile #%d returned nil", i)
}

// Invariant: while processing, exactly one in_progress row for this path.
if got := countRows(t, q, "in_progress_items", "path = ?", path); got != 1 {
t.Fatalf("cycle %d: in_progress rows = %d, want 1", i, got)
}

// Simulate the retry path in handleProcessingError.
if err := q.ClearInProgress(ctx, msg.ID); err != nil {
t.Fatalf("ClearInProgress: %v", err)
}
job.RetryCount++
if err := q.ReaddJob(ctx, job); err != nil {
t.Fatalf("ReaddJob: %v", err)
}

// After clear+readd: exactly one pending goqite row, zero in_progress rows.
if got := countRows(t, q, "in_progress_items", "path = ?", path); got != 0 {
t.Fatalf("cycle %d: in_progress leak after ClearInProgress: %d rows", i, got)
}
if got := countRows(t, q, "goqite", "queue = 'file_jobs' AND json_extract(body, '$.path') = ?", path); got != 1 {
t.Fatalf("cycle %d: pending goqite rows = %d, want 1", i, got)
}
}
}

// TestIsPathInQueueDuringReceive verifies the path is always visible to
// IsPathInQueue during ReceiveFile (insert-then-delete ordering).
func TestIsPathInQueueDuringReceive(t *testing.T) {
q := newTestQueue(t)
ctx := context.Background()

const path = "/tmp/race.bin"
if err := q.AddFile(ctx, path, 100); err != nil {
t.Fatalf("AddFile: %v", err)
}

if ok, err := q.IsPathInQueue(path); err != nil || !ok {
t.Fatalf("before receive: IsPathInQueue=%v err=%v, want true,nil", ok, err)
}

msg, _, err := q.ReceiveFile(ctx)
if err != nil || msg == nil {
t.Fatalf("ReceiveFile: msg=%v err=%v", msg, err)
}

if ok, err := q.IsPathInQueue(path); err != nil || !ok {
t.Fatalf("after receive (in_progress): IsPathInQueue=%v err=%v, want true,nil", ok, err)
}

if err := q.ClearInProgress(ctx, msg.ID); err != nil {
t.Fatalf("ClearInProgress: %v", err)
}

if ok, err := q.IsPathInQueue(path); err != nil || ok {
t.Fatalf("after clear: IsPathInQueue=%v err=%v, want false,nil", ok, err)
}
}
Loading