Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .env.test
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# DBs
TEST_POSTGRES_URL="localhost:35432"
TEST_CLICKHOUSE_URL="localhost:39000"
TEST_REDIS_URL="localhost:36379"
TEST_DRAGONFLY_URL="localhost:37379"
# MQs
TEST_RABBITMQ_URL="localhost:35672"
TEST_LOCALSTACK_URL="localhost:34566"
Expand Down
9 changes: 0 additions & 9 deletions build/test/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@ services:
dockerfile: ./build/test/Dockerfile.mock
ports:
- 35555:5555
redis:
image: redis/redis-stack-server:latest
ports:
- 36379:6379
dragonfly:
image: docker.dragonflydb.io/dragonflydb/dragonfly
command: ["--proactor_threads=1", "--maxmemory=256mb"]
ports:
- 37379:6379
clickhouse:
image: clickhouse/clickhouse-server:24-alpine
ports:
Expand Down
39 changes: 39 additions & 0 deletions cmd/e2e/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,45 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() {
s.Equal(1, manualCount, "should have exactly one manual retry attempt")
}

// TestRetry_DuplicateManualRetryExecutesBoth verifies that issuing the same
// manual retry twice for one event+destination yields two extra delivery
// attempts: manual retries are explicit user actions and must not be
// deduplicated against each other.
func (s *basicSuite) TestRetry_DuplicateManualRetryExecutesBoth() {
	tenant := s.createTenant()
	dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))

	eventID := idgen.Event()
	s.publish(tenant.ID, "user.created", map[string]any{
		"test": "duplicate_manual_retry",
	}, withEventID(eventID))

	// The initial delivery must land before any retry is requested.
	s.waitForNewAttempts(tenant.ID, 1)

	// Issue the identical manual retry twice; each request must be accepted
	// and each must produce its own new attempt (2nd and 3rd overall).
	for expected := 2; expected <= 3; expected++ {
		s.Equal(http.StatusAccepted, s.retryEvent(eventID, dest.ID))
		s.waitForNewAttempts(tenant.ID, expected)
	}

	// List every attempt for the event and count how many are manual.
	var listing struct {
		Models []map[string]any `json:"models"`
	}
	code := s.doJSON(http.MethodGet, s.apiURL("/attempts?tenant_id="+tenant.ID+"&event_id="+eventID+"&dir=asc"), nil, &listing)
	s.Require().Equal(http.StatusOK, code)
	s.Require().Len(listing.Models, 3, "should have 3 attempts: initial + 2 manual retries")

	manualCount := 0
	for _, attempt := range listing.Models {
		if flag, ok := attempt["manual"].(bool); ok && flag {
			manualCount++
		}
	}
	s.Equal(2, manualCount, "should have exactly 2 manual retry attempts")
}

func (s *basicSuite) TestRetry_ManualRetryOnDisabledDestinationRejected() {
tenant := s.createTenant()
dest := s.createWebhookDestination(tenant.ID, "*")
Expand Down
2 changes: 1 addition & 1 deletion contributing/test.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Integration and e2e tests require external services like ClickHouse, LocalStack,

### Why persistent infrastructure?

Lightweight services like Redis start quickly, but heavier dependencies like LocalStack (AWS) or GCP emulators can take 15-30 seconds to initialize. With persistent infrastructure, you pay this cost once and get fast iteration from then on.
Redis and Dragonfly always use testcontainers (one container per test) since they start quickly. Heavier dependencies like LocalStack (AWS) or GCP emulators can take 15-30 seconds to initialize. With persistent infrastructure, you pay this cost once and get fast iteration from then on.

To run the test infrastructure:

Expand Down
14 changes: 13 additions & 1 deletion internal/deliverymq/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,21 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error {
return h.handleError(msg, &PreDeliveryError{err: err})
}

err = h.idempotence.Exec(ctx, idempotencyKeyFromDeliveryTask(task), func(ctx context.Context) error {
executed := false
idempotencyKey := idempotencyKeyFromDeliveryTask(task)
err = h.idempotence.Exec(ctx, idempotencyKey, func(ctx context.Context) error {
executed = true
return h.doHandle(ctx, task, destination)
})
if err == nil && !executed {
h.logger.Ctx(ctx).Info("delivery task skipped (idempotent)",
zap.String("event_id", task.Event.ID),
zap.String("tenant_id", task.Event.TenantID),
zap.String("destination_id", task.DestinationID),
zap.Int("attempt", task.Attempt),
zap.Bool("manual", task.Manual),
zap.String("idempotency_key", idempotencyKey))
}
return h.handleError(msg, err)
}

Expand Down
60 changes: 60 additions & 0 deletions internal/deliverymq/messagehandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,66 @@ func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destina
}
}

