From 98967037f833690a54650f7e9052b3652303f6df Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 3 Feb 2026 12:21:57 +0700 Subject: [PATCH 1/4] refactor: update alert payload schema Co-Authored-By: Claude Opus 4.5 --- cmd/e2e/alert/alert.go | 37 ++++++++++--- cmd/e2e/alerts_test.go | 64 +++++++++++++++++++++- internal/alert/monitor.go | 13 +++-- internal/alert/monitor_test.go | 8 +-- internal/alert/notifier.go | 35 +++++++++--- internal/alert/notifier_test.go | 2 +- internal/deliverymq/messagehandler.go | 18 ++---- internal/deliverymq/messagehandler_test.go | 6 +- 8 files changed, 138 insertions(+), 45 deletions(-) diff --git a/cmd/e2e/alert/alert.go b/cmd/e2e/alert/alert.go index e96443cf..abf73ce1 100644 --- a/cmd/e2e/alert/alert.go +++ b/cmd/e2e/alert/alert.go @@ -8,8 +8,6 @@ import ( "net/http" "sync" "time" - - "github.com/hookdeck/outpost/internal/models" ) type AlertRequest struct { @@ -23,12 +21,37 @@ type AlertPayload struct { Data ConsecutiveFailureData `json:"data"` } +// AlertedEvent matches internal/alert.AlertedEvent +type AlertedEvent struct { + ID string `json:"id"` + Topic string `json:"topic"` + Metadata map[string]string `json:"metadata"` + Data map[string]any `json:"data"` +} + +// AlertDestination matches internal/alert.AlertDestination +type AlertDestination struct { + ID string `json:"id"` + TenantID string `json:"tenant_id"` + Type string `json:"type"` + Topics []string `json:"topics"` + Filter map[string]any `json:"filter,omitempty"` + Config map[string]string `json:"config"` + Metadata map[string]string `json:"metadata,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + DisabledAt *time.Time `json:"disabled_at"` +} + +// ConsecutiveFailureData matches internal/alert.ConsecutiveFailureData type ConsecutiveFailureData struct { - MaxConsecutiveFailures int `json:"max_consecutive_failures"` - ConsecutiveFailures int `json:"consecutive_failures"` - WillDisable bool `json:"will_disable"` - Destination *models.Destination `json:"destination"` - Data map[string]interface{} `json:"data"` + TenantID string `json:"tenant_id"` + Event AlertedEvent `json:"event"` + MaxConsecutiveFailures int `json:"max_consecutive_failures"` + ConsecutiveFailures int `json:"consecutive_failures"` + WillDisable bool `json:"will_disable"` + Destination *AlertDestination `json:"destination"` + AttemptResponse map[string]any `json:"attempt_response"` } type AlertMockServer struct { diff --git a/cmd/e2e/alerts_test.go b/cmd/e2e/alerts_test.go index cbb2cc4e..5774b5fd 100644 --- a/cmd/e2e/alerts_test.go +++ b/cmd/e2e/alerts_test.go @@ -7,7 +7,7 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret)) // Publish 20 failing events - for i := 0; i < 20; i++ { + for i := range 20 { s.publish(tenant.ID, "user.created", map[string]any{ "index": i, }, withPublishMetadata(map[string]string{"should_err": "true"})) @@ -27,9 +27,39 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { expectedCounts := []int{10, 14, 18, 20} for i, alert := range alerts { + // Auth header assertion s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") + + // Topic assertion + s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") + + // TenantID assertion + s.NotEmpty(alert.Alert.Data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, 
alert.Alert.Data.TenantID, "alert tenant_id should match") + + // Destination assertions + s.Require().NotNil(alert.Alert.Data.Destination, "alert should have destination") + s.Equal(dest.ID, alert.Alert.Data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, alert.Alert.Data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", alert.Alert.Data.Destination.Type, "alert destination type should be webhook") + + // Event assertions + s.NotEmpty(alert.Alert.Data.Event.ID, "alert event should have ID") + s.Equal("user.created", alert.Alert.Data.Event.Topic, "alert event topic should match") + s.NotNil(alert.Alert.Data.Event.Data, "alert event should have data") + + // ConsecutiveFailures assertions s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures, "alert %d should have %d consecutive failures", i, expectedCounts[i]) + s.Equal(20, alert.Alert.Data.MaxConsecutiveFailures, "max consecutive failures should be 20") + + // WillDisable assertion (should be true for last alert only) + if i == len(alerts)-1 { + s.True(alert.Alert.Data.WillDisable, "last alert should have will_disable=true") + } + + // AttemptResponse assertion + s.NotNil(alert.Alert.Data.AttemptResponse, "alert should have attempt_response") } } @@ -38,7 +68,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret)) // First batch: 14 failures - for i := 0; i < 14; i++ { + for i := range 14 { s.publish(tenant.ID, "user.created", map[string]any{ "index": i, }, withPublishMetadata(map[string]string{"should_err": "true"})) @@ -56,7 +86,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { s.waitForNewMockServerEvents(dest.mockID, 15) // Second batch: 14 more failures - for i := 0; i < 14; i++ { + for i := range 14 { s.publish(tenant.ID, "user.created", map[string]any{ "index": i, }, withPublishMetadata(map[string]string{"should_err": "true"})) @@ -76,8 +106,36 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { expectedCounts := []int{10, 14, 10, 14} for i, alert := range alerts { + // Auth header assertion s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") + + // Topic assertion + s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") + + // TenantID assertion + s.NotEmpty(alert.Alert.Data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, alert.Alert.Data.TenantID, "alert tenant_id should match") + + // Destination assertions + s.Require().NotNil(alert.Alert.Data.Destination, "alert should have destination") + s.Equal(dest.ID, alert.Alert.Data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, alert.Alert.Data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", alert.Alert.Data.Destination.Type, "alert destination type should be webhook") + + // Event assertions + s.NotEmpty(alert.Alert.Data.Event.ID, "alert event should have ID") + s.Equal("user.created", alert.Alert.Data.Event.Topic, "alert event topic should match") + s.NotNil(alert.Alert.Data.Event.Data, "alert event should have data") + + // ConsecutiveFailures assertions s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures, "alert %d should have %d consecutive failures", i, expectedCounts[i]) + s.Equal(20, alert.Alert.Data.MaxConsecutiveFailures, "max consecutive failures should be 20") + + 
// WillDisable assertion (none should have will_disable=true since counter resets) + s.False(alert.Alert.Data.WillDisable, "alert %d should have will_disable=false (counter resets)", i) + + // AttemptResponse assertion + s.NotNil(alert.Alert.Data.AttemptResponse, "alert should have attempt_response") } } diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 39ff54f4..7eeb10df 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -82,11 +82,11 @@ func WithDeploymentID(deploymentID string) AlertOption { // DeliveryAttempt represents a single delivery attempt type DeliveryAttempt struct { - Success bool - DeliveryTask *models.DeliveryTask - Destination *AlertDestination - Timestamp time.Time - DeliveryResponse map[string]interface{} + Success bool + DeliveryTask *models.DeliveryTask + Destination *AlertDestination + Timestamp time.Time + AttemptResponse map[string]interface{} } type alertMonitor struct { @@ -154,6 +154,7 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp } alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{ + TenantID: attempt.Destination.TenantID, Event: AlertedEvent{ ID: attempt.DeliveryTask.Event.ID, Topic: attempt.DeliveryTask.Event.Topic, @@ -164,7 +165,7 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp ConsecutiveFailures: count, WillDisable: m.disabler != nil && level == 100, Destination: attempt.Destination, - DeliveryResponse: attempt.DeliveryResponse, + AttemptResponse: attempt.AttemptResponse, }) // If we've hit 100% and have a disabler configured, disable the destination diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 16372475..2185635e 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -58,7 +58,7 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { Success: false, DeliveryTask: task, Destination: dest, - DeliveryResponse: map[string]interface{}{ + AttemptResponse: map[string]interface{}{ "status": "500", "data": map[string]any{"error": "test error"}, }, @@ -80,7 +80,7 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") require.Equal(t, dest, alert.Data.Destination) require.Equal(t, "alert.consecutive_failure", alert.Topic) - require.Equal(t, attempt.DeliveryResponse, alert.Data.DeliveryResponse) + require.Equal(t, attempt.AttemptResponse, alert.Data.AttemptResponse) require.Equal(t, 20, alert.Data.MaxConsecutiveFailures) require.Equal(t, failures == 20, alert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)") } @@ -125,7 +125,7 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { Success: false, DeliveryTask: task, Destination: dest, - DeliveryResponse: map[string]interface{}{ + AttemptResponse: map[string]interface{}{ "status": "500", "data": map[string]any{"error": "test error"}, }, @@ -198,7 +198,7 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { Success: false, DeliveryTask: task, Destination: dest, - DeliveryResponse: map[string]interface{}{ + AttemptResponse: map[string]interface{}{ "status": "500", }, Timestamp: time.Now(), diff --git a/internal/alert/notifier.go b/internal/alert/notifier.go index ef9589bc..92cc8b51 100644 --- a/internal/alert/notifier.go +++ b/internal/alert/notifier.go @@ -50,23 +50,42 @@ type AlertedEvent struct { } type AlertDestination 
struct { - ID string `json:"id" redis:"id"` - TenantID string `json:"tenant_id" redis:"-"` - Type string `json:"type" redis:"type"` - Topics models.Topics `json:"topics" redis:"-"` - Config models.Config `json:"config" redis:"-"` - CreatedAt time.Time `json:"created_at" redis:"created_at"` - DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"` + ID string `json:"id" redis:"id"` + TenantID string `json:"tenant_id" redis:"-"` + Type string `json:"type" redis:"type"` + Topics models.Topics `json:"topics" redis:"-"` + Filter models.Filter `json:"filter,omitempty" redis:"-"` + Config models.Config `json:"config" redis:"-"` + Metadata models.Metadata `json:"metadata,omitempty" redis:"-"` + CreatedAt time.Time `json:"created_at" redis:"created_at"` + UpdatedAt time.Time `json:"updated_at" redis:"updated_at"` + DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"` +} + +func AlertDestinationFromDestination(d *models.Destination) *AlertDestination { + return &AlertDestination{ + ID: d.ID, + TenantID: d.TenantID, + Type: d.Type, + Topics: d.Topics, + Filter: d.Filter, + Config: d.Config, + Metadata: d.Metadata, + CreatedAt: d.CreatedAt, + UpdatedAt: d.UpdatedAt, + DisabledAt: d.DisabledAt, + } } // ConsecutiveFailureData represents the data needed for a consecutive failure alert type ConsecutiveFailureData struct { + TenantID string `json:"tenant_id"` Event AlertedEvent `json:"event"` MaxConsecutiveFailures int `json:"max_consecutive_failures"` ConsecutiveFailures int `json:"consecutive_failures"` WillDisable bool `json:"will_disable"` Destination *AlertDestination `json:"destination"` - DeliveryResponse map[string]interface{} `json:"delivery_response"` + AttemptResponse map[string]interface{} `json:"attempt_response"` } // ConsecutiveFailureAlert represents an alert for consecutive failures diff --git a/internal/alert/notifier_test.go b/internal/alert/notifier_test.go index 31661e52..43ff62e4 100644 --- a/internal/alert/notifier_test.go +++ b/internal/alert/notifier_test.go @@ -102,7 +102,7 @@ func TestAlertNotifier_Notify(t *testing.T) { ConsecutiveFailures: 5, WillDisable: true, Destination: dest, - DeliveryResponse: map[string]interface{}{ + AttemptResponse: map[string]interface{}{ "status": "error", "data": map[string]any{"code": "ETIMEDOUT"}, }, diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index d6e31590..073d47f3 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -298,16 +298,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De alertAttempt := alert.DeliveryAttempt{ Success: attemptResult.Status == models.AttemptStatusSuccess, DeliveryTask: task, - Destination: &alert.AlertDestination{ - ID: destination.ID, - TenantID: destination.TenantID, - Type: destination.Type, - Topics: destination.Topics, - Config: destination.Config, - CreatedAt: destination.CreatedAt, - DisabledAt: destination.DisabledAt, - }, - Timestamp: attemptResult.Time, + Destination: alert.AlertDestinationFromDestination(destination), + Timestamp: attemptResult.Time, } if !alertAttempt.Success && err != nil { @@ -316,14 +308,14 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De if errors.As(err, &atmErr) { var pubErr *destregistry.ErrDestinationPublishAttempt if errors.As(atmErr.err, &pubErr) { - alertAttempt.DeliveryResponse = pubErr.Data + alertAttempt.AttemptResponse = pubErr.Data } else { - alertAttempt.DeliveryResponse = map[string]interface{}{ + 
alertAttempt.AttemptResponse = map[string]interface{}{ "error": atmErr.err.Error(), } } } else { - alertAttempt.DeliveryResponse = map[string]interface{}{ + alertAttempt.AttemptResponse = map[string]interface{}{ "error": "unexpected", "message": err.Error(), } diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index ebbab75f..f3a5212b 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -989,7 +989,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { return attempt.Success && // Should be a successful attempt attempt.Destination.ID == destination.ID && // Should have correct destination attempt.DeliveryTask != nil && // Should have delivery task - attempt.DeliveryResponse == nil // No error data for success + attempt.AttemptResponse == nil // No error data for success })).Return(nil) // Setup message handler @@ -1112,9 +1112,9 @@ func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destina assert.NotNil(t, attempt.DeliveryTask, "alert attempt should have delivery task") if expectedData != nil { - assert.Equal(t, expectedData, attempt.DeliveryResponse, "alert attempt data should match") + assert.Equal(t, expectedData, attempt.AttemptResponse, "alert attempt data should match") } else { - assert.Nil(t, attempt.DeliveryResponse, "alert attempt should not have data") + assert.Nil(t, attempt.AttemptResponse, "alert attempt should not have data") } } From 4214c677f19ba9cde8f646e4392b5b3def36f960 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 3 Feb 2026 12:34:56 +0700 Subject: [PATCH 2/4] test: add failing tests for alert.destination.disabled callback TDD setup - tests will pass once feature is implemented. Co-Authored-By: Claude Opus 4.5 --- cmd/e2e/alert/alert.go | 75 +++++++++++++++++++- cmd/e2e/alerts_test.go | 125 +++++++++++++++++++++++++-------- cmd/e2e/helpers_test.go | 17 +++++ internal/alert/monitor_test.go | 86 +++++++++++++++++++++++ internal/alert/notifier.go | 33 +++++++++ 5 files changed, 307 insertions(+), 29 deletions(-) diff --git a/cmd/e2e/alert/alert.go b/cmd/e2e/alert/alert.go index abf73ce1..bc901498 100644 --- a/cmd/e2e/alert/alert.go +++ b/cmd/e2e/alert/alert.go @@ -16,11 +16,25 @@ type AlertRequest struct { } type AlertPayload struct { + Topic string `json:"topic"` + Timestamp time.Time `json:"timestamp"` + Data json.RawMessage `json:"data"` +} + +// ConsecutiveFailureAlert is a parsed alert for "alert.consecutive_failure" +type ConsecutiveFailureAlert struct { Topic string `json:"topic"` Timestamp time.Time `json:"timestamp"` Data ConsecutiveFailureData `json:"data"` } +// DestinationDisabledAlert is a parsed alert for "alert.destination.disabled" +type DestinationDisabledAlert struct { + Topic string `json:"topic"` + Timestamp time.Time `json:"timestamp"` + Data DestinationDisabledData `json:"data"` +} + // AlertedEvent matches internal/alert.AlertedEvent type AlertedEvent struct { ID string `json:"id"` @@ -54,6 +68,17 @@ type ConsecutiveFailureData struct { AttemptResponse map[string]any `json:"attempt_response"` } +// DestinationDisabledData matches the expected payload for "alert.destination.disabled" +type DestinationDisabledData struct { + TenantID string `json:"tenant_id"` + Destination *AlertDestination `json:"destination"` + DisabledAt time.Time `json:"disabled_at"` + TriggeringEvent *AlertedEvent `json:"triggering_event,omitempty"` + ConsecutiveFailures int `json:"consecutive_failures"` + MaxConsecutiveFailures int 
`json:"max_consecutive_failures"` + AttemptResponse map[string]any `json:"attempt_response"` +} + type AlertMockServer struct { server *http.Server alerts []AlertRequest @@ -140,6 +165,11 @@ func (s *AlertMockServer) handleAlert(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +// alertDataWithDestination is used to extract destination from any alert type +type alertDataWithDestination struct { + Destination *AlertDestination `json:"destination"` +} + // Helper methods for assertions func (s *AlertMockServer) GetAlertsForDestination(destinationID string) []AlertRequest { s.mu.RLock() @@ -147,7 +177,11 @@ func (s *AlertMockServer) GetAlertsForDestination(destinationID string) []AlertR var filtered []AlertRequest for _, alert := range s.alerts { - if alert.Alert.Data.Destination != nil && alert.Alert.Data.Destination.ID == destinationID { + var data alertDataWithDestination + if err := json.Unmarshal(alert.Alert.Data, &data); err != nil { + continue + } + if data.Destination != nil && data.Destination.ID == destinationID { filtered = append(filtered, alert) } } @@ -164,3 +198,42 @@ func (s *AlertMockServer) GetLastAlert() *AlertRequest { alert := s.alerts[len(s.alerts)-1] return &alert } + +// GetAlertsForDestinationByTopic returns alerts filtered by destination ID and topic +func (s *AlertMockServer) GetAlertsForDestinationByTopic(destinationID, topic string) []AlertRequest { + s.mu.RLock() + defer s.mu.RUnlock() + + var filtered []AlertRequest + for _, alert := range s.alerts { + if alert.Alert.Topic != topic { + continue + } + var data alertDataWithDestination + if err := json.Unmarshal(alert.Alert.Data, &data); err != nil { + continue + } + if data.Destination != nil && data.Destination.ID == destinationID { + filtered = append(filtered, alert) + } + } + return filtered +} + +// ParseConsecutiveFailureData parses the Data field as ConsecutiveFailureData +func (a *AlertRequest) ParseConsecutiveFailureData() (*ConsecutiveFailureData, error) { + var data ConsecutiveFailureData + if err := json.Unmarshal(a.Alert.Data, &data); err != nil { + return nil, err + } + return &data, nil +} + +// ParseDestinationDisabledData parses the Data field as DestinationDisabledData +func (a *AlertRequest) ParseDestinationDisabledData() (*DestinationDisabledData, error) { + var data DestinationDisabledData + if err := json.Unmarshal(a.Alert.Data, &data); err != nil { + return nil, err + } + return &data, nil +} diff --git a/cmd/e2e/alerts_test.go b/cmd/e2e/alerts_test.go index 5774b5fd..4622b097 100644 --- a/cmd/e2e/alerts_test.go +++ b/cmd/e2e/alerts_test.go @@ -22,11 +22,15 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { // Wait for 4 alert callbacks to be processed s.waitForAlerts(dest.ID, 4) - alerts := s.alertServer.GetAlertsForDestination(dest.ID) + alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.consecutive_failure") s.Require().Len(alerts, 4, "should have 4 alerts") expectedCounts := []int{10, 14, 18, 20} for i, alert := range alerts { + // Parse alert data + data, err := alert.ParseConsecutiveFailureData() + s.Require().NoError(err, "failed to parse consecutive failure data") + // Auth header assertion s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") @@ -34,32 +38,32 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") // TenantID assertion - 
s.NotEmpty(alert.Alert.Data.TenantID, "alert should have tenant_id") - s.Equal(tenant.ID, alert.Alert.Data.TenantID, "alert tenant_id should match") + s.NotEmpty(data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match") // Destination assertions - s.Require().NotNil(alert.Alert.Data.Destination, "alert should have destination") - s.Equal(dest.ID, alert.Alert.Data.Destination.ID, "alert destination ID should match") - s.Equal(tenant.ID, alert.Alert.Data.Destination.TenantID, "alert destination tenant_id should match") - s.Equal("webhook", alert.Alert.Data.Destination.Type, "alert destination type should be webhook") + s.Require().NotNil(data.Destination, "alert should have destination") + s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook") // Event assertions - s.NotEmpty(alert.Alert.Data.Event.ID, "alert event should have ID") - s.Equal("user.created", alert.Alert.Data.Event.Topic, "alert event topic should match") - s.NotNil(alert.Alert.Data.Event.Data, "alert event should have data") + s.NotEmpty(data.Event.ID, "alert event should have ID") + s.Equal("user.created", data.Event.Topic, "alert event topic should match") + s.NotNil(data.Event.Data, "alert event should have data") // ConsecutiveFailures assertions - s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures, + s.Equal(expectedCounts[i], data.ConsecutiveFailures, "alert %d should have %d consecutive failures", i, expectedCounts[i]) - s.Equal(20, alert.Alert.Data.MaxConsecutiveFailures, "max consecutive failures should be 20") + s.Equal(20, data.MaxConsecutiveFailures, "max consecutive failures should be 20") // WillDisable assertion (should be true for last alert only) if i == len(alerts)-1 { - s.True(alert.Alert.Data.WillDisable, "last alert should have will_disable=true") + s.True(data.WillDisable, "last alert should have will_disable=true") } // AttemptResponse assertion - s.NotNil(alert.Alert.Data.AttemptResponse, "alert should have attempt_response") + s.NotNil(data.AttemptResponse, "alert should have attempt_response") } } @@ -101,11 +105,15 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { // Wait for 4 alert callbacks: [10, 14] from first batch, [10, 14] from second batch s.waitForAlerts(dest.ID, 4) - alerts := s.alertServer.GetAlertsForDestination(dest.ID) + alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.consecutive_failure") s.Require().Len(alerts, 4, "should have 4 alerts") expectedCounts := []int{10, 14, 10, 14} for i, alert := range alerts { + // Parse alert data + data, err := alert.ParseConsecutiveFailureData() + s.Require().NoError(err, "failed to parse consecutive failure data") + // Auth header assertion s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") @@ -113,29 +121,90 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") // TenantID assertion - s.NotEmpty(alert.Alert.Data.TenantID, "alert should have tenant_id") - s.Equal(tenant.ID, alert.Alert.Data.TenantID, "alert tenant_id should match") + s.NotEmpty(data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match") // 
Destination assertions - s.Require().NotNil(alert.Alert.Data.Destination, "alert should have destination") - s.Equal(dest.ID, alert.Alert.Data.Destination.ID, "alert destination ID should match") - s.Equal(tenant.ID, alert.Alert.Data.Destination.TenantID, "alert destination tenant_id should match") - s.Equal("webhook", alert.Alert.Data.Destination.Type, "alert destination type should be webhook") + s.Require().NotNil(data.Destination, "alert should have destination") + s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook") // Event assertions - s.NotEmpty(alert.Alert.Data.Event.ID, "alert event should have ID") - s.Equal("user.created", alert.Alert.Data.Event.Topic, "alert event topic should match") - s.NotNil(alert.Alert.Data.Event.Data, "alert event should have data") + s.NotEmpty(data.Event.ID, "alert event should have ID") + s.Equal("user.created", data.Event.Topic, "alert event topic should match") + s.NotNil(data.Event.Data, "alert event should have data") // ConsecutiveFailures assertions - s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures, + s.Equal(expectedCounts[i], data.ConsecutiveFailures, "alert %d should have %d consecutive failures", i, expectedCounts[i]) - s.Equal(20, alert.Alert.Data.MaxConsecutiveFailures, "max consecutive failures should be 20") + s.Equal(20, data.MaxConsecutiveFailures, "max consecutive failures should be 20") // WillDisable assertion (none should have will_disable=true since counter resets) - s.False(alert.Alert.Data.WillDisable, "alert %d should have will_disable=false (counter resets)", i) + s.False(data.WillDisable, "alert %d should have will_disable=false (counter resets)", i) // AttemptResponse assertion - s.NotNil(alert.Alert.Data.AttemptResponse, "alert should have attempt_response") + s.NotNil(data.AttemptResponse, "alert should have attempt_response") + } +} + +func (s *basicSuite) TestAlerts_DestinationDisabledCallback() { + tenant := s.createTenant() + dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret)) + + // Publish 20 failing events to trigger auto-disable + for i := range 20 { + s.publish(tenant.ID, "user.created", map[string]any{ + "index": i, + }, withPublishMetadata(map[string]string{"should_err": "true"})) + } + + // Wait for destination to be disabled (sync point for all 20 deliveries) + s.waitForNewDestinationDisabled(tenant.ID, dest.ID) + + // Verify destination is disabled + got := s.getDestination(tenant.ID, dest.ID) + s.NotNil(got.DisabledAt, "destination should be disabled") + + // Wait for the destination.disabled alert callback + s.waitForAlertsByTopic(dest.ID, "alert.destination.disabled", 1) + alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.disabled") + s.Require().Len(alerts, 1, "should have 1 destination.disabled alert") + + alert := alerts[0] + data, err := alert.ParseDestinationDisabledData() + s.Require().NoError(err, "failed to parse destination disabled data") + + // Auth header assertion + s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") + + // Topic assertion + s.Equal("alert.destination.disabled", alert.Alert.Topic, "alert topic should be alert.destination.disabled") + + // TenantID assertion + s.NotEmpty(data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, data.TenantID, "alert 
tenant_id should match") + + // Destination assertions + s.Require().NotNil(data.Destination, "alert should have destination") + s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook") + s.NotNil(data.Destination.DisabledAt, "destination should have disabled_at set") + + // DisabledAt assertion + s.False(data.DisabledAt.IsZero(), "disabled_at should not be zero") + + // TriggeringEvent assertions (optional but expected) + if data.TriggeringEvent != nil { + s.NotEmpty(data.TriggeringEvent.ID, "triggering event should have ID") + s.Equal("user.created", data.TriggeringEvent.Topic, "triggering event topic should match") } + + // ConsecutiveFailures assertions + s.Equal(20, data.ConsecutiveFailures, "consecutive_failures should be 20") + s.Equal(20, data.MaxConsecutiveFailures, "max_consecutive_failures should be 20") + + // AttemptResponse assertion + s.NotNil(data.AttemptResponse, "alert should have attempt_response") } diff --git a/cmd/e2e/helpers_test.go b/cmd/e2e/helpers_test.go index edfb6208..0f019faf 100644 --- a/cmd/e2e/helpers_test.go +++ b/cmd/e2e/helpers_test.go @@ -406,6 +406,23 @@ func (s *basicSuite) waitForAlerts(destID string, count int) { s.Require().FailNowf("timeout", "timed out waiting for %d alerts for %s (got %d)", count, destID, lastCount) } +// waitForAlertsByTopic polls until at least count alerts with the specific topic exist for the destination. +func (s *basicSuite) waitForAlertsByTopic(destID, topic string, count int) { + s.T().Helper() + timeout := alertPollTimeout + deadline := time.Now().Add(timeout) + var lastCount int + + for time.Now().Before(deadline) { + lastCount = len(s.alertServer.GetAlertsForDestinationByTopic(destID, topic)) + if lastCount >= count { + return + } + time.Sleep(100 * time.Millisecond) + } + s.Require().FailNowf("timeout", "timed out waiting for %d %s alerts for %s (got %d)", count, topic, destID, lastCount) +} + // ============================================================================= // Absence assertion // ============================================================================= diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 2185635e..9f75cd8b 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -236,3 +236,89 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { } require.Equal(t, 6, disableCallCount, "Should have called disable 6 times (for failures 20-25)") } + +func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { + // This test verifies that when a destination is auto-disabled after reaching + // the consecutive failure threshold, a DestinationDisabledAlert is sent via + // the notifier with topic "alert.destination.disabled". 
+ t.Parallel() + ctx := context.Background() + logger := testutil.CreateTestLogger(t) + redisClient := testutil.CreateTestRedisClient(t) + notifier := &mockAlertNotifier{} + notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + disabler := &mockDestinationDisabler{} + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + autoDisableCount := 5 + monitor := alert.NewAlertMonitor( + logger, + redisClient, + alert.WithNotifier(notifier), + alert.WithDisabler(disabler), + alert.WithAutoDisableFailureCount(autoDisableCount), + alert.WithAlertThresholds([]int{100}), // Only alert at 100% to simplify test + ) + + modelsDest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_disabled_test"), + testutil.DestinationFactory.WithTenantID("tenant_disabled_test"), + ) + dest := alert.AlertDestinationFromDestination(&modelsDest) + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID("event_123"), + testutil.EventFactory.WithTopic("test.event"), + ) + task := &models.DeliveryTask{Event: *event} + attemptResponse := map[string]interface{}{ + "status": "500", + "data": map[string]any{"error": "internal server error"}, + } + attempt := alert.DeliveryAttempt{ + Success: false, + DeliveryTask: task, + Destination: dest, + AttemptResponse: attemptResponse, + Timestamp: time.Now(), + } + + // Send exactly autoDisableCount failures to trigger auto-disable + for i := 1; i <= autoDisableCount; i++ { + require.NoError(t, monitor.HandleAttempt(ctx, attempt)) + } + + // Verify destination was disabled + disabler.AssertCalled(t, "DisableDestination", mock.Anything, dest.TenantID, dest.ID) + + // Find the DestinationDisabledAlert in the notifier calls + var foundDestinationDisabledAlert bool + var destinationDisabledAlert alert.DestinationDisabledAlert + for _, call := range notifier.Calls { + if call.Method == "Notify" { + alertArg := call.Arguments.Get(1) + if disabledAlert, ok := alertArg.(alert.DestinationDisabledAlert); ok { + foundDestinationDisabledAlert = true + destinationDisabledAlert = disabledAlert + break + } + } + } + + require.True(t, foundDestinationDisabledAlert, "Expected DestinationDisabledAlert to be sent when destination is disabled") + + // Verify the alert topic + assert.Equal(t, "alert.destination.disabled", destinationDisabledAlert.Topic, "Alert should have topic 'alert.destination.disabled'") + + // Verify the alert data + assert.Equal(t, dest.TenantID, destinationDisabledAlert.Data.TenantID, "TenantID should match") + assert.Equal(t, dest, destinationDisabledAlert.Data.Destination, "Destination should match") + assert.False(t, destinationDisabledAlert.Data.DisabledAt.IsZero(), "DisabledAt should be set") + assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.ConsecutiveFailures, "ConsecutiveFailures should match threshold") + assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.MaxConsecutiveFailures, "MaxConsecutiveFailures should match configured value") + assert.Equal(t, attemptResponse, destinationDisabledAlert.Data.AttemptResponse, "AttemptResponse should match") + + // Verify the triggering event is included + require.NotNil(t, destinationDisabledAlert.Data.TriggeringEvent, "TriggeringEvent should be set") + assert.Equal(t, event.ID, destinationDisabledAlert.Data.TriggeringEvent.ID, "TriggeringEvent ID should match") + assert.Equal(t, event.Topic, destinationDisabledAlert.Data.TriggeringEvent.Topic, "TriggeringEvent Topic should match") +} diff --git 
a/internal/alert/notifier.go b/internal/alert/notifier.go index 92cc8b51..75ba00ed 100644 --- a/internal/alert/notifier.go +++ b/internal/alert/notifier.go @@ -110,6 +110,39 @@ func NewConsecutiveFailureAlert(data ConsecutiveFailureData) ConsecutiveFailureA } } +// DestinationDisabledData represents the data for a destination disabled alert +type DestinationDisabledData struct { + TenantID string `json:"tenant_id"` + Destination *AlertDestination `json:"destination"` + DisabledAt time.Time `json:"disabled_at"` + TriggeringEvent *AlertedEvent `json:"triggering_event,omitempty"` + ConsecutiveFailures int `json:"consecutive_failures"` + MaxConsecutiveFailures int `json:"max_consecutive_failures"` + AttemptResponse map[string]any `json:"attempt_response"` +} + +// DestinationDisabledAlert represents an alert for when a destination is auto-disabled +type DestinationDisabledAlert struct { + Topic string `json:"topic"` + Timestamp time.Time `json:"timestamp"` + Data DestinationDisabledData `json:"data"` +} + +// MarshalJSON implements json.Marshaler +func (a DestinationDisabledAlert) MarshalJSON() ([]byte, error) { + type Alias DestinationDisabledAlert + return json.Marshal(Alias(a)) +} + +// NewDestinationDisabledAlert creates a new destination disabled alert with defaults +func NewDestinationDisabledAlert(data DestinationDisabledData) DestinationDisabledAlert { + return DestinationDisabledAlert{ + Topic: "alert.destination.disabled", + Timestamp: time.Now(), + Data: data, + } +} + type httpAlertNotifier struct { client *http.Client callbackURL string From 0c437b5078ab7d2fd815e0f7e2c2280a2026351f Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 3 Feb 2026 13:00:34 +0700 Subject: [PATCH 3/4] feat: add alert.destination.disabled callback Send alert when destination is auto-disabled after reaching consecutive failure threshold. 
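For reference, a callback payload for this alert is expected to look roughly as follows. This is an illustrative sketch only: field names follow the DestinationDisabledData, AlertDestination, and AlertedEvent structs introduced in this change, while all concrete values (IDs, timestamps, topics, config URL, attempt_response contents) are placeholders.

    {
      "topic": "alert.destination.disabled",
      "timestamp": "2026-02-03T13:00:00Z",
      "data": {
        "tenant_id": "tenant_123",
        "destination": {
          "id": "dest_123",
          "tenant_id": "tenant_123",
          "type": "webhook",
          "topics": ["*"],
          "config": {"url": "https://example.com/webhook"},
          "created_at": "2026-02-01T00:00:00Z",
          "updated_at": "2026-02-01T00:00:00Z",
          "disabled_at": "2026-02-03T13:00:00Z"
        },
        "disabled_at": "2026-02-03T13:00:00Z",
        "triggering_event": {
          "id": "evt_123",
          "topic": "user.created",
          "metadata": {},
          "data": {}
        },
        "consecutive_failures": 20,
        "max_consecutive_failures": 20,
        "attempt_response": {"status": "500"}
      }
    }
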
Co-Authored-By: Claude Opus 4.5 --- internal/alert/monitor.go | 31 ++++++++++- internal/alert/monitor_test.go | 98 ++++++++++++++++++++++------------ internal/services/builder.go | 12 +++-- 3 files changed, 101 insertions(+), 40 deletions(-) diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 7eeb10df..d87b9f75 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -13,7 +13,7 @@ import ( // DestinationDisabler handles disabling destinations type DestinationDisabler interface { - DisableDestination(ctx context.Context, tenantID, destinationID string) error + DisableDestination(ctx context.Context, tenantID, destinationID string) (models.Destination, error) } // AlertMonitor is the main interface for handling delivery attempt alerts @@ -170,7 +170,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp // If we've hit 100% and have a disabler configured, disable the destination if level == 100 && m.disabler != nil { - if err := m.disabler.DisableDestination(ctx, attempt.Destination.TenantID, attempt.Destination.ID); err != nil { + disabledDest, err := m.disabler.DisableDestination(ctx, attempt.Destination.TenantID, attempt.Destination.ID) + if err != nil { return fmt.Errorf("failed to disable destination: %w", err) } @@ -180,6 +181,32 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), ) + + // Send destination disabled alert + if m.notifier != nil { + disabledAlert := NewDestinationDisabledAlert(DestinationDisabledData{ + TenantID: attempt.Destination.TenantID, + Destination: AlertDestinationFromDestination(&disabledDest), + DisabledAt: *disabledDest.DisabledAt, + TriggeringEvent: &AlertedEvent{ + ID: attempt.DeliveryTask.Event.ID, + Topic: attempt.DeliveryTask.Event.Topic, + Metadata: attempt.DeliveryTask.Event.Metadata, + Data: attempt.DeliveryTask.Event.Data, + }, + ConsecutiveFailures: count, + MaxConsecutiveFailures: m.autoDisableFailureCount, + AttemptResponse: attempt.AttemptResponse, + }) + if err := m.notifier.Notify(ctx, disabledAlert); err != nil { + m.logger.Ctx(ctx).Error("failed to send destination disabled alert", + zap.Error(err), + zap.String("tenant_id", attempt.Destination.TenantID), + zap.String("destination_id", attempt.Destination.ID), + ) + return fmt.Errorf("failed to send destination disabled alert: %w", err) + } + } } // Send alert if notifier is configured diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 9f75cd8b..62c87613 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -27,9 +27,9 @@ type mockDestinationDisabler struct { mock.Mock } -func (m *mockDestinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error { - m.Called(ctx, tenantID, destinationID) - return nil +func (m *mockDestinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) (models.Destination, error) { + args := m.Called(ctx, tenantID, destinationID) + return args.Get(0).(models.Destination), args.Error(1) } func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { @@ -39,8 +39,14 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { redisClient := testutil.CreateTestRedisClient(t) notifier := &mockAlertNotifier{} notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + disabledAt := time.Now() + disabledDest := 
models.Destination{ + ID: "dest_1", + TenantID: "tenant_1", + DisabledAt: &disabledAt, + } disabler := &mockDestinationDisabler{} - disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(disabledDest, nil) monitor := alert.NewAlertMonitor( logger, @@ -70,22 +76,23 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { require.NoError(t, monitor.HandleAttempt(ctx, attempt)) } - // Verify notifications were sent at correct thresholds - var notifyCallCount int + // Verify consecutive failure notifications were sent at correct thresholds + var consecutiveFailureCount int for _, call := range notifier.Calls { if call.Method == "Notify" { - notifyCallCount++ - alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - failures := alert.Data.ConsecutiveFailures - require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") - require.Equal(t, dest, alert.Data.Destination) - require.Equal(t, "alert.consecutive_failure", alert.Topic) - require.Equal(t, attempt.AttemptResponse, alert.Data.AttemptResponse) - require.Equal(t, 20, alert.Data.MaxConsecutiveFailures) - require.Equal(t, failures == 20, alert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)") + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + consecutiveFailureCount++ + failures := cfAlert.Data.ConsecutiveFailures + require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") + require.Equal(t, dest, cfAlert.Data.Destination) + require.Equal(t, "alert.consecutive_failure", cfAlert.Topic) + require.Equal(t, attempt.AttemptResponse, cfAlert.Data.AttemptResponse) + require.Equal(t, 20, cfAlert.Data.MaxConsecutiveFailures) + require.Equal(t, failures == 20, cfAlert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)") + } } } - require.Equal(t, 4, notifyCallCount, "Should have sent exactly 4 notifications") + require.Equal(t, 4, consecutiveFailureCount, "Should have sent exactly 4 consecutive failure notifications") // Verify destination was disabled exactly once at 100% var disableCallCount int @@ -106,8 +113,14 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { redisClient := testutil.CreateTestRedisClient(t) notifier := &mockAlertNotifier{} notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + disabledAt := time.Now() + disabledDest := models.Destination{ + ID: "dest_1", + TenantID: "tenant_1", + DisabledAt: &disabledAt, + } disabler := &mockDestinationDisabler{} - disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(disabledDest, nil) monitor := alert.NewAlertMonitor( logger, @@ -159,8 +172,9 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { // Verify the notifications were at the right thresholds var seenCounts []int for _, call := range notifier.Calls { - alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - seenCounts = append(seenCounts, alert.Data.ConsecutiveFailures) + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + seenCounts = append(seenCounts, cfAlert.Data.ConsecutiveFailures) + } } assert.Contains(t, seenCounts, 10, "Should have alerted at 50% (10 failures)") assert.Contains(t, seenCounts, 14, 
"Should have alerted at 66% (14 failures)") @@ -179,8 +193,14 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { redisClient := testutil.CreateTestRedisClient(t) notifier := &mockAlertNotifier{} notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + disabledAt := time.Now() + disabledDest := models.Destination{ + ID: "dest_above", + TenantID: "tenant_above", + DisabledAt: &disabledAt, + } disabler := &mockDestinationDisabler{} - disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(disabledDest, nil) monitor := alert.NewAlertMonitor( logger, @@ -209,22 +229,23 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { require.NoError(t, monitor.HandleAttempt(ctx, attempt)) } - // Verify notifications at 50%, 70%, 90%, and 100% thresholds + // Verify consecutive failure notifications at 50%, 70%, 90%, and 100% thresholds // Plus additional notifications for failures 21-25 (all at 100% level) - var notifyCallCount int + var consecutiveFailureCount int var disableNotifyCount int for _, call := range notifier.Calls { if call.Method == "Notify" { - notifyCallCount++ - alertData := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - if alertData.Data.ConsecutiveFailures >= 20 { - disableNotifyCount++ - require.True(t, alertData.Data.WillDisable, "WillDisable should be true at and above 100%") + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + consecutiveFailureCount++ + if cfAlert.Data.ConsecutiveFailures >= 20 { + disableNotifyCount++ + require.True(t, cfAlert.Data.WillDisable, "WillDisable should be true at and above 100%") + } } } } // 4 alerts at thresholds (10, 14, 18, 20) + 5 alerts for 21-25 - require.Equal(t, 9, notifyCallCount, "Should have sent 9 notifications (4 at thresholds + 5 above)") + require.Equal(t, 9, consecutiveFailureCount, "Should have sent 9 consecutive failure notifications (4 at thresholds + 5 above)") require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications with WillDisable=true (20-25)") // Verify destination was disabled multiple times (once per failure >= 20) @@ -247,8 +268,16 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { redisClient := testutil.CreateTestRedisClient(t) notifier := &mockAlertNotifier{} notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + + // Create a destination that will be returned by the disabler + disabledAt := time.Now() + modelsDest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_disabled_test"), + testutil.DestinationFactory.WithTenantID("tenant_disabled_test"), + ) + modelsDest.DisabledAt = &disabledAt disabler := &mockDestinationDisabler{} - disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(modelsDest, nil) autoDisableCount := 5 monitor := alert.NewAlertMonitor( @@ -260,10 +289,6 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { alert.WithAlertThresholds([]int{100}), // Only alert at 100% to simplify test ) - modelsDest := testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithID("dest_disabled_test"), - testutil.DestinationFactory.WithTenantID("tenant_disabled_test"), - ) dest := alert.AlertDestinationFromDestination(&modelsDest) event := testutil.EventFactory.AnyPointer( 
testutil.EventFactory.WithID("event_123"), @@ -311,8 +336,13 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { // Verify the alert data assert.Equal(t, dest.TenantID, destinationDisabledAlert.Data.TenantID, "TenantID should match") - assert.Equal(t, dest, destinationDisabledAlert.Data.Destination, "Destination should match") + assert.Equal(t, dest.ID, destinationDisabledAlert.Data.Destination.ID, "Destination ID should match") + assert.Equal(t, dest.TenantID, destinationDisabledAlert.Data.Destination.TenantID, "Destination TenantID should match") + assert.NotNil(t, destinationDisabledAlert.Data.Destination.DisabledAt, "Destination DisabledAt should be set") assert.False(t, destinationDisabledAlert.Data.DisabledAt.IsZero(), "DisabledAt should be set") + // Verify the alert's DisabledAt matches the destination's DisabledAt exactly + assert.Equal(t, disabledAt, destinationDisabledAlert.Data.DisabledAt, "Alert DisabledAt should match destination's DisabledAt exactly") + assert.Equal(t, disabledAt, *destinationDisabledAlert.Data.Destination.DisabledAt, "Alert Destination.DisabledAt should match destination's DisabledAt exactly") assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.ConsecutiveFailures, "ConsecutiveFailures should match threshold") assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.MaxConsecutiveFailures, "MaxConsecutiveFailures should match configured value") assert.Equal(t, attemptResponse, destinationDisabledAlert.Data.AttemptResponse, "AttemptResponse should match") diff --git a/internal/services/builder.go b/internal/services/builder.go index b3c0e71b..961b1fd0 100644 --- a/internal/services/builder.go +++ b/internal/services/builder.go @@ -18,6 +18,7 @@ import ( "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/logmq" "github.com/hookdeck/outpost/internal/logstore" + "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/publishmq" "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/scheduler" @@ -413,17 +414,20 @@ func newDestinationDisabler(tenantStore tenantstore.TenantStore) alert.Destinati } } -func (d *destinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error { +func (d *destinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) (models.Destination, error) { destination, err := d.tenantStore.RetrieveDestination(ctx, tenantID, destinationID) if err != nil { - return err + return models.Destination{}, err } if destination == nil { - return nil + return models.Destination{}, fmt.Errorf("destination not found") } now := time.Now() destination.DisabledAt = &now - return d.tenantStore.UpsertDestination(ctx, *destination) + if err := d.tenantStore.UpsertDestination(ctx, *destination); err != nil { + return models.Destination{}, err + } + return *destination, nil } // Helper methods for serviceInstance to initialize common dependencies From 7db63e9c18eff111352c74a88e45a5533f9b0d28 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 3 Feb 2026 18:54:46 +0700 Subject: [PATCH 4/4] chore: error handling --- internal/alert/monitor.go | 21 ++-- internal/alert/notifier_test.go | 205 ++++++++++++++++---------------- 2 files changed, 113 insertions(+), 113 deletions(-) diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index d87b9f75..e13f245d 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -182,7 +182,7 @@ func (m *alertMonitor) 
HandleAttempt(ctx context.Context, attempt DeliveryAttemp zap.String("destination_type", attempt.Destination.Type), ) - // Send destination disabled alert + // Send destination disabled alert (best-effort, don't fail on notification error) if m.notifier != nil { disabledAlert := NewDestinationDisabledAlert(DestinationDisabledData{ TenantID: attempt.Destination.TenantID, @@ -204,30 +204,27 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), ) - return fmt.Errorf("failed to send destination disabled alert: %w", err) } } } - // Send alert if notifier is configured + // Send alert if notifier is configured (best-effort, don't fail on notification error) if m.notifier != nil { if err := m.notifier.Notify(ctx, alert); err != nil { - m.logger.Ctx(ctx).Error("failed to send alert", + m.logger.Ctx(ctx).Error("failed to send consecutive failure alert", zap.Error(err), zap.String("event_id", attempt.DeliveryTask.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), + ) + } else { + m.logger.Ctx(ctx).Audit("alert sent", + zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("tenant_id", attempt.Destination.TenantID), + zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), ) - return fmt.Errorf("failed to send alert: %w", err) } - - m.logger.Ctx(ctx).Audit("alert sent", - zap.String("event_id", attempt.DeliveryTask.Event.ID), - zap.String("tenant_id", attempt.Destination.TenantID), - zap.String("destination_id", attempt.Destination.ID), - zap.String("destination_type", attempt.Destination.Type), - ) } return nil diff --git a/internal/alert/notifier_test.go b/internal/alert/notifier_test.go index 43ff62e4..6ca044f7 100644 --- a/internal/alert/notifier_test.go +++ b/internal/alert/notifier_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" @@ -16,107 +17,109 @@ import ( func TestAlertNotifier_Notify(t *testing.T) { t.Parallel() - tests := []struct { - name string - handler func(w http.ResponseWriter, r *http.Request) - notifierOpts []alert.NotifierOption - wantErr bool - errContains string - }{ - { - name: "successful notification", - handler: func(w http.ResponseWriter, r *http.Request) { - // Verify request - assert.Equal(t, "application/json", r.Header.Get("Content-Type")) - - // Read and verify request body - var body map[string]interface{} - err := json.NewDecoder(r.Body).Decode(&body) - require.NoError(t, err) - - assert.Equal(t, "alert.consecutive_failure", body["topic"]) - data := body["data"].(map[string]interface{}) - assert.Equal(t, float64(10), data["max_consecutive_failures"]) - assert.Equal(t, float64(5), data["consecutive_failures"]) - assert.Equal(t, true, data["will_disable"]) - - // Log raw JSON for debugging - rawJSON, _ := json.Marshal(body) - t.Logf("Raw JSON: %s", string(rawJSON)) - - w.WriteHeader(http.StatusOK) - }, - }, - { - name: "server error", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - }, - wantErr: true, - errContains: "alert callback failed with status 500", - }, - { - name: "invalid response status", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadRequest) - }, - wantErr: true, - errContains: "alert callback failed with status 400", - 
}, - { - name: "timeout exceeded", - handler: func(w http.ResponseWriter, r *http.Request) { - time.Sleep(100 * time.Millisecond) - w.WriteHeader(http.StatusOK) - }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithTimeout(50 * time.Millisecond)}, - wantErr: true, - errContains: "context deadline exceeded", - }, - { - name: "successful notification with bearer token", - handler: func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) - w.WriteHeader(http.StatusOK) + t.Run("successful notification", func(t *testing.T) { + t.Parallel() + var called atomic.Bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called.Store(true) + // Verify request + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + + // Read and verify request body + var body map[string]any + err := json.NewDecoder(r.Body).Decode(&body) + require.NoError(t, err) + + assert.Equal(t, "alert.consecutive_failure", body["topic"]) + data := body["data"].(map[string]any) + assert.Equal(t, float64(10), data["max_consecutive_failures"]) + assert.Equal(t, float64(5), data["consecutive_failures"]) + assert.Equal(t, true, data["will_disable"]) + + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + MaxConsecutiveFailures: 10, + ConsecutiveFailures: 5, + WillDisable: true, + Destination: dest, + AttemptResponse: map[string]any{ + "status": "error", + "data": map[string]any{"code": "ETIMEDOUT"}, }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithBearerToken("test-token")}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - // Create test server - ts := httptest.NewServer(http.HandlerFunc(tt.handler)) - defer ts.Close() - - // Create notifier - notifier := alert.NewHTTPAlertNotifier(ts.URL, tt.notifierOpts...) 
- - // Create test alert - dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} - testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ - MaxConsecutiveFailures: 10, - ConsecutiveFailures: 5, - WillDisable: true, - Destination: dest, - AttemptResponse: map[string]interface{}{ - "status": "error", - "data": map[string]any{"code": "ETIMEDOUT"}, - }, - }) - - // Send alert - err := notifier.Notify(context.Background(), testAlert) - - if tt.wantErr { - require.Error(t, err) - assert.ErrorContains(t, err, tt.errContains) - } else { - require.NoError(t, err) - } }) - } + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("successful notification with bearer token", func(t *testing.T) { + t.Parallel() + var called atomic.Bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called.Store(true) + assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithBearerToken("test-token")) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + MaxConsecutiveFailures: 10, + ConsecutiveFailures: 5, + WillDisable: true, + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("server error returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "status 500") + }) + + t.Run("timeout returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithTimeout(50*time.Millisecond)) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to send alert") + }) }