Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/e2e/delivery_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ func (s *basicSuite) TestDeliveryPipeline_PublishDeliversToWebhook() {
tenant := s.createTenant()
dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))

s.publish(tenant.ID, "user.created", map[string]any{
resp := s.publish(tenant.ID, "user.created", map[string]any{
"event_id": "delivery_test_1",
})
s.Equal([]string{dest.ID}, resp.DestinationIDs)

// Verify mock server received the event
events := s.waitForNewMockServerEvents(dest.mockID, 1)
Expand Down
5 changes: 3 additions & 2 deletions cmd/e2e/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ type destinationResponse struct {
}

type publishResponse struct {
ID string `json:"id"`
Duplicate bool `json:"duplicate"`
ID string `json:"id"`
Duplicate bool `json:"duplicate"`
DestinationIDs []string `json:"destination_ids"`
}

type mockServerEvent struct {
Expand Down
7 changes: 7 additions & 0 deletions docs/apis/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,7 @@ components:
required:
- id
- duplicate
- destination_ids
properties:
id:
type: string
Expand All @@ -1707,6 +1708,12 @@ components:
type: boolean
description: Whether this event was already processed (idempotency hit). If true, the event was not queued again.
example: false
destination_ids:
type: array
items:
type: string
description: The IDs of destinations that matched this event. Empty array if no destinations matched.
example: ["des_456", "des_789"]
RetryRequest:
type: object
description: Request body for retrying event delivery to a destination.
Expand Down
10 changes: 6 additions & 4 deletions internal/apirouter/publish_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ func TestAPI_Publish(t *testing.T) {
})

t.Run("Success", func(t *testing.T) {
t.Run("returns event ID", func(t *testing.T) {
t.Run("returns event ID and destination_ids", func(t *testing.T) {
h := newAPITest(t)
h.eventHandler.result = &publishmq.HandleResult{EventID: "evt-123"}
h.eventHandler.result = &publishmq.HandleResult{EventID: "evt-123", DestinationIDs: []string{"d1"}}

req := h.jsonReq(http.MethodPost, "/api/v1/publish", map[string]any{
"tenant_id": "t1",
Expand All @@ -162,11 +162,12 @@ func TestAPI_Publish(t *testing.T) {
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result))
assert.Equal(t, "evt-123", result.EventID)
assert.False(t, result.Duplicate)
assert.Equal(t, []string{"d1"}, result.DestinationIDs)
})

t.Run("returns duplicate flag", func(t *testing.T) {
t.Run("returns duplicate flag with empty destination_ids", func(t *testing.T) {
h := newAPITest(t)
h.eventHandler.result = &publishmq.HandleResult{EventID: "evt-123", Duplicate: true}
h.eventHandler.result = &publishmq.HandleResult{EventID: "evt-123", Duplicate: true, DestinationIDs: []string{}}

req := h.jsonReq(http.MethodPost, "/api/v1/publish", map[string]any{
"tenant_id": "t1",
Expand All @@ -178,6 +179,7 @@ func TestAPI_Publish(t *testing.T) {
var result publishmq.HandleResult
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result))
assert.True(t, result.Duplicate)
assert.Empty(t, result.DestinationIDs)
})
})

Expand Down
2 changes: 1 addition & 1 deletion internal/apirouter/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (m *mockEventHandler) Handle(_ context.Context, event *models.Event) (*publ
if m.result != nil {
return m.result, nil
}
return &publishmq.HandleResult{EventID: event.ID}, nil
return &publishmq.HandleResult{EventID: event.ID, DestinationIDs: []string{}}, nil
}

// stubRegistry is a minimal destregistry.Registry for test setup.
Expand Down
14 changes: 10 additions & 4 deletions internal/publishmq/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ type EventHandler interface {
}

type HandleResult struct {
EventID string `json:"id"`
Duplicate bool `json:"duplicate"`
EventID string `json:"id"`
Duplicate bool `json:"duplicate"`
DestinationIDs []string `json:"destination_ids"`
}

type eventHandler struct {
Expand Down Expand Up @@ -101,9 +102,14 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle
}
}

if matchedDestinations == nil {
matchedDestinations = []string{}
}

result := &HandleResult{
EventID: event.ID,
Duplicate: false,
EventID: event.ID,
Duplicate: false,
DestinationIDs: matchedDestinations,
}

// Early return if no destinations matched
Expand Down
8 changes: 8 additions & 0 deletions internal/publishmq/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func TestEventHandler_HandleResult(t *testing.T) {
require.NotNil(t, result)
require.Equal(t, event.ID, result.EventID)
require.False(t, result.Duplicate)
require.Len(t, result.DestinationIDs, 3)
})

t.Run("no destinations matched", func(t *testing.T) {
Expand All @@ -272,6 +273,7 @@ func TestEventHandler_HandleResult(t *testing.T) {
require.NotNil(t, result)
require.Equal(t, event.ID, result.EventID)
require.False(t, result.Duplicate)
require.Empty(t, result.DestinationIDs)
})

t.Run("duplicate event - idempotency", func(t *testing.T) {
Expand All @@ -291,11 +293,13 @@ func TestEventHandler_HandleResult(t *testing.T) {
result1, err := eventHandler.Handle(ctx, event)
require.NoError(t, err)
require.False(t, result1.Duplicate)
require.Len(t, result1.DestinationIDs, 1)

// Duplicate request
result2, err := eventHandler.Handle(ctx, event)
require.NoError(t, err)
require.True(t, result2.Duplicate) // Duplicate due to idempotency
require.Len(t, result2.DestinationIDs, 1)
})

t.Run("with destination_id - queued", func(t *testing.T) {
Expand All @@ -314,6 +318,7 @@ func TestEventHandler_HandleResult(t *testing.T) {
result, err := eventHandler.Handle(ctx, event)
require.NoError(t, err)
require.False(t, result.Duplicate)
require.Equal(t, []string{dest.ID}, result.DestinationIDs)
})

t.Run("with destination_id - duplicate event", func(t *testing.T) {
Expand Down Expand Up @@ -358,6 +363,7 @@ func TestEventHandler_HandleResult(t *testing.T) {
result, err := eventHandler.Handle(ctx, event)
require.NoError(t, err)
require.False(t, result.Duplicate)
require.Empty(t, result.DestinationIDs)
})

t.Run("with destination_id - not found", func(t *testing.T) {
Expand All @@ -370,6 +376,7 @@ func TestEventHandler_HandleResult(t *testing.T) {
result, err := eventHandler.Handle(ctx, event)
require.NoError(t, err)
require.False(t, result.Duplicate)
require.Empty(t, result.DestinationIDs)
})

t.Run("with destination_id - topic mismatch", func(t *testing.T) {
Expand All @@ -388,6 +395,7 @@ func TestEventHandler_HandleResult(t *testing.T) {
result, err := eventHandler.Handle(ctx, event)
require.NoError(t, err)
require.False(t, result.Duplicate)
require.Empty(t, result.DestinationIDs)
})
}

Expand Down
Loading