From aef29919d8d7b2433dac9ae938027d9d08acbe83 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 1 Feb 2026 18:49:58 +0700 Subject: [PATCH] feat: return matched destinations in publish api --- cmd/e2e/delivery_pipeline_test.go | 3 ++- cmd/e2e/helpers_test.go | 5 +++-- docs/apis/openapi.yaml | 7 +++++++ internal/apirouter/publish_handlers_test.go | 10 ++++++---- internal/apirouter/router_test.go | 2 +- internal/publishmq/eventhandler.go | 14 ++++++++++---- internal/publishmq/eventhandler_test.go | 8 ++++++++ 7 files changed, 37 insertions(+), 12 deletions(-) diff --git a/cmd/e2e/delivery_pipeline_test.go b/cmd/e2e/delivery_pipeline_test.go index 014b3f5c..d030398b 100644 --- a/cmd/e2e/delivery_pipeline_test.go +++ b/cmd/e2e/delivery_pipeline_test.go @@ -11,9 +11,10 @@ func (s *basicSuite) TestDeliveryPipeline_PublishDeliversToWebhook() { tenant := s.createTenant() dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret)) - s.publish(tenant.ID, "user.created", map[string]any{ + resp := s.publish(tenant.ID, "user.created", map[string]any{ "event_id": "delivery_test_1", }) + s.Equal([]string{dest.ID}, resp.DestinationIDs) // Verify mock server received the event events := s.waitForNewMockServerEvents(dest.mockID, 1) diff --git a/cmd/e2e/helpers_test.go b/cmd/e2e/helpers_test.go index edfb6208..852d390f 100644 --- a/cmd/e2e/helpers_test.go +++ b/cmd/e2e/helpers_test.go @@ -57,8 +57,9 @@ type destinationResponse struct { } type publishResponse struct { - ID string `json:"id"` - Duplicate bool `json:"duplicate"` + ID string `json:"id"` + Duplicate bool `json:"duplicate"` + DestinationIDs []string `json:"destination_ids"` } type mockServerEvent struct { diff --git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index dc6491a2..e9eddb77 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -1698,6 +1698,7 @@ components: required: - id - duplicate + - destination_ids properties: id: type: string @@ -1707,6 +1708,12 @@ components: type: boolean description: Whether this event was already processed (idempotency hit). If true, the event was not queued again. example: false + destination_ids: + type: array + items: + type: string + description: The IDs of destinations that matched this event. Empty array if no destinations matched. + example: ["des_456", "des_789"] RetryRequest: type: object description: Request body for retrying event delivery to a destination. diff --git a/internal/apirouter/publish_handlers_test.go b/internal/apirouter/publish_handlers_test.go index ba02c407..b455591a 100644 --- a/internal/apirouter/publish_handlers_test.go +++ b/internal/apirouter/publish_handlers_test.go @@ -147,9 +147,9 @@ func TestAPI_Publish(t *testing.T) { }) t.Run("Success", func(t *testing.T) { - t.Run("returns event ID", func(t *testing.T) { + t.Run("returns event ID and destination_ids", func(t *testing.T) { h := newAPITest(t) - h.eventHandler.result = &publishmq.HandleResult{EventID: "evt-123"} + h.eventHandler.result = &publishmq.HandleResult{EventID: "evt-123", DestinationIDs: []string{"d1"}} req := h.jsonReq(http.MethodPost, "/api/v1/publish", map[string]any{ "tenant_id": "t1", @@ -162,11 +162,12 @@ func TestAPI_Publish(t *testing.T) { require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result)) assert.Equal(t, "evt-123", result.EventID) assert.False(t, result.Duplicate) + assert.Equal(t, []string{"d1"}, result.DestinationIDs) }) - t.Run("returns duplicate flag", func(t *testing.T) { + t.Run("returns duplicate flag with empty destination_ids", func(t *testing.T) { h := newAPITest(t) - h.eventHandler.result = &publishmq.HandleResult{EventID: "evt-123", Duplicate: true} + h.eventHandler.result = &publishmq.HandleResult{EventID: "evt-123", Duplicate: true, DestinationIDs: []string{}} req := h.jsonReq(http.MethodPost, "/api/v1/publish", map[string]any{ "tenant_id": "t1", @@ -178,6 +179,7 @@ func TestAPI_Publish(t *testing.T) { var result publishmq.HandleResult require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &result)) assert.True(t, result.Duplicate) + assert.Empty(t, result.DestinationIDs) }) }) diff --git a/internal/apirouter/router_test.go b/internal/apirouter/router_test.go index 66f036ee..a97b65a1 100644 --- a/internal/apirouter/router_test.go +++ b/internal/apirouter/router_test.go @@ -195,7 +195,7 @@ func (m *mockEventHandler) Handle(_ context.Context, event *models.Event) (*publ if m.result != nil { return m.result, nil } - return &publishmq.HandleResult{EventID: event.ID}, nil + return &publishmq.HandleResult{EventID: event.ID, DestinationIDs: []string{}}, nil } // stubRegistry is a minimal destregistry.Registry for test setup. diff --git a/internal/publishmq/eventhandler.go b/internal/publishmq/eventhandler.go index 36a4f470..cbfc9adf 100644 --- a/internal/publishmq/eventhandler.go +++ b/internal/publishmq/eventhandler.go @@ -26,8 +26,9 @@ type EventHandler interface { } type HandleResult struct { - EventID string `json:"id"` - Duplicate bool `json:"duplicate"` + EventID string `json:"id"` + Duplicate bool `json:"duplicate"` + DestinationIDs []string `json:"destination_ids"` } type eventHandler struct { @@ -101,9 +102,14 @@ func (h *eventHandler) Handle(ctx context.Context, event *models.Event) (*Handle } } + if matchedDestinations == nil { + matchedDestinations = []string{} + } + result := &HandleResult{ - EventID: event.ID, - Duplicate: false, + EventID: event.ID, + Duplicate: false, + DestinationIDs: matchedDestinations, } // Early return if no destinations matched diff --git a/internal/publishmq/eventhandler_test.go b/internal/publishmq/eventhandler_test.go index b7a53b97..a10e6e5b 100644 --- a/internal/publishmq/eventhandler_test.go +++ b/internal/publishmq/eventhandler_test.go @@ -259,6 +259,7 @@ func TestEventHandler_HandleResult(t *testing.T) { require.NotNil(t, result) require.Equal(t, event.ID, result.EventID) require.False(t, result.Duplicate) + require.Len(t, result.DestinationIDs, 3) }) t.Run("no destinations matched", func(t *testing.T) { @@ -272,6 +273,7 @@ func TestEventHandler_HandleResult(t *testing.T) { require.NotNil(t, result) require.Equal(t, event.ID, result.EventID) require.False(t, result.Duplicate) + require.Empty(t, result.DestinationIDs) }) t.Run("duplicate event - idempotency", func(t *testing.T) { @@ -291,11 +293,13 @@ func TestEventHandler_HandleResult(t *testing.T) { result1, err := eventHandler.Handle(ctx, event) require.NoError(t, err) require.False(t, result1.Duplicate) + require.Len(t, result1.DestinationIDs, 1) // Duplicate request result2, err := eventHandler.Handle(ctx, event) require.NoError(t, err) require.True(t, result2.Duplicate) // Duplicate due to idempotency + require.Len(t, result2.DestinationIDs, 1) }) t.Run("with destination_id - queued", func(t *testing.T) { @@ -314,6 +318,7 @@ func TestEventHandler_HandleResult(t *testing.T) { result, err := eventHandler.Handle(ctx, event) require.NoError(t, err) require.False(t, result.Duplicate) + require.Equal(t, []string{dest.ID}, result.DestinationIDs) }) t.Run("with destination_id - duplicate event", func(t *testing.T) { @@ -358,6 +363,7 @@ func TestEventHandler_HandleResult(t *testing.T) { result, err := eventHandler.Handle(ctx, event) require.NoError(t, err) require.False(t, result.Duplicate) + require.Empty(t, result.DestinationIDs) }) t.Run("with destination_id - not found", func(t *testing.T) { @@ -370,6 +376,7 @@ func TestEventHandler_HandleResult(t *testing.T) { result, err := eventHandler.Handle(ctx, event) require.NoError(t, err) require.False(t, result.Duplicate) + require.Empty(t, result.DestinationIDs) }) t.Run("with destination_id - topic mismatch", func(t *testing.T) { @@ -388,6 +395,7 @@ func TestEventHandler_HandleResult(t *testing.T) { result, err := eventHandler.Handle(ctx, event) require.NoError(t, err) require.False(t, result.Duplicate) + require.Empty(t, result.DestinationIDs) }) }