diff --git a/.env.test b/.env.test
index 3fa67f98..d71503a8 100644
--- a/.env.test
+++ b/.env.test
@@ -1,8 +1,6 @@
 # DBs
 TEST_POSTGRES_URL="localhost:35432"
 TEST_CLICKHOUSE_URL="localhost:39000"
-TEST_REDIS_URL="localhost:36379"
-TEST_DRAGONFLY_URL="localhost:37379"
 # MQs
 TEST_RABBITMQ_URL="localhost:35672"
 TEST_LOCALSTACK_URL="localhost:34566"
diff --git a/build/test/compose.yml b/build/test/compose.yml
index 69ec3d2d..44299283 100644
--- a/build/test/compose.yml
+++ b/build/test/compose.yml
@@ -8,15 +8,6 @@ services:
       dockerfile: ./build/test/Dockerfile.mock
     ports:
       - 35555:5555
-  redis:
-    image: redis/redis-stack-server:latest
-    ports:
-      - 36379:6379
-  dragonfly:
-    image: docker.dragonflydb.io/dragonflydb/dragonfly
-    command: ["--proactor_threads=1", "--maxmemory=256mb"]
-    ports:
-      - 37379:6379
   clickhouse:
     image: clickhouse/clickhouse-server:24-alpine
     ports:
diff --git a/cmd/e2e/retry_test.go b/cmd/e2e/retry_test.go
index cdb78aab..5e2b4573 100644
--- a/cmd/e2e/retry_test.go
+++ b/cmd/e2e/retry_test.go
@@ -93,6 +93,45 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() {
     s.Equal(1, manualCount, "should have exactly one manual retry attempt")
 }
 
+func (s *basicSuite) TestRetry_DuplicateManualRetryExecutesBoth() {
+    tenant := s.createTenant()
+    dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))
+
+    eventID := idgen.Event()
+    s.publish(tenant.ID, "user.created", map[string]any{
+        "test": "duplicate_manual_retry",
+    }, withEventID(eventID))
+
+    // Wait for initial delivery
+    s.waitForNewAttempts(tenant.ID, 1)
+
+    // First manual retry
+    status := s.retryEvent(eventID, dest.ID)
+    s.Equal(http.StatusAccepted, status)
+    s.waitForNewAttempts(tenant.ID, 2)
+
+    // Second manual retry for same event+destination
+    status = s.retryEvent(eventID, dest.ID)
+    s.Equal(http.StatusAccepted, status)
+    s.waitForNewAttempts(tenant.ID, 3)
+
+    // Verify: 3 attempts total, 2 manual
+    var resp struct {
+        Models []map[string]any `json:"models"`
+    }
+    status = s.doJSON(http.MethodGet, s.apiURL("/attempts?tenant_id="+tenant.ID+"&event_id="+eventID+"&dir=asc"), nil, &resp)
+    s.Require().Equal(http.StatusOK, status)
+    s.Require().Len(resp.Models, 3, "should have 3 attempts: initial + 2 manual retries")
+
+    manualCount := 0
+    for _, atm := range resp.Models {
+        if manual, ok := atm["manual"].(bool); ok && manual {
+            manualCount++
+        }
+    }
+    s.Equal(2, manualCount, "should have exactly 2 manual retry attempts")
+}
+
 func (s *basicSuite) TestRetry_ManualRetryOnDisabledDestinationRejected() {
     tenant := s.createTenant()
     dest := s.createWebhookDestination(tenant.ID, "*")
diff --git a/contributing/test.md b/contributing/test.md
index 0c5975b7..e0a4663e 100644
--- a/contributing/test.md
+++ b/contributing/test.md
@@ -134,7 +134,7 @@ Integration and e2e tests require external services like ClickHouse, LocalStack,
 
 ### Why persistent infrastructure?
 
-Lightweight services like Redis start quickly, but heavier dependencies like LocalStack (AWS) or GCP emulators can take 15-30 seconds to initialize. With persistent infrastructure, you pay this cost once and get fast iteration from then on.
+Redis and Dragonfly always use testcontainers (one container per test) since they start quickly. Heavier dependencies like LocalStack (AWS) or GCP emulators can take 15-30 seconds to initialize. With persistent infrastructure, you pay this cost once and get fast iteration from then on.
 
 To run the test infrastructure:
 
diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go
index 6f4fa3d6..d6e31590 100644
--- a/internal/deliverymq/messagehandler.go
+++ b/internal/deliverymq/messagehandler.go
@@ -147,9 +147,21 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error {
         return h.handleError(msg, &PreDeliveryError{err: err})
     }
 
-    err = h.idempotence.Exec(ctx, idempotencyKeyFromDeliveryTask(task), func(ctx context.Context) error {
+    executed := false
+    idempotencyKey := idempotencyKeyFromDeliveryTask(task)
+    err = h.idempotence.Exec(ctx, idempotencyKey, func(ctx context.Context) error {
+        executed = true
         return h.doHandle(ctx, task, destination)
     })
+    if err == nil && !executed {
+        h.logger.Ctx(ctx).Info("delivery task skipped (idempotent)",
+            zap.String("event_id", task.Event.ID),
+            zap.String("tenant_id", task.Event.TenantID),
+            zap.String("destination_id", task.DestinationID),
+            zap.Int("attempt", task.Attempt),
+            zap.Bool("manual", task.Manual),
+            zap.String("idempotency_key", idempotencyKey))
+    }
 
     return h.handleError(msg, err)
 }
diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go
index 2b3944e8..ebbab75f 100644
--- a/internal/deliverymq/messagehandler_test.go
+++ b/internal/deliverymq/messagehandler_test.go
@@ -1118,6 +1118,66 @@ func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destina
     }
 }
 