// TestManualDelivery_DuplicateRetry verifies that a second manual retry for
// the same event+destination still executes after an earlier manual retry
// succeeded. Manual retries are explicit user actions and must not be
// blocked by the idempotency layer.
func TestManualDelivery_DuplicateRetry(t *testing.T) {
	// Fixtures: one tenant, one destination, and one event bound to it.
	tenant := models.Tenant{ID: idgen.String()}
	destination := testutil.DestinationFactory.Any(
		testutil.DestinationFactory.WithTenantID(tenant.ID),
	)
	event := testutil.EventFactory.Any(
		testutil.EventFactory.WithTenantID(tenant.ID),
		testutil.EventFactory.WithDestinationID(destination.ID),
	)

	// Collaborator mocks; the publisher succeeds on both deliveries.
	destGetter := &mockDestinationGetter{dest: &destination}
	scheduler := newMockRetryScheduler()
	pub := newMockPublisher([]error{nil, nil})
	logPub := newMockLogPublisher(nil)
	alerts := newMockAlertMonitor()

	// Handler under test, wired to a real Redis for idempotency tracking.
	redis := testutil.CreateTestRedisClient(t)
	handler := deliverymq.NewMessageHandler(
		testutil.CreateTestLogger(t),
		logPub,
		destGetter,
		pub,
		testutil.NewMockEventTracer(nil),
		scheduler,
		&backoff.ConstantBackoff{Interval: 1 * time.Second},
		10,
		alerts,
		idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)),
	)

	// First manual retry: must be acked, published, and logged.
	firstTask := models.NewManualDeliveryTask(event, destination.ID)
	firstMock, firstMsg := newDeliveryMockMessage(firstTask)
	require.NoError(t, handler.Handle(context.Background(), firstMsg))
	assert.True(t, firstMock.acked, "first manual retry should be acked")
	assert.Equal(t, 1, pub.current, "first manual retry should publish")
	require.Len(t, logPub.entries, 1, "first manual retry should log delivery")

	// Second manual retry for the same event+destination: must also execute
	// rather than being swallowed as a duplicate.
	secondTask := models.NewManualDeliveryTask(event, destination.ID)
	secondMock, secondMsg := newDeliveryMockMessage(secondTask)
	require.NoError(t, handler.Handle(context.Background(), secondMsg))
	assert.True(t, secondMock.acked, "second manual retry should be acked")
	assert.Equal(t, 2, pub.current, "second manual retry should also publish")
	require.Len(t, logPub.entries, 2, "second manual retry should also log delivery")
}

func TestMessageHandler_RetryID_MultipleDestinations(t *testing.T) {
// Test scenario:
// - One event is delivered to TWO different destinations
Expand Down
14 changes: 11 additions & 3 deletions internal/models/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"encoding/json"

"github.com/hookdeck/outpost/internal/idgen"
"github.com/hookdeck/outpost/internal/mqs"
)

