Skip to content
This repository was archived by the owner on Dec 7, 2022. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ COPY --from=0 /artifacts/* /

CMD /kafka-bridge -consumer_proxy_addr=$QUEUE_PROXY_ADDRS \
-consumer_group_id=$GROUP_ID \
-consumer_offset=largest \
-consumer_offset=latest \
-consumer_autocommit_enable=$CONSUMER_AUTOCOMMIT_ENABLE \
-consumer_authorization_key="$AUTHORIZATION_KEY" \
-topic=$TOPIC \
-producer_address=$PRODUCER_ADDRESS \
-producer_vulcan_auth="$PRODUCER_VULCAN_AUTH" \
-producer_type=$PRODUCER_TYPE \
-service_name=$SERVICE_NAME
-service_name=$SERVICE_NAME \
-region=$REGION
7 changes: 5 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ type BridgeApp struct {
httpClient *http.Client
serviceName string
logger *log.UPPLogger
region string
}

const (
plainHTTP = "plainHTTP"
proxy = "proxy"
)

func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset string, consumerAutoCommitEnable bool, consumerAuthorizationKey string, topic string, producerAddress string, producerVulcanAuth string, producerType string, serviceName string, logger *log.UPPLogger) *BridgeApp {
func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset string, consumerAutoCommitEnable bool, consumerAuthorizationKey string, topic string, producerAddress string, producerVulcanAuth string, producerType string, serviceName string, logger *log.UPPLogger, region string) *BridgeApp {
consumerConfig := consumer.QueueConfig{}
consumerConfig.Addrs = strings.Split(consumerAddrs, ",")
consumerConfig.Group = consumerGroupID
Expand Down Expand Up @@ -71,6 +72,7 @@ func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset s
httpClient: httpClient,
serviceName: serviceName,
logger: logger,
region: region,
}
return app
}
Expand All @@ -89,14 +91,15 @@ func initBridgeApp() *BridgeApp {
producerVulcanAuth := flag.String("producer_vulcan_auth", "", "Authentication string by which you access cms-notifier via vulcand.")
producerType := flag.String("producer_type", proxy, "Two possible values are accepted: proxy - if the requests are going through the kafka-proxy; or plainHTTP if a normal http request is required.")
serviceName := flag.String("service_name", "kafka-bridge", "The full name for the bridge app, like: `cms-kafka-bridge-pub-xp`")
region := flag.String("region", "", `Region in which the service is residing. Valid values: "eu", "us", or ""`)

flag.Parse()

logConf := log.KeyNamesConfig{KeyTime: "@time"}
logger := log.NewUPPLogger(*serviceName, "INFO", logConf)
logger.Info("Starting Kafka Bridge")

return newBridgeApp(*consumerAddrs, *consumerGroup, *consumerOffset, *consumerAutoCommitEnable, *consumerAuthorizationKey, *topic, *producerAddress, *producerVulcanAuth, *producerType, *serviceName, logger)
return newBridgeApp(*consumerAddrs, *consumerGroup, *consumerOffset, *consumerAutoCommitEnable, *consumerAuthorizationKey, *topic, *producerAddress, *producerVulcanAuth, *producerType, *serviceName, logger, *region)
}

func (app *BridgeApp) enableHealthchecksAndGTG(serviceName string) {
Expand Down
50 changes: 39 additions & 11 deletions message_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,60 @@ package main
import (
"errors"

queueProducer "github.com/Financial-Times/message-queue-go-producer/producer"
queueConsumer "github.com/Financial-Times/message-queue-gonsumer"
log "github.com/Financial-Times/go-logger/v2"
producer "github.com/Financial-Times/message-queue-go-producer/producer"
consumer "github.com/Financial-Times/message-queue-gonsumer"

"github.com/dchest/uniuri"
)

const tidValidRegexp = "(tid|SYNTHETIC-REQ-MON)[a-zA-Z0-9_-]*$"
func (app BridgeApp) forwardMsg(msg consumer.Message) {
if !isMsgForwardable(msg, app.region) {
app.logger.WithMonitoringEvent("Forwarding", msg.Headers["X-Request-Id"], "").Info("Message has been skipped. Origin region same as app region.")
return
}

enrichMsg(&msg, app.region, app.logger)

err := app.producerInstance.SendMessage("", producer.Message{Headers: msg.Headers, Body: msg.Body})
if err != nil {
app.logger.WithError(err).WithMonitoringEvent("Forwarding", msg.Headers["X-Request-Id"], "").Error("Error happened during message forwarding")
return
}

app.logger.WithMonitoringEvent("Forwarding", msg.Headers["X-Request-Id"], "").Info("Message has been forwarded")
}

func (app BridgeApp) forwardMsg(msg queueConsumer.Message) {
func enrichMsg(msg *consumer.Message, region string, log *log.UPPLogger) {
tid, err := extractTID(msg.Headers)
if err != nil {
tid = "tid_" + uniuri.NewLen(10) + "_kafka_bridge"
app.logger.WithError(err).Info("Couldn't extract transaction id. TID was generated.")
log.WithError(err).Info("Couldn't extract transaction id. TID was generated.")
}
msg.Headers["X-Request-Id"] = tid
err = app.producerInstance.SendMessage("", queueProducer.Message{Headers: msg.Headers, Body: msg.Body})
if err != nil {
app.logger.WithError(err).WithMonitoringEvent("Forwarding", tid, "").Error("Error happened during message forwarding")
} else {
app.logger.WithMonitoringEvent("Forwarding", tid, "").Info("Message has been forwarded")

// Write region in synchronisation bridges only
if region == "eu" {
msg.Headers["Origin-Region"] = "us"
} else if region == "us" {
msg.Headers["Origin-Region"] = "eu"
}
}

func isMsgForwardable(msg consumer.Message, region string) bool {
origRegion := msg.Headers["Origin-Region"]
if region != "" && origRegion != "" && region == origRegion {
return false
}

return true
}

func extractTID(headers map[string]string) (string, error) {
header := headers["X-Request-Id"]
if header == "" {
return "", errors.New("X-Request-Id header could not be found.")
return "", errors.New(`header "X-Request-Id" is empty or missing`)
}

return header, nil
}
204 changes: 181 additions & 23 deletions message_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@ import (
"strings"
"testing"

queueConsumer "github.com/Financial-Times/message-queue-gonsumer"
"github.com/stretchr/testify/assert"

log "github.com/Financial-Times/go-logger/v2"
producer "github.com/Financial-Times/message-queue-go-producer/producer"
consumer "github.com/Financial-Times/message-queue-gonsumer"
)

func TestExtractTID(t *testing.T) {
var tests = []struct {
msg queueConsumer.Message
msg consumer.Message
expectedTransactionID string
expectedErrorMsg string
}{
{queueConsumer.Message{
{consumer.Message{
Headers: map[string]string{
"Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c",
"Message-Timestamp": "2015-07-06T07:03:09.362Z",
Expand All @@ -28,7 +32,7 @@ func TestExtractTID(t *testing.T) {
"",
},
{
queueConsumer.Message{
consumer.Message{
Headers: map[string]string{
"Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c",
"Message-Timestamp": "2015-07-06T07:03:09.362Z",
Expand All @@ -38,10 +42,10 @@ func TestExtractTID(t *testing.T) {
},
Body: `{"uuid":"7543220a-2389-11e5-bd83-71cb60e8f08c","type":"EOM::CompoundStory","value":"test"}`},
"",
"X-Request-Id header could not be found",
`header "X-Request-Id" is empty or missing`,
},
{
queueConsumer.Message{
consumer.Message{
Headers: map[string]string{
"Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c",
"Message-Timestamp": "2015-07-06T07:03:09.362Z",
Expand All @@ -67,27 +71,181 @@ func TestExtractTID(t *testing.T) {
}
}

func TestExtractTID_TIDRegexp(t *testing.T) {
// Tests that both enrichment and the check if a message is forwardable are performed in forwardMsg
func TestForwardMsg(t *testing.T) {
logger := log.NewUPPLogger("Test", "PANIC")

var tests = []struct {
header string
tid string
name string
region string
msg consumer.Message
expectedMsg producer.Message
}{
{"X-Request-Id:tid_ABCDe12345", "tid_ABCDe12345"},
{"X-Request-Id: tid_ABCDe12345", "tid_ABCDe12345"},
{"X-Request-Id: SYNTHETIC-REQ-MON_ABCDe12345", "SYNTHETIC-REQ-MON_ABCDe12345"},
{"X-Request-Id: SYNTHETIC-REQ-MON_ABCDe12345", "SYNTHETIC-REQ-MON_ABCDe12345"},
{"X-Request-Id: SYNTHETIC-REQ-MON-abcdefgh-1234-pqrs-5678-stuvwxyz", "SYNTHETIC-REQ-MON-abcdefgh-1234-pqrs-5678-stuvwxyz"},
{"X-Request-Id: SYNTHETIC-REQ-MONabcdefgh-1234-pqrs-5678-stuvwxyz", "SYNTHETIC-REQ-MONabcdefgh-1234-pqrs-5678-stuvwxyz"},
{"X-Request-Id: SYN-REQ-MON_ABCDe12345", ""},
{"X-Request-Id: ABCDE12345", ""},
{"X-Request-Id: tid_ABCDe1234%", ""},
{
name: "Message is skipped successfully",
region: "eu",
msg: buildConsumerMessageWithRegion("eu"),
expectedMsg: producer.Message{},
},
{
name: "Origin-Region header is enriched successfully",
region: "eu",
msg: buildConsumerMessageWithRegion(""),
expectedMsg: producer.Message{
Headers: map[string]string{
"Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c",
"Message-Timestamp": "2015-07-06T07:03:09.362Z",
"Message-Type": "cms-content-published",
"Origin-System-Id": "http://cmdb.ft.com/systems/methode-web-pub",
"Origin-Region": "us",
"Content-Type": "application/json",
"X-Request-Id": "tid_t9happe59y",
},
Body: `{"uuid":"7543220a-2389-11e5-bd83-71cb60e8f08c","type":"EOM::CompoundStory","value":"test"}`},
},
}
validRegexp := regexp.MustCompile(tidValidRegexp)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := &MockMessageProducer{}
app := &BridgeApp{
producerInstance: p,
logger: logger,
region: test.region,
}

actualTID := validRegexp.FindString(test.header)
if actualTID != test.tid {
t.Errorf("\nHeader: %s\nExpectedTID: %s\nActualTID: %s\n", test.header, test.tid, actualTID)
}
app.forwardMsg(test.msg)
assert.Equal(t, test.expectedMsg, p.Msg)
})
}
}

type MockMessageProducer struct {
Msg producer.Message
}

func (p *MockMessageProducer) SendMessage(uuid string, msg producer.Message) error {
p.Msg = msg
return nil
}

func (p MockMessageProducer) ConnectivityCheck() (string, error) {
return "Connectivity is OK", nil
}

func TestIsMsgForwardable(t *testing.T) {
var tests = []struct {
name string
region string
msg consumer.Message
expected bool
}{
{
name: "Service without region forwards messages with no Origin-Region header",
region: "",
msg: buildConsumerMessageWithRegion(""),
expected: true,
},
{
name: "Service without region forwards all regional messages",
region: "",
msg: buildConsumerMessageWithRegion("us"),
expected: true,
},
{
name: "Same-region message to be skipped",
region: "eu",
msg: buildConsumerMessageWithRegion("eu"),
expected: false,
},
{
name: "Different-region message to be forwarded",
region: "eu",
msg: buildConsumerMessageWithRegion("us"),
expected: true,
},
{
name: "Message without Origin-Region header to be forwarded",
region: "eu",
msg: buildConsumerMessageWithRegion(""),
expected: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.expected, isMsgForwardable(test.msg, test.region))
})
}
}

func TestEnrichMsg(t *testing.T) {
logger := log.NewUPPLogger("Test", "PANIC")

var tests = []struct {
name string
msg consumer.Message
appRegion string
expectedMsgRegion string
}{
{
name: `EU app writes messages with "us" region`,
msg: buildConsumerMessageWithRegion(""),
appRegion: "eu",
expectedMsgRegion: "us",
},
{
name: `US app writes messages with "eu" region`,
msg: buildConsumerMessageWithRegion(""),
appRegion: "us",
expectedMsgRegion: "eu",
},
{
name: `App with no region does not add "Origin-Region" header`,
msg: buildConsumerMessageWithRegion(""),
appRegion: "",
expectedMsgRegion: "",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
enrichMsg(&test.msg, test.appRegion, logger)
assert.Equal(t, test.expectedMsgRegion, test.msg.Headers["Origin-Region"])
})
}

t.Run("Transaction ID added to message header", func(t *testing.T) {
msg := &consumer.Message{
Headers: map[string]string{
"Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c",
"Message-Timestamp": "2015-07-06T07:03:09.362Z",
"Message-Type": "cms-content-published",
"Origin-System-Id": "http://cmdb.ft.com/systems/methode-web-pub",
"Content-Type": "application/json",
},
Body: `{"uuid":"7543220a-2389-11e5-bd83-71cb60e8f08c","type":"EOM::CompoundStory","value":"test"}`}
enrichMsg(msg, "", logger)

var validID = regexp.MustCompile(`^tid_.+_kafka_bridge$`)
assert.True(t, validID.MatchString(msg.Headers["X-Request-Id"]))
})
}

func buildConsumerMessageWithRegion(region string) consumer.Message {
msg := consumer.Message{
Headers: map[string]string{
"Message-Id": "fc429b46-2500-4fe7-88bb-fd507fbaf00c",
"Message-Timestamp": "2015-07-06T07:03:09.362Z",
"Message-Type": "cms-content-published",
"Origin-System-Id": "http://cmdb.ft.com/systems/methode-web-pub",
"Content-Type": "application/json",
"X-Request-Id": "tid_t9happe59y",
},
Body: `{"uuid":"7543220a-2389-11e5-bd83-71cb60e8f08c","type":"EOM::CompoundStory","value":"test"}`}

if region != "" {
msg.Headers["Origin-Region"] = region
}
return msg
}