diff --git a/Dockerfile b/Dockerfile index 22994fd..8690131 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,11 +18,12 @@ COPY --from=0 /artifacts/* / CMD /kafka-bridge -consumer_proxy_addr=$QUEUE_PROXY_ADDRS \ -consumer_group_id=$GROUP_ID \ - -consumer_offset=largest \ + -consumer_offset=latest \ -consumer_autocommit_enable=$CONSUMER_AUTOCOMMIT_ENABLE \ -consumer_authorization_key="$AUTHORIZATION_KEY" \ -topic=$TOPIC \ -producer_address=$PRODUCER_ADDRESS \ -producer_vulcan_auth="$PRODUCER_VULCAN_AUTH" \ -producer_type=$PRODUCER_TYPE \ - -service_name=$SERVICE_NAME + -service_name=$SERVICE_NAME \ + -region=$REGION diff --git a/main.go b/main.go index 6d2464d..b005f17 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ type BridgeApp struct { httpClient *http.Client serviceName string logger *log.UPPLogger + region string } const ( @@ -30,7 +31,7 @@ const ( proxy = "proxy" ) -func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset string, consumerAutoCommitEnable bool, consumerAuthorizationKey string, topic string, producerAddress string, producerVulcanAuth string, producerType string, serviceName string, logger *log.UPPLogger) *BridgeApp { +func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset string, consumerAutoCommitEnable bool, consumerAuthorizationKey string, topic string, producerAddress string, producerVulcanAuth string, producerType string, serviceName string, logger *log.UPPLogger, region string) *BridgeApp { consumerConfig := consumer.QueueConfig{} consumerConfig.Addrs = strings.Split(consumerAddrs, ",") consumerConfig.Group = consumerGroupID @@ -71,6 +72,7 @@ func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset s httpClient: httpClient, serviceName: serviceName, logger: logger, + region: region, } return app } @@ -89,6 +91,7 @@ func initBridgeApp() *BridgeApp { producerVulcanAuth := flag.String("producer_vulcan_auth", "", "Authentication string by which you access cms-notifier via vulcand.") producerType := flag.String("producer_type", proxy, "Two possible values are accepted: proxy - if the requests are going through the kafka-proxy; or plainHTTP if a normal http request is required.") serviceName := flag.String("service_name", "kafka-bridge", "The full name for the bridge app, like: `cms-kafka-bridge-pub-xp`") + region := flag.String("region", "", `Region in which the service is residing. Valid values: "eu", "us", or ""`) flag.Parse() @@ -96,7 +99,7 @@ func initBridgeApp() *BridgeApp { logger := log.NewUPPLogger(*serviceName, "INFO", logConf) logger.Info("Starting Kafka Bridge") - return newBridgeApp(*consumerAddrs, *consumerGroup, *consumerOffset, *consumerAutoCommitEnable, *consumerAuthorizationKey, *topic, *producerAddress, *producerVulcanAuth, *producerType, *serviceName, logger) + return newBridgeApp(*consumerAddrs, *consumerGroup, *consumerOffset, *consumerAutoCommitEnable, *consumerAuthorizationKey, *topic, *producerAddress, *producerVulcanAuth, *producerType, *serviceName, logger, *region) } func (app *BridgeApp) enableHealthchecksAndGTG(serviceName string) { diff --git a/message_forwarder.go b/message_forwarder.go index 8029550..c0e6ec5 100644 --- a/message_forwarder.go +++ b/message_forwarder.go @@ -3,32 +3,60 @@ package main import ( "errors" - queueProducer "github.com/Financial-Times/message-queue-go-producer/producer" - queueConsumer "github.com/Financial-Times/message-queue-gonsumer" + log "github.com/Financial-Times/go-logger/v2" + producer "github.com/Financial-Times/message-queue-go-producer/producer" + consumer "github.com/Financial-Times/message-queue-gonsumer" + "github.com/dchest/uniuri" ) -const tidValidRegexp = "(tid|SYNTHETIC-REQ-MON)[a-zA-Z0-9_-]*$" +func (app BridgeApp) forwardMsg(msg consumer.Message) { + if !isMsgForwardable(msg, app.region) { + app.logger.WithMonitoringEvent("Forwarding", msg.Headers["X-Request-Id"], "").Info("Message has been skipped. Origin region same as app region.") + return + } + + enrichMsg(&msg, app.region, app.logger) + + err := app.producerInstance.SendMessage("", producer.Message{Headers: msg.Headers, Body: msg.Body}) + if err != nil { + app.logger.WithError(err).WithMonitoringEvent("Forwarding", msg.Headers["X-Request-Id"], "").Error("Error happened during message forwarding") + return + } + + app.logger.WithMonitoringEvent("Forwarding", msg.Headers["X-Request-Id"], "").Info("Message has been forwarded") +} -func (app BridgeApp) forwardMsg(msg queueConsumer.Message) { +func enrichMsg(msg *consumer.Message, region string, log *log.UPPLogger) { tid, err := extractTID(msg.Headers) if err != nil { tid = "tid_" + uniuri.NewLen(10) + "_kafka_bridge" - app.logger.WithError(err).Info("Couldn't extract transaction id. TID was generated.") + log.WithError(err).Info("Couldn't extract transaction id. TID was generated.") } msg.Headers["X-Request-Id"] = tid - err = app.producerInstance.SendMessage("", queueProducer.Message{Headers: msg.Headers, Body: msg.Body}) - if err != nil { - app.logger.WithError(err).WithMonitoringEvent("Forwarding", tid, "").Error("Error happened during message forwarding") - } else { - app.logger.WithMonitoringEvent("Forwarding", tid, "").Info("Message has been forwarded") + + // Write region in synchronisation bridges only + if region == "eu" { + msg.Headers["Origin-Region"] = "us" + } else if region == "us" { + msg.Headers["Origin-Region"] = "eu" } } +func isMsgForwardable(msg consumer.Message, region string) bool { + origRegion := msg.Headers["Origin-Region"] + if region != "" && origRegion != "" && region == origRegion { + return false + } + + return true +} + func extractTID(headers map[string]string) (string, error) { header := headers["X-Request-Id"] if header == "" { - return "", errors.New("X-Request-Id header could not be found.") + return "", errors.New(`header "X-Request-Id" is empty or missing`) } + return header, nil } diff --git a/message_forwarder_test.go b/message_forwarder_test.go index 80279f3..d63059e 100644 --- a/message_forwarder_test.go +++ b/message_forwarder_test.go @@ -5,16 +5,20 @@ import ( "strings" "testing" - queueConsumer "github.com/Financial-Times/message-queue-gonsumer" + "github.com/stretchr/testify/assert" + + log "github.com/Financial-Times/go-logger/v2" + producer "github.com/Financial-Times/message-queue-go-producer/producer" + consumer "github.com/Financial-Times/message-queue-gonsumer" ) func TestExtractTID(t *testing.T) { var tests = []struct { - msg queueConsumer.Message + msg consumer.Message expectedTransactionID string expectedErrorMsg string }{ - {queueConsumer.Message{ + {consumer.Message{ Headers: map[string]string{ "Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c", "Message-Timestamp": "2015-07-06T07:03:09.362Z", @@ -28,7 +32,7 @@ func TestExtractTID(t *testing.T) { "", }, { - queueConsumer.Message{ + consumer.Message{ Headers: map[string]string{ "Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c", "Message-Timestamp": "2015-07-06T07:03:09.362Z", @@ -38,10 +42,10 @@ func TestExtractTID(t *testing.T) { }, Body: `{"uuid":"7543220a-2389-11e5-bd83-71cb60e8f08c","type":"EOM::CompoundStory","value":"test"}`}, "", - "X-Request-Id header could not be found", + `header "X-Request-Id" is empty or missing`, }, { - queueConsumer.Message{ + consumer.Message{ Headers: map[string]string{ "Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c", "Message-Timestamp": "2015-07-06T07:03:09.362Z", @@ -67,27 +71,181 @@ func TestExtractTID(t *testing.T) { } } -func TestExtractTID_TIDRegexp(t *testing.T) { +// Tests that both enrichment and the check if a message is forwardable are performed in forwardMsg +func TestForwardMsg(t *testing.T) { + logger := log.NewUPPLogger("Test", "PANIC") + var tests = []struct { - header string - tid string + name string + region string + msg consumer.Message + expectedMsg producer.Message }{ - {"X-Request-Id:tid_ABCDe12345", "tid_ABCDe12345"}, - {"X-Request-Id: tid_ABCDe12345", "tid_ABCDe12345"}, - {"X-Request-Id: SYNTHETIC-REQ-MON_ABCDe12345", "SYNTHETIC-REQ-MON_ABCDe12345"}, - {"X-Request-Id: SYNTHETIC-REQ-MON_ABCDe12345", "SYNTHETIC-REQ-MON_ABCDe12345"}, - {"X-Request-Id: SYNTHETIC-REQ-MON-abcdefgh-1234-pqrs-5678-stuvwxyz", "SYNTHETIC-REQ-MON-abcdefgh-1234-pqrs-5678-stuvwxyz"}, - {"X-Request-Id: SYNTHETIC-REQ-MONabcdefgh-1234-pqrs-5678-stuvwxyz", "SYNTHETIC-REQ-MONabcdefgh-1234-pqrs-5678-stuvwxyz"}, - {"X-Request-Id: SYN-REQ-MON_ABCDe12345", ""}, - {"X-Request-Id: ABCDE12345", ""}, - {"X-Request-Id: tid_ABCDe1234%", ""}, + { + name: "Message is skipped successfully", + region: "eu", + msg: buildConsumerMessageWithRegion("eu"), + expectedMsg: producer.Message{}, + }, + { + name: "Origin-Region header is enriched successfully", + region: "eu", + msg: buildConsumerMessageWithRegion(""), + expectedMsg: producer.Message{ + Headers: map[string]string{ + "Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c", + "Message-Timestamp": "2015-07-06T07:03:09.362Z", + "Message-Type": "cms-content-published", + "Origin-System-Id": "http://cmdb.ft.com/systems/methode-web-pub", + "Origin-Region": "us", + "Content-Type": "application/json", + "X-Request-Id": "tid_t9happe59y", + }, + Body: `{"uuid":"7543220a-2389-11e5-bd83-71cb60e8f08c","type":"EOM::CompoundStory","value":"test"}`}, + }, } - validRegexp := regexp.MustCompile(tidValidRegexp) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + p := &MockMessageProducer{} + app := &BridgeApp{ + producerInstance: p, + logger: logger, + region: test.region, + } - actualTID := validRegexp.FindString(test.header) - if actualTID != test.tid { - t.Errorf("\nHeader: %s\nExpectedTID: %s\nActualTID: %s\n", test.header, test.tid, actualTID) - } + app.forwardMsg(test.msg) + assert.Equal(t, test.expectedMsg, p.Msg) + }) + } +} + +type MockMessageProducer struct { + Msg producer.Message +} + +func (p *MockMessageProducer) SendMessage(uuid string, msg producer.Message) error { + p.Msg = msg + return nil +} + +func (p MockMessageProducer) ConnectivityCheck() (string, error) { + return "Connectivity is OK", nil +} + +func TestIsMsgForwardable(t *testing.T) { + var tests = []struct { + name string + region string + msg consumer.Message + expected bool + }{ + { + name: "Service without region forwards messages with no Origin-Region header", + region: "", + msg: buildConsumerMessageWithRegion(""), + expected: true, + }, + { + name: "Service without region forwards all regional messages", + region: "", + msg: buildConsumerMessageWithRegion("us"), + expected: true, + }, + { + name: "Same-region message to be skipped", + region: "eu", + msg: buildConsumerMessageWithRegion("eu"), + expected: false, + }, + { + name: "Different-region message to be forwarded", + region: "eu", + msg: buildConsumerMessageWithRegion("us"), + expected: true, + }, + { + name: "Message without Origin-Region header to be forwarded", + region: "eu", + msg: buildConsumerMessageWithRegion(""), + expected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, isMsgForwardable(test.msg, test.region)) + }) + } +} + +func TestEnrichMsg(t *testing.T) { + logger := log.NewUPPLogger("Test", "PANIC") + + var tests = []struct { + name string + msg consumer.Message + appRegion string + expectedMsgRegion string + }{ + { + name: `EU app writes messages with "us" region`, + msg: buildConsumerMessageWithRegion(""), + appRegion: "eu", + expectedMsgRegion: "us", + }, + { + name: `US app writes messages with "eu" region`, + msg: buildConsumerMessageWithRegion(""), + appRegion: "us", + expectedMsgRegion: "eu", + }, + { + name: `App with no region does not add "Origin-Region" header`, + msg: buildConsumerMessageWithRegion(""), + appRegion: "", + expectedMsgRegion: "", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + enrichMsg(&test.msg, test.appRegion, logger) + assert.Equal(t, test.expectedMsgRegion, test.msg.Headers["Origin-Region"]) + }) + } + + t.Run("Transaction ID added to message header", func(t *testing.T) { + msg := &consumer.Message{ + Headers: map[string]string{ + "Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c", + "Message-Timestamp": "2015-07-06T07:03:09.362Z", + "Message-Type": "cms-content-published", + "Origin-System-Id": "http://cmdb.ft.com/systems/methode-web-pub", + "Content-Type": "application/json", + }, + Body: `{"uuid":"7543220a-2389-11e5-bd83-71cb60e8f08c","type":"EOM::CompoundStory","value":"test"}`} + enrichMsg(msg, "", logger) + + var validID = regexp.MustCompile(`^tid_.+_kafka_bridge$`) + assert.True(t, validID.MatchString(msg.Headers["X-Request-Id"])) + }) +} + +func buildConsumerMessageWithRegion(region string) consumer.Message { + msg := consumer.Message{ + Headers: map[string]string{ + "Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c", + "Message-Timestamp": "2015-07-06T07:03:09.362Z", + "Message-Type": "cms-content-published", + "Origin-System-Id": "http://cmdb.ft.com/systems/methode-web-pub", + "Content-Type": "application/json", + "X-Request-Id": "tid_t9happe59y", + }, + Body: `{"uuid":"7543220a-2389-11e5-bd83-71cb60e8f08c","type":"EOM::CompoundStory","value":"test"}`} + + if region != "" { + msg.Headers["Origin-Region"] = region } + return msg }