+func TestManualDelivery_DuplicateRetry(t *testing.T) {
+    // Test scenario:
+    // - First manual retry for event+destination succeeds
+    // - Second manual retry for same event+destination is requested
+    // - Second retry should also execute (not be blocked by idempotency)
+    //
+    // Manual retries are explicit user actions and should always execute,
+    // even if a previous manual retry for the same event+destination already succeeded.
+
+    // Setup test data
+    tenant := models.Tenant{ID: idgen.String()}
+    destination := testutil.DestinationFactory.Any(
+        testutil.DestinationFactory.WithTenantID(tenant.ID),
+    )
+    event := testutil.EventFactory.Any(
+        testutil.EventFactory.WithTenantID(tenant.ID),
+        testutil.EventFactory.WithDestinationID(destination.ID),
+    )
+
+    // Setup mocks
+    destGetter := &mockDestinationGetter{dest: &destination}
+    retryScheduler := newMockRetryScheduler()
+    publisher := newMockPublisher([]error{nil, nil}) // Both succeed
+    logPublisher := newMockLogPublisher(nil)
+    alertMonitor := newMockAlertMonitor()
+
+    // Setup message handler with Redis for idempotency
+    redis := testutil.CreateTestRedisClient(t)
+    handler := deliverymq.NewMessageHandler(
+        testutil.CreateTestLogger(t),
+        logPublisher,
+        destGetter,
+        publisher,
+        testutil.NewMockEventTracer(nil),
+        retryScheduler,
+        &backoff.ConstantBackoff{Interval: 1 * time.Second},
+        10,
+        alertMonitor,
+        idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)),
+    )
+
+    // Step 1: First manual retry succeeds
+    task1 := models.NewManualDeliveryTask(event, destination.ID)
+    mockMsg1, msg1 := newDeliveryMockMessage(task1)
+    err := handler.Handle(context.Background(), msg1)
+    require.NoError(t, err)
+    assert.True(t, mockMsg1.acked, "first manual retry should be acked")
+    assert.Equal(t, 1, publisher.current, "first manual retry should publish")
+    require.Len(t, logPublisher.entries, 1, "first manual retry should log delivery")
+
+    // Step 2: Second manual retry for same event+destination should also execute
+    task2 := models.NewManualDeliveryTask(event, destination.ID)
+    mockMsg2, msg2 := newDeliveryMockMessage(task2)
+    err = handler.Handle(context.Background(), msg2)
+    require.NoError(t, err)
+    assert.True(t, mockMsg2.acked, "second manual retry should be acked")
+    assert.Equal(t, 2, publisher.current, "second manual retry should also publish")
+    require.Len(t, logPublisher.entries, 2, "second manual retry should also log delivery")
+}
+
 func TestMessageHandler_RetryID_MultipleDestinations(t *testing.T) {
     // Test scenario:
     // - One event is delivered to TWO different destinations
diff --git a/internal/models/tasks.go b/internal/models/tasks.go
index ae685ade..ed43187a 100644
--- a/internal/models/tasks.go
+++ b/internal/models/tasks.go
@@ -3,6 +3,7 @@ package models
 import (
     "encoding/json"
 
+    "github.com/hookdeck/outpost/internal/idgen"
     "github.com/hookdeck/outpost/internal/mqs"
 )
 
@@ -38,6 +39,7 @@ type DeliveryTask struct {
     DestinationID string             `json:"destination_id"`
     Attempt       int                `json:"attempt"`
     Manual        bool               `json:"manual"`
+    Nonce         string             `json:"nonce,omitempty"`
     Telemetry     *DeliveryTelemetry `json:"telemetry,omitempty"`
 }
 
@@ -56,12 +58,16 @@ func (t *DeliveryTask) ToMessage() (*mqs.Message, error) {
 }
 
 // IdempotencyKey returns the key used for idempotency checks.
-// Uses Event.ID + DestinationID + Manual flag.
-// Manual retries get a different key so they can bypass idempotency of failed automatic deliveries.
+// Manual retries include a nonce so each /retry request gets its own idempotency key,
+// while MQ redeliveries of the same message (same nonce) are still deduplicated.
+// Nonce was added to fix a regression from #653 where removing DeliveryEvent.ID
+// made the manual retry idempotency key static per event+destination.
 func (t *DeliveryTask) IdempotencyKey() string {
     if t.Manual {
-        return t.Event.ID + ":" + t.DestinationID + ":manual"
+        return t.Event.ID + ":" + t.DestinationID + ":manual:" + t.Nonce
     }
+    // Non-manual deliveries share a key per event+destination. On failure, the
+    // idempotency key is cleared so the scheduled retry can execute with the same key.
     return t.Event.ID + ":" + t.DestinationID
 }
 
@@ -81,9 +87,11 @@ func NewDeliveryTask(event Event, destinationID string) DeliveryTask {
 }
 
 // NewManualDeliveryTask creates a new DeliveryTask for a manual retry.
+// Each manual retry gets a unique nonce so separate /retry requests are not deduplicated.
 func NewManualDeliveryTask(event Event, destinationID string) DeliveryTask {
     task := NewDeliveryTask(event, destinationID)
     task.Manual = true
+    task.Nonce = idgen.String()
     return task
 }
diff --git a/internal/rsmq/rsmq_test.go b/internal/rsmq/rsmq_test.go
index 3005ec89..db9e76d9 100644
--- a/internal/rsmq/rsmq_test.go
+++ b/internal/rsmq/rsmq_test.go
@@ -15,6 +15,7 @@ import (
 // RSMQSuite runs RSMQ tests against different backends.
 type RSMQSuite struct {
     suite.Suite
+    cfg    *redis.RedisConfig
     client RedisClient
     rsmq   *RedisSMQ
 }
@@ -30,10 +31,21 @@ func TestRedisRSMQSuite(t *testing.T) {
 func TestDragonflyRSMQSuite(t *testing.T) {
     suite.Run(t, new(DragonflyRSMQSuite))
 }
 
-func (s *RedisRSMQSuite) SetupTest() {
+func (s *RedisRSMQSuite) SetupSuite() {
     testinfra.Start(s.T())
-    cfg := testinfra.NewRedisConfig(s.T())
-    client, err := redis.New(context.Background(), cfg)
+    s.cfg = testinfra.NewRedisConfig(s.T())
+}
+
+func (s *RedisRSMQSuite) SetupTest() {
+    // Flush the container's DB 0 before each test method for a clean state.
+    flushClient, err := redis.New(context.Background(), s.cfg)
+    if err != nil {
+        s.T().Fatalf("failed to create redis client for flush: %v", err)
+    }
+    flushClient.FlushDB(context.Background())
+    flushClient.Close()
+
+    client, err := redis.New(context.Background(), s.cfg)
     if err != nil {
         s.T().Fatalf("failed to create redis client: %v", err)
     }
@@ -42,10 +54,21 @@ func (s *RedisRSMQSuite) SetupTest() {
     s.rsmq = NewRedisSMQ(s.client, "test")
 }
 
-func (s *DragonflyRSMQSuite) SetupTest() {
+func (s *DragonflyRSMQSuite) SetupSuite() {
     testinfra.Start(s.T())
-    cfg := testinfra.NewDragonflyConfig(s.T())
-    client, err := redis.New(context.Background(), cfg)
+    s.cfg = testinfra.NewDragonflyConfig(s.T())
+}
+
+func (s *DragonflyRSMQSuite) SetupTest() {
+    // Flush the container's DB 0 before each test method for a clean state.
+    flushClient, err := redis.New(context.Background(), s.cfg)
+    if err != nil {
+        s.T().Fatalf("failed to create redis client for flush: %v", err)
+    }
+    flushClient.FlushDB(context.Background())
+    flushClient.Close()
+
+    client, err := redis.New(context.Background(), s.cfg)
     if err != nil {
         s.T().Fatalf("failed to create redis client: %v", err)
     }
diff --git a/internal/tenantstore/redistenantstore/redistenantstore_test.go b/internal/tenantstore/redistenantstore/redistenantstore_test.go
index 4e02d501..14539ac5 100644
--- a/internal/tenantstore/redistenantstore/redistenantstore_test.go
+++ b/internal/tenantstore/redistenantstore/redistenantstore_test.go
@@ -27,40 +27,18 @@ func miniredisFactory(t *testing.T) redis.Cmdable {
     return testutil.CreateTestRedisClient(t)
 }
 
-// redisStackFactory creates a Redis Stack client (with RediSearch).
-func redisStackFactory(t *testing.T) redis.Cmdable {
-    testinfra.Start(t)
-    redisCfg := testinfra.NewRedisStackConfig(t)
-    client, err := redis.New(context.Background(), redisCfg)
-    if err != nil {
-        t.Fatalf("failed to create redis client: %v", err)
-    }
-    t.Cleanup(func() { client.Close() })
-    return client
-}
-
-// dragonflyFactory creates a Dragonfly client (no RediSearch).
-func dragonflyFactory(t *testing.T) redis.Cmdable {
-    testinfra.Start(t)
-    redisCfg := testinfra.NewDragonflyConfig(t)
-    client, err := redis.New(context.Background(), redisCfg)
-    if err != nil {
-        t.Fatalf("failed to create dragonfly client: %v", err)
-    }
-    t.Cleanup(func() { client.Close() })
-    return client
-}
-
-// dragonflyStackFactory creates a Dragonfly client on DB 0 (with RediSearch).
-func dragonflyStackFactory(t *testing.T) redis.Cmdable {
-    testinfra.Start(t)
-    redisCfg := testinfra.NewDragonflyStackConfig(t)
-    client, err := redis.New(context.Background(), redisCfg)
-    if err != nil {
-        t.Fatalf("failed to create dragonfly stack client: %v", err)
+// sharedFactory returns a factory that reuses a single container (via cfg),
+// creating a fresh client and flushing the DB for each sub-test.
+func sharedFactory(cfg *redis.RedisConfig) redisClientFactory {
+    return func(t *testing.T) redis.Cmdable {
+        client, err := redis.New(context.Background(), cfg)
+        if err != nil {
+            t.Fatalf("failed to create redis client: %v", err)
+        }
+        client.FlushDB(context.Background())
+        t.Cleanup(func() { client.Close() })
+        return client
     }
-    t.Cleanup(func() { client.Close() })
-    return client
 }
 
 // redisTenantStoreHarness implements drivertest.Harness for Redis-backed stores.
@@ -150,12 +128,16 @@ func TestMiniredis_WithDeploymentID(t *testing.T) {
 
 func TestRedisStack(t *testing.T) {
     t.Parallel()
-    drivertest.RunConformanceTests(t, newHarness(redisStackFactory, ""))
+    testinfra.Start(t)
+    cfg := testinfra.NewRedisStackConfig(t)
+    drivertest.RunConformanceTests(t, newHarness(sharedFactory(cfg), ""))
 }
 
 func TestRedisStack_WithDeploymentID(t *testing.T) {
     t.Parallel()
-    drivertest.RunConformanceTests(t, newHarness(redisStackFactory, "dp_test_001"))
+    testinfra.Start(t)
+    cfg := testinfra.NewRedisStackConfig(t)
+    drivertest.RunConformanceTests(t, newHarness(sharedFactory(cfg), "dp_test_001"))
 }
 
 // =============================================================================
@@ -164,12 +146,16 @@ func TestRedisStack_WithDeploymentID(t *testing.T) {
 
 func TestDragonfly(t *testing.T) {
     t.Parallel()
-    drivertest.RunConformanceTests(t, newHarness(dragonflyFactory, ""))
+    testinfra.Start(t)
+    cfg := testinfra.NewDragonflyConfig(t)
+    drivertest.RunConformanceTests(t, newHarness(sharedFactory(cfg), ""))
 }
 
 func TestDragonfly_WithDeploymentID(t *testing.T) {
     t.Parallel()
-    drivertest.RunConformanceTests(t, newHarness(dragonflyFactory, "dp_test_001"))
+    testinfra.Start(t)
+    cfg := testinfra.NewDragonflyConfig(t)
+    drivertest.RunConformanceTests(t, newHarness(sharedFactory(cfg), "dp_test_001"))
 }
 
 // =============================================================================
@@ -178,12 +164,16 @@ func TestDragonfly_WithDeploymentID(t *testing.T) {
 
 func TestRedisStack_ListTenant(t *testing.T) {
     t.Parallel()
-    drivertest.RunListTenantTests(t, newHarness(redisStackFactory, ""))
+    testinfra.Start(t)
+    cfg := testinfra.NewRedisStackConfig(t)
+    drivertest.RunListTenantTests(t, newHarness(sharedFactory(cfg), ""))
 }
 
 func TestRedisStack_ListTenant_WithDeploymentID(t *testing.T) {
     t.Parallel()
-    drivertest.RunListTenantTests(t, newHarness(redisStackFactory, "dp_test_001"))
+    testinfra.Start(t)
+    cfg := testinfra.NewRedisStackConfig(t)
+    drivertest.RunListTenantTests(t, newHarness(sharedFactory(cfg), "dp_test_001"))
 }
 
 // =============================================================================
@@ -192,12 +182,16 @@ func TestRedisStack_ListTenant_WithDeploymentID(t *testing.T) {
 
 func TestDragonflyStack_ListTenant(t *testing.T) {
     t.Parallel()
-    drivertest.RunListTenantTests(t, newHarness(dragonflyStackFactory, ""))
+    testinfra.Start(t)
+    cfg := testinfra.NewDragonflyStackConfig(t)
+    drivertest.RunListTenantTests(t, newHarness(sharedFactory(cfg), ""))
 }
 
 func TestDragonflyStack_ListTenant_WithDeploymentID(t *testing.T) {
     t.Parallel()
-    drivertest.RunListTenantTests(t, newHarness(dragonflyStackFactory, "dp_test_001"))
+    testinfra.Start(t)
+    cfg := testinfra.NewDragonflyStackConfig(t)
+    drivertest.RunListTenantTests(t, newHarness(sharedFactory(cfg), "dp_test_001"))
 }
 
 // =============================================================================
diff --git a/internal/util/testinfra/redis.go b/internal/util/testinfra/redis.go
index ea5094bc..906a7261 100644
--- a/internal/util/testinfra/redis.go
+++ b/internal/util/testinfra/redis.go
@@ -2,11 +2,9 @@ package testinfra
 
 import (
     "context"
-    "fmt"
     "log"
     "strconv"
     "strings"
-    "sync"
     "testing"
 
     "github.com/hookdeck/outpost/internal/redis"
@@ -14,50 +12,30 @@ import (
     "github.com/testcontainers/testcontainers-go/wait"
 )
 
-var (
-    redisOnce     sync.Once
-    dragonflyOnce sync.Once
-
-    // DB 1-15 are for regular Redis/Dragonfly tests (can run in parallel)
-    redisDBMu       sync.Mutex
-    dragonflyDBMu   sync.Mutex
-    redisDBUsed     = make(map[int]bool)
-    dragonflyDBUsed = make(map[int]bool)
-)
-
-const (
-    minRegularDB = 1 // Regular tests use DB 1-15
-    maxRegularDB = 15
-)
-
-// NewRedisConfig allocates a Redis database (1-15) for the test.
+// NewRedisConfig spins up a dedicated Redis container for the test.
 // Use this for tests that don't need RediSearch.
-// The database is flushed on cleanup.
+// The container is terminated on cleanup.
 func NewRedisConfig(t *testing.T) *redis.RedisConfig {
-    addr := EnsureRedis()
-    db := allocateRegularDB()
-
-    cfg := parseAddrToConfig(addr, db)
-
-    t.Cleanup(func() {
-        flushRedisDB(addr, db)
-        releaseRegularDB(db)
-    })
-
-    return cfg
+    return startRedisContainer(t, "redis/redis-stack-server:latest")
 }
 
 // NewRedisStackConfig spins up a dedicated Redis Stack container for tests requiring RediSearch.
 // Each test gets its own isolated container, eliminating cross-test interference.
 // The container is terminated on cleanup.
 func NewRedisStackConfig(t *testing.T) *redis.RedisConfig {
+    return startRedisContainer(t, "redis/redis-stack-server:latest")
+}
+
+// NewDragonflyConfig spins up a dedicated Dragonfly container for the test.
+// Each caller gets its own isolated container on DB 0, eliminating cross-test interference.
+// The container is terminated on cleanup.
+func NewDragonflyConfig(t *testing.T) *redis.RedisConfig {
     ctx := context.Background()
 
-    // Use generic container with explicit port exposure and wait strategy to avoid
-    // race conditions when multiple tests spin up containers in parallel.
     req := testcontainers.ContainerRequest{
-        Image:        "redis/redis-stack-server:latest",
+        Image:        "docker.dragonflydb.io/dragonflydb/dragonfly:latest",
         ExposedPorts: []string{"6379/tcp"},
+        Cmd:          []string{"--proactor_threads=1", "--maxmemory=256mb"},
         WaitingFor:   wait.ForListeningPort("6379/tcp"),
     }
@@ -66,55 +44,38 @@ func NewRedisStackConfig(t *testing.T) *redis.RedisConfig {
         Started: true,
     })
     if err != nil {
-        t.Fatalf("failed to start redis-stack container: %v", err)
+        t.Fatalf("failed to start dragonfly container: %v", err)
     }
 
     endpoint, err := container.Endpoint(ctx, "")
     if err != nil {
-        t.Fatalf("failed to get redis-stack endpoint: %v", err)
+        t.Fatalf("failed to get dragonfly endpoint: %v", err)
     }
-    log.Printf("Redis Stack (dedicated for test) running at %s", endpoint)
+    log.Printf("Dragonfly (dedicated) running at %s", endpoint)
 
     cfg := parseAddrToConfig(endpoint, 0)
 
     t.Cleanup(func() {
         if err := container.Terminate(ctx); err != nil {
-            log.Printf("failed to terminate redis-stack container: %s", err)
+            log.Printf("failed to terminate dragonfly container: %s", err)
         }
     })
 
     return cfg
 }
 
-// NewDragonflyConfig allocates a Dragonfly database (1-15) for the test.
-// Use NewDragonflyStackConfig for tests requiring RediSearch.
-// The database is flushed on cleanup.
-func NewDragonflyConfig(t *testing.T) *redis.RedisConfig {
-    addr := EnsureDragonfly()
-    db := allocateDragonflyDB()
-
-    cfg := parseAddrToConfig(addr, db)
-
-    t.Cleanup(func() {
-        flushRedisDB(addr, db)
-        releaseDragonflyDB(db)
-    })
-
-    return cfg
+// NewDragonflyStackConfig is an alias for NewDragonflyConfig.
+// Kept for backward compatibility with existing test code.
+func NewDragonflyStackConfig(t *testing.T) *redis.RedisConfig {
+    return NewDragonflyConfig(t)
 }
 
-// NewDragonflyStackConfig spins up a dedicated Dragonfly container for tests requiring RediSearch.
-// Each test gets its own isolated container, eliminating cross-test interference.
-// The container is terminated on cleanup.
-func NewDragonflyStackConfig(t *testing.T) *redis.RedisConfig {
+func startRedisContainer(t *testing.T, image string) *redis.RedisConfig {
     ctx := context.Background()
 
-    // Use generic container instead of redis module since Dragonfly has different startup behavior.
-    // Set proactor_threads=1 and maxmemory=256mb to reduce resource usage when running many containers.
     req := testcontainers.ContainerRequest{
-        Image:        "docker.dragonflydb.io/dragonflydb/dragonfly:latest",
+        Image:        image,
         ExposedPorts: []string{"6379/tcp"},
-        Cmd:          []string{"--proactor_threads=1", "--maxmemory=256mb"},
         WaitingFor:   wait.ForListeningPort("6379/tcp"),
     }
@@ -123,20 +84,20 @@ func NewDragonflyStackConfig(t *testing.T) *redis.RedisConfig {
         Started: true,
     })
     if err != nil {
-        t.Fatalf("failed to start dragonfly container: %v", err)
+        t.Fatalf("failed to start redis container: %v", err)
     }
 
     endpoint, err := container.Endpoint(ctx, "")
     if err != nil {
-        t.Fatalf("failed to get dragonfly endpoint: %v", err)
+        t.Fatalf("failed to get redis endpoint: %v", err)
     }
-    log.Printf("Dragonfly (dedicated for test) running at %s", endpoint)
+    log.Printf("Redis (%s) running at %s", image, endpoint)
 
     cfg := parseAddrToConfig(endpoint, 0)
 
     t.Cleanup(func() {
         if err := container.Terminate(ctx); err != nil {
-            log.Printf("failed to terminate dragonfly container: %s", err)
+            log.Printf("failed to terminate redis container: %s", err)
         }
     })
@@ -158,139 +119,3 @@ func parseAddrToConfig(addr string, db int) *redis.RedisConfig {
         Database: db,
     }
 }
-
-func allocateRegularDB() int {
-    redisDBMu.Lock()
-    defer redisDBMu.Unlock()
-
-    for i := minRegularDB; i <= maxRegularDB; i++ {
-        if !redisDBUsed[i] {
-            redisDBUsed[i] = true
-            return i
-        }
-    }
-    panic(fmt.Sprintf("no available databases (DB %d-%d all in use)", minRegularDB, maxRegularDB))
-}
-
-func releaseRegularDB(db int) {
-    redisDBMu.Lock()
-    defer redisDBMu.Unlock()
-    delete(redisDBUsed, db)
-}
-
-func allocateDragonflyDB() int {
-    dragonflyDBMu.Lock()
-    defer dragonflyDBMu.Unlock()
-
-    // Use DB 1-15 to avoid conflicts with tests that might use DB 0
-    for i := minRegularDB; i <= maxRegularDB; i++ {
-        if !dragonflyDBUsed[i] {
-            dragonflyDBUsed[i] = true
-            return i
-        }
-    }
-    panic(fmt.Sprintf("no available Dragonfly databases (DB %d-%d all in use)", minRegularDB, maxRegularDB))
-}
-
-func releaseDragonflyDB(db int) {
-    dragonflyDBMu.Lock()
-    defer dragonflyDBMu.Unlock()
-    delete(dragonflyDBUsed, db)
-}
-
-func flushRedisDB(addr string, db int) {
-    cfg := parseAddrToConfig(addr, db)
-    client, err := redis.New(context.Background(), cfg)
-    if err != nil {
-        log.Printf("failed to create Redis client for flush: %s", err)
-        return
-    }
-    defer client.Close()
-
-    if err := client.FlushDB(context.Background()).Err(); err != nil {
-        log.Printf("failed to flush Redis DB %d: %s", db, err)
-    }
-}
-
-func EnsureRedis() string {
-    cfg := ReadConfig()
-    if cfg.RedisURL == "" {
-        redisOnce.Do(func() {
-            startRedisTestContainer(cfg)
-        })
-    }
-    return cfg.RedisURL
-}
-
-func EnsureDragonfly() string {
-    cfg := ReadConfig()
-    if cfg.DragonflyURL == "" {
-        dragonflyOnce.Do(func() {
-            startDragonflyTestContainer(cfg)
-        })
-    }
-    return cfg.DragonflyURL
-}
-
-func startRedisTestContainer(cfg *Config) {
-    ctx := context.Background()
-
-    req := testcontainers.ContainerRequest{
-        Image:        "redis/redis-stack-server:latest",
-        ExposedPorts: []string{"6379/tcp"},
-        WaitingFor:   wait.ForListeningPort("6379/tcp"),
-    }
-
-    redisContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
-        ContainerRequest: req,
-        Started:          true,
-    })
-    if err != nil {
-        panic(err)
-    }
-
-    endpoint, err := redisContainer.Endpoint(ctx, "")
-    if err != nil {
-        panic(err)
-    }
-    log.Printf("Redis (redis-stack-server) running at %s", endpoint)
-    cfg.RedisURL = endpoint
-    cfg.cleanupFns = append(cfg.cleanupFns, func() {
-        if err := redisContainer.Terminate(ctx); err != nil {
-            log.Printf("failed to terminate container: %s", err)
-        }
-    })
-}
-
-func startDragonflyTestContainer(cfg *Config) {
-    ctx := context.Background()
-
-    // Use generic container with resource-limiting flags to prevent memory issues.
-    // Dragonfly requires ~256MB per thread by default, which can exhaust memory.
-    req := testcontainers.ContainerRequest{
-        Image:        "docker.dragonflydb.io/dragonflydb/dragonfly:latest",
-        ExposedPorts: []string{"6379/tcp"},
-        Cmd:          []string{"--proactor_threads=1", "--maxmemory=256mb"},
-        WaitingFor:   wait.ForListeningPort("6379/tcp"),
-    }
-
-    dragonflyContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
-        ContainerRequest: req,
-        Started:          true,
-    })
-    if err != nil {
-        panic(err)
-    }
-
-    endpoint, err := dragonflyContainer.Endpoint(ctx, "")
-    if err != nil {
-        panic(err)
-    }
-    log.Printf("Dragonfly running at %s", endpoint)
-    cfg.DragonflyURL = endpoint
-    cfg.cleanupFns = append(cfg.cleanupFns, func() {
-        if err := dragonflyContainer.Terminate(ctx); err != nil {
-            log.Printf("failed to terminate container: %s", err)
-        }
-    })
-}
diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go
index b1b96dbe..3c8cf2cc 100644
--- a/internal/util/testinfra/testinfra.go
+++ b/internal/util/testinfra/testinfra.go
@@ -22,8 +22,6 @@ var (
 type Config struct {
     TestInfra     bool
     TestAzure     bool
-    RedisURL      string
-    DragonflyURL  string
     ClickHouseURL string
     PostgresURL   string
     LocalStackURL string
@@ -71,8 +69,6 @@ func initConfig() {
     cfg = &Config{
         TestInfra:     v.GetBool("TESTINFRA"),
         TestAzure:     v.GetBool("TESTAZURE"),
-        RedisURL:      v.GetString("TEST_REDIS_URL"),
-        DragonflyURL:  v.GetString("TEST_DRAGONFLY_URL"),
         ClickHouseURL: v.GetString("TEST_CLICKHOUSE_URL"),
         PostgresURL:   v.GetString("TEST_POSTGRES_URL"),
         LocalStackURL: localstackURL,
@@ -87,8 +83,6 @@ func initConfig() {
     cfg = &Config{
         TestInfra:     v.GetBool("TESTINFRA"),
         TestAzure:     v.GetBool("TESTAZURE"),
-        RedisURL:      "",
-        DragonflyURL:  "",
        ClickHouseURL: "",
        PostgresURL:   "",
        LocalStackURL: "",