Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 104 additions & 8 deletions cmd/e2e/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"net/http"
"sync"
"time"

"github.com/hookdeck/outpost/internal/models"
)

type AlertRequest struct {
Expand All @@ -18,17 +16,67 @@ type AlertRequest struct {
}

// AlertPayload is the raw alert callback body captured by the mock server.
// Data is kept as json.RawMessage so each alert topic can later be decoded
// into its own concrete shape (see ParseConsecutiveFailureData and
// ParseDestinationDisabledData).
type AlertPayload struct {
	Topic     string          `json:"topic"`     // alert topic, e.g. "alert.consecutive_failure"
	Timestamp time.Time       `json:"timestamp"` // when the alert was emitted
	Data      json.RawMessage `json:"data"`      // topic-specific payload, decoded lazily
}

// ConsecutiveFailureAlert is a parsed alert for "alert.consecutive_failure".
// It mirrors AlertPayload but with Data already decoded into
// ConsecutiveFailureData.
type ConsecutiveFailureAlert struct {
	Topic     string                 `json:"topic"`
	Timestamp time.Time              `json:"timestamp"`
	Data      ConsecutiveFailureData `json:"data"`
}

// DestinationDisabledAlert is a parsed alert for "alert.destination.disabled".
// It mirrors AlertPayload but with Data already decoded into
// DestinationDisabledData.
type DestinationDisabledAlert struct {
	Topic     string                  `json:"topic"`
	Timestamp time.Time               `json:"timestamp"`
	Data      DestinationDisabledData `json:"data"`
}

// AlertedEvent matches internal/alert.AlertedEvent: the delivery event that
// triggered (or accompanied) an alert callback.
type AlertedEvent struct {
	ID       string            `json:"id"`       // event ID
	Topic    string            `json:"topic"`    // publish topic of the event (e.g. "user.created")
	Metadata map[string]string `json:"metadata"` // metadata attached at publish time
	Data     map[string]any    `json:"data"`     // event payload
}

// AlertDestination matches internal/alert.AlertDestination: the destination
// an alert refers to, as serialized into the alert callback payload.
type AlertDestination struct {
	ID        string            `json:"id"`
	TenantID  string            `json:"tenant_id"`
	Type      string            `json:"type"` // e.g. "webhook"
	Topics    []string          `json:"topics"`
	Filter    map[string]any    `json:"filter,omitempty"`
	Config    map[string]string `json:"config"`
	Metadata  map[string]string `json:"metadata,omitempty"`
	CreatedAt time.Time         `json:"created_at"`
	UpdatedAt time.Time         `json:"updated_at"`
	DisabledAt *time.Time       `json:"disabled_at"` // nil while the destination is still enabled
}

// ConsecutiveFailureData matches internal/alert.ConsecutiveFailureData
type ConsecutiveFailureData struct {
MaxConsecutiveFailures int `json:"max_consecutive_failures"`
ConsecutiveFailures int `json:"consecutive_failures"`
WillDisable bool `json:"will_disable"`
Destination *models.Destination `json:"destination"`
Data map[string]interface{} `json:"data"`
TenantID string `json:"tenant_id"`
Event AlertedEvent `json:"event"`
MaxConsecutiveFailures int `json:"max_consecutive_failures"`
ConsecutiveFailures int `json:"consecutive_failures"`
WillDisable bool `json:"will_disable"`
Destination *AlertDestination `json:"destination"`
AttemptResponse map[string]any `json:"attempt_response"`
}

// DestinationDisabledData matches the expected payload for
// "alert.destination.disabled", emitted when a destination is auto-disabled
// after reaching its consecutive-failure limit.
type DestinationDisabledData struct {
	TenantID               string            `json:"tenant_id"`
	Destination            *AlertDestination `json:"destination"`
	DisabledAt             time.Time         `json:"disabled_at"`
	TriggeringEvent        *AlertedEvent     `json:"triggering_event,omitempty"` // the failing event that tipped the counter, when available
	ConsecutiveFailures    int               `json:"consecutive_failures"`
	MaxConsecutiveFailures int               `json:"max_consecutive_failures"`
	AttemptResponse        map[string]any    `json:"attempt_response"`
}

type AlertMockServer struct {
Expand Down Expand Up @@ -117,14 +165,23 @@ func (s *AlertMockServer) handleAlert(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// alertDataWithDestination is used to extract destination from any alert type.
// Only the "destination" key is decoded; all other payload fields are ignored,
// which lets filtering helpers work across alert topics with different shapes.
type alertDataWithDestination struct {
	Destination *AlertDestination `json:"destination"`
}

// Helper methods for assertions
func (s *AlertMockServer) GetAlertsForDestination(destinationID string) []AlertRequest {
s.mu.RLock()
defer s.mu.RUnlock()

var filtered []AlertRequest
for _, alert := range s.alerts {
if alert.Alert.Data.Destination != nil && alert.Alert.Data.Destination.ID == destinationID {
var data alertDataWithDestination
if err := json.Unmarshal(alert.Alert.Data, &data); err != nil {
continue
}
if data.Destination != nil && data.Destination.ID == destinationID {
filtered = append(filtered, alert)
}
}
Expand All @@ -141,3 +198,42 @@ func (s *AlertMockServer) GetLastAlert() *AlertRequest {
alert := s.alerts[len(s.alerts)-1]
return &alert
}

// GetAlertsForDestinationByTopic returns the recorded alerts whose topic
// equals topic and whose decoded payload references the given destination ID.
// Alerts whose Data cannot be decoded are skipped silently.
func (s *AlertMockServer) GetAlertsForDestinationByTopic(destinationID, topic string) []AlertRequest {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var matched []AlertRequest
	for _, req := range s.alerts {
		if req.Alert.Topic != topic {
			continue
		}
		var payload alertDataWithDestination
		if err := json.Unmarshal(req.Alert.Data, &payload); err != nil {
			continue
		}
		if payload.Destination == nil || payload.Destination.ID != destinationID {
			continue
		}
		matched = append(matched, req)
	}
	return matched
}

// ParseConsecutiveFailureData parses the Data field as ConsecutiveFailureData.
// It returns the decode error unchanged if the payload does not unmarshal.
func (a *AlertRequest) ParseConsecutiveFailureData() (*ConsecutiveFailureData, error) {
	parsed := new(ConsecutiveFailureData)
	if err := json.Unmarshal(a.Alert.Data, parsed); err != nil {
		return nil, err
	}
	return parsed, nil
}

// ParseDestinationDisabledData parses the Data field as DestinationDisabledData.
// It returns the decode error unchanged if the payload does not unmarshal.
func (a *AlertRequest) ParseDestinationDisabledData() (*DestinationDisabledData, error) {
	parsed := new(DestinationDisabledData)
	if err := json.Unmarshal(a.Alert.Data, parsed); err != nil {
		return nil, err
	}
	return parsed, nil
}
141 changes: 134 additions & 7 deletions cmd/e2e/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() {
dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))

// Publish 20 failing events
for i := 0; i < 20; i++ {
for i := range 20 {
s.publish(tenant.ID, "user.created", map[string]any{
"index": i,
}, withPublishMetadata(map[string]string{"should_err": "true"}))
Expand All @@ -22,14 +22,48 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() {

// Wait for 4 alert callbacks to be processed
s.waitForAlerts(dest.ID, 4)
alerts := s.alertServer.GetAlertsForDestination(dest.ID)
alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.consecutive_failure")
s.Require().Len(alerts, 4, "should have 4 alerts")

expectedCounts := []int{10, 14, 18, 20}
for i, alert := range alerts {
// Parse alert data
data, err := alert.ParseConsecutiveFailureData()
s.Require().NoError(err, "failed to parse consecutive failure data")

// Auth header assertion
s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match")
s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures,

// Topic assertion
s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure")

// TenantID assertion
s.NotEmpty(data.TenantID, "alert should have tenant_id")
s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match")

// Destination assertions
s.Require().NotNil(data.Destination, "alert should have destination")
s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match")
s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match")
s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook")

// Event assertions
s.NotEmpty(data.Event.ID, "alert event should have ID")
s.Equal("user.created", data.Event.Topic, "alert event topic should match")
s.NotNil(data.Event.Data, "alert event should have data")

// ConsecutiveFailures assertions
s.Equal(expectedCounts[i], data.ConsecutiveFailures,
"alert %d should have %d consecutive failures", i, expectedCounts[i])
s.Equal(20, data.MaxConsecutiveFailures, "max consecutive failures should be 20")

// WillDisable assertion (should be true for last alert only)
if i == len(alerts)-1 {
s.True(data.WillDisable, "last alert should have will_disable=true")
}

// AttemptResponse assertion
s.NotNil(data.AttemptResponse, "alert should have attempt_response")
}
}

Expand All @@ -38,7 +72,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() {
dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))

// First batch: 14 failures
for i := 0; i < 14; i++ {
for i := range 14 {
s.publish(tenant.ID, "user.created", map[string]any{
"index": i,
}, withPublishMetadata(map[string]string{"should_err": "true"}))
Expand All @@ -56,7 +90,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() {
s.waitForNewMockServerEvents(dest.mockID, 15)

// Second batch: 14 more failures
for i := 0; i < 14; i++ {
for i := range 14 {
s.publish(tenant.ID, "user.created", map[string]any{
"index": i,
}, withPublishMetadata(map[string]string{"should_err": "true"}))
Expand All @@ -71,13 +105,106 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() {

// Wait for 4 alert callbacks: [10, 14] from first batch, [10, 14] from second batch
s.waitForAlerts(dest.ID, 4)
alerts := s.alertServer.GetAlertsForDestination(dest.ID)
alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.consecutive_failure")
s.Require().Len(alerts, 4, "should have 4 alerts")

expectedCounts := []int{10, 14, 10, 14}
for i, alert := range alerts {
// Parse alert data
data, err := alert.ParseConsecutiveFailureData()
s.Require().NoError(err, "failed to parse consecutive failure data")

// Auth header assertion
s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match")
s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures,

// Topic assertion
s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure")

// TenantID assertion
s.NotEmpty(data.TenantID, "alert should have tenant_id")
s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match")

// Destination assertions
s.Require().NotNil(data.Destination, "alert should have destination")
s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match")
s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match")
s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook")

// Event assertions
s.NotEmpty(data.Event.ID, "alert event should have ID")
s.Equal("user.created", data.Event.Topic, "alert event topic should match")
s.NotNil(data.Event.Data, "alert event should have data")

// ConsecutiveFailures assertions
s.Equal(expectedCounts[i], data.ConsecutiveFailures,
"alert %d should have %d consecutive failures", i, expectedCounts[i])
s.Equal(20, data.MaxConsecutiveFailures, "max consecutive failures should be 20")

// WillDisable assertion (none should have will_disable=true since counter resets)
s.False(data.WillDisable, "alert %d should have will_disable=false (counter resets)", i)

// AttemptResponse assertion
s.NotNil(data.AttemptResponse, "alert should have attempt_response")
}
}

// TestAlerts_DestinationDisabledCallback publishes 20 failing events so the
// destination hits its consecutive-failure limit, then asserts that the
// destination is disabled and that exactly one "alert.destination.disabled"
// callback arrives with the expected auth header, tenant, destination,
// disabled_at, failure counters, and attempt response.
func (s *basicSuite) TestAlerts_DestinationDisabledCallback() {
	tenant := s.createTenant()
	dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))

	// Publish 20 failing events to trigger auto-disable
	for i := range 20 {
		s.publish(tenant.ID, "user.created", map[string]any{
			"index": i,
		}, withPublishMetadata(map[string]string{"should_err": "true"}))
	}

	// Wait for destination to be disabled (sync point for all 20 deliveries)
	s.waitForNewDestinationDisabled(tenant.ID, dest.ID)

	// Verify destination is disabled
	got := s.getDestination(tenant.ID, dest.ID)
	s.NotNil(got.DisabledAt, "destination should be disabled")

	// Wait for the destination.disabled alert callback
	s.waitForAlertsByTopic(dest.ID, "alert.destination.disabled", 1)
	alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.disabled")
	s.Require().Len(alerts, 1, "should have 1 destination.disabled alert")

	alert := alerts[0]
	data, err := alert.ParseDestinationDisabledData()
	s.Require().NoError(err, "failed to parse destination disabled data")

	// Auth header assertion
	s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match")

	// Topic assertion
	s.Equal("alert.destination.disabled", alert.Alert.Topic, "alert topic should be alert.destination.disabled")

	// TenantID assertion
	s.NotEmpty(data.TenantID, "alert should have tenant_id")
	s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match")

	// Destination assertions
	s.Require().NotNil(data.Destination, "alert should have destination")
	s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match")
	s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match")
	s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook")
	s.NotNil(data.Destination.DisabledAt, "destination should have disabled_at set")

	// DisabledAt assertion
	s.False(data.DisabledAt.IsZero(), "disabled_at should not be zero")

	// TriggeringEvent assertions (optional but expected)
	if data.TriggeringEvent != nil {
		s.NotEmpty(data.TriggeringEvent.ID, "triggering event should have ID")
		s.Equal("user.created", data.TriggeringEvent.Topic, "triggering event topic should match")
	}

	// ConsecutiveFailures assertions
	s.Equal(20, data.ConsecutiveFailures, "consecutive_failures should be 20")
	s.Equal(20, data.MaxConsecutiveFailures, "max_consecutive_failures should be 20")

	// AttemptResponse assertion
	s.NotNil(data.AttemptResponse, "alert should have attempt_response")
}
17 changes: 17 additions & 0 deletions cmd/e2e/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,23 @@ func (s *basicSuite) waitForAlerts(destID string, count int) {
s.Require().FailNowf("timeout", "timed out waiting for %d alerts for %s (got %d)", count, destID, lastCount)
}

// waitForAlertsByTopic polls until at least count alerts with the specific
// topic exist for the destination, failing the test after alertPollTimeout.
func (s *basicSuite) waitForAlertsByTopic(destID, topic string, count int) {
	s.T().Helper()
	deadline := time.Now().Add(alertPollTimeout)

	lastCount := 0
	for time.Now().Before(deadline) {
		lastCount = len(s.alertServer.GetAlertsForDestinationByTopic(destID, topic))
		if lastCount >= count {
			return
		}
		// Poll at a fixed cadence rather than spinning.
		time.Sleep(100 * time.Millisecond)
	}
	s.Require().FailNowf("timeout", "timed out waiting for %d %s alerts for %s (got %d)", count, topic, destID, lastCount)
}

// =============================================================================
// Absence assertion
// =============================================================================
Expand Down
Loading
Loading