Expand Down Expand Up @@ -38,6 +39,7 @@ type DeliveryTask struct {
DestinationID string `json:"destination_id"`
Attempt int `json:"attempt"`
Manual bool `json:"manual"`
Nonce string `json:"nonce,omitempty"`
Telemetry *DeliveryTelemetry `json:"telemetry,omitempty"`
}

Expand All @@ -56,12 +58,16 @@ func (t *DeliveryTask) ToMessage() (*mqs.Message, error) {
}

// IdempotencyKey returns the key used for idempotency checks.
//
// Non-manual deliveries share one key per event+destination; on failure the
// key is cleared so the scheduled retry can execute under the same key.
//
// Manual retries additionally include a nonce so each /retry request gets
// its own idempotency key, while MQ redeliveries of the same message (same
// nonce) are still deduplicated. The nonce fixes a regression from #653
// where removing DeliveryEvent.ID made the manual retry idempotency key
// static per event+destination.
func (t *DeliveryTask) IdempotencyKey() string {
	key := t.Event.ID + ":" + t.DestinationID
	if t.Manual {
		key += ":manual:" + t.Nonce
	}
	return key
}

Expand All @@ -81,9 +87,11 @@ func NewDeliveryTask(event Event, destinationID string) DeliveryTask {
}

// NewManualDeliveryTask creates a new DeliveryTask for a manual retry.
// Each manual retry carries a unique nonce so that separate /retry requests
// are not deduplicated against one another.
func NewManualDeliveryTask(event Event, destinationID string) DeliveryTask {
	t := NewDeliveryTask(event, destinationID)
	t.Manual = true
	t.Nonce = idgen.String()
	return t
}

Expand Down
35 changes: 29 additions & 6 deletions internal/rsmq/rsmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// RSMQSuite runs RSMQ tests against different backends.
type RSMQSuite struct {
suite.Suite
cfg *redis.RedisConfig
client RedisClient
rsmq *RedisSMQ
}
Expand All @@ -30,10 +31,21 @@ func TestRedisRSMQSuite(t *testing.T) {

func TestDragonflyRSMQSuite(t *testing.T) { suite.Run(t, new(DragonflyRSMQSuite)) }

func (s *RedisRSMQSuite) SetupTest() {
func (s *RedisRSMQSuite) SetupSuite() {
testinfra.Start(s.T())
cfg := testinfra.NewRedisConfig(s.T())
client, err := redis.New(context.Background(), cfg)
s.cfg = testinfra.NewRedisConfig(s.T())
}

func (s *RedisRSMQSuite) SetupTest() {
// Flush the container's DB 0 before each test method for a clean state.
flushClient, err := redis.New(context.Background(), s.cfg)
if err != nil {
s.T().Fatalf("failed to create redis client for flush: %v", err)
}
flushClient.FlushDB(context.Background())
flushClient.Close()

client, err := redis.New(context.Background(), s.cfg)
if err != nil {
s.T().Fatalf("failed to create redis client: %v", err)
}
Expand All @@ -42,10 +54,21 @@ func (s *RedisRSMQSuite) SetupTest() {
s.rsmq = NewRedisSMQ(s.client, "test")
}

func (s *DragonflyRSMQSuite) SetupTest() {
func (s *DragonflyRSMQSuite) SetupSuite() {
testinfra.Start(s.T())
cfg := testinfra.NewDragonflyConfig(s.T())
client, err := redis.New(context.Background(), cfg)
s.cfg = testinfra.NewDragonflyConfig(s.T())
}

func (s *DragonflyRSMQSuite) SetupTest() {
// Flush the container's DB 0 before each test method for a clean state.
flushClient, err := redis.New(context.Background(), s.cfg)
if err != nil {
s.T().Fatalf("failed to create redis client for flush: %v", err)
}
flushClient.FlushDB(context.Background())
flushClient.Close()

client, err := redis.New(context.Background(), s.cfg)
if err != nil {
s.T().Fatalf("failed to create redis client: %v", err)
}
Expand Down
Loading
Loading