From 81b33b5387b9f72437c5a8adc3912f7a3056c5dc Mon Sep 17 00:00:00 2001 From: Alvaro Denis Date: Thu, 29 Jul 2021 10:53:52 +0300 Subject: [PATCH 1/3] Add a cli option Be able to configure the encoder for the producer side Options: `model` or `base64` --- go.mod | 2 +- go.sum | 45 ++++++++++++++++++++++++++++++++++++++++++--- kafka-bridge.go | 37 ++++++++++++++++++++++++++++++++++--- 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 9a0a8db..8e2b7c4 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/Financial-Times/go-fthealth v0.0.0-20171204124831-1b007e2b37b7 github.com/Financial-Times/go-logger v0.0.0-20170914081945-83fc3e64dc55 - github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20170622111849-0bb065111416 + github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728120155-bd0f255f821a github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566 github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9 diff --git a/go.sum b/go.sum index a16993e..b3bd41f 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,27 @@ github.com/Financial-Times/go-fthealth v0.0.0-20171204124831-1b007e2b37b7 h1:dkf1EOTiHXA2lG2EJuePEim6y0HEOPt0hcqsT/qUr/k= github.com/Financial-Times/go-fthealth v0.0.0-20171204124831-1b007e2b37b7/go.mod h1:gpAzq6W5rCheYlY32JOIxS/VjVcYHbC2PkMzQngHT9c= +github.com/Financial-Times/go-fthealth v0.0.0-20180807113633-3d8eb430d5b5 h1:XH5h45aAyG1bAFBYmkgJkT4q13CbkCJ+gj9+rIfzuL8= +github.com/Financial-Times/go-fthealth v0.0.0-20180807113633-3d8eb430d5b5/go.mod h1:gpAzq6W5rCheYlY32JOIxS/VjVcYHbC2PkMzQngHT9c= github.com/Financial-Times/go-logger v0.0.0-20170914081945-83fc3e64dc55 h1:+/OZNdWdTPM7A7JxR/8ul1QZgXYfPxJdo6iIcK3K17I= github.com/Financial-Times/go-logger v0.0.0-20170914081945-83fc3e64dc55/go.mod h1:NI4Dg39A21H57YC2nG8C42C6ENz/YVsI0jMQWngJzR0= -github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20170622111849-0bb065111416 h1:7qa+AbW0xYSUhqzUtWgYN9JFva9CqsKy1A0lNs3WpYI= -github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20170622111849-0bb065111416/go.mod h1:1mfkRkBPglgCeE8w0gzoC4ujulQAqWfMKVCIdR+2svQ= +github.com/Financial-Times/go-logger v0.0.0-20180323124113-febee6537e90 h1:U7wPaeMESlG0WVwOobaw4qv6I6s9F8b0SdmJKH3Vh6A= +github.com/Financial-Times/go-logger v0.0.0-20180323124113-febee6537e90/go.mod h1:NI4Dg39A21H57YC2nG8C42C6ENz/YVsI0jMQWngJzR0= +github.com/Financial-Times/http-handlers-go v0.0.0-20170809121007-229ac16f1d9e h1:/Y2wrSfkueFmdOIyQSABebfEe5P+yFyxBnmtnx1C0HM= +github.com/Financial-Times/http-handlers-go v0.0.0-20170809121007-229ac16f1d9e/go.mod h1:sAkXv1oPYgNTYBYsYs83HwpYp7R50mvgBGGcsOlJtOw= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210727141350-ffe1f42420fa h1:ar/IMEcnU7otfDH/pLM8pbj2ImNWJIAFWOJiaD/ozek= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210727141350-ffe1f42420fa/go.mod h1:O+R/PyICNDU76FSWeusXl/crkwUBo1X6J9Te8Q6fQ0I= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728114901-dd89c7140d11 h1:IaB/Y4snlcLeyHx5pCfNQq9HhH+hx3wMDtFFsmFY6jQ= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728114901-dd89c7140d11/go.mod h1:O+R/PyICNDU76FSWeusXl/crkwUBo1X6J9Te8Q6fQ0I= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728120155-bd0f255f821a h1:6hmJtu00PKdlpERIGHPJ4BgzcAOqrITw4SUmVV4ROrg= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728120155-bd0f255f821a/go.mod h1:O+R/PyICNDU76FSWeusXl/crkwUBo1X6J9Te8Q6fQ0I= github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566 h1:RZ8LXUuj6nDvGoXKF5m+TmHlR0PxG8hU85PLHV8GRBE= github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566/go.mod h1:A88i3psx3Zm80Ai2OYTrwzKkZGKj+x5KL02z+YrRd10= +github.com/Financial-Times/post-publication-combiner/v2 v2.0.0-20200429053931-78b45d72d0ff h1:1CcRc4KcKU5ouy9VCoRclnX0+wBSUqtiU6C/uyv9zag= +github.com/Financial-Times/post-publication-combiner/v2 v2.0.0-20200429053931-78b45d72d0ff/go.mod h1:AH/4UbHiDqRs5OtvVlD0dJSXuxzLBwTa2CVqEQsmMUg= github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d h1:USNBTIof6vWGM49SYrxvC5Y8NqyDL3YuuYmID81ORZQ= github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d/go.mod h1:7zULC9rrq6KxFkpB3Y5zNVaEwrf1g2m3dvXJBPDXyvM= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/Financial-Times/transactionid-utils-go v0.2.0 h1:YcET5Hd1fUGWWpQSVszYUlAc15ca8tmjRetUuQKRqEQ= +github.com/Financial-Times/transactionid-utils-go v0.2.0/go.mod h1:tPAcAFs/dR6Q7hBDGNyUyixHRvg/n9NW/JTq8C58oZ0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -29,29 +42,51 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8= +github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/handlers v1.4.0 h1:XulKRWSQK5uChr4pEgSE4Tc/OcmnU9GJuSwdog/tZsA= +github.com/gorilla/handlers v1.4.0/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= +github.com/gorilla/mux v1.4.1-0.20170830053917-a659b61323b0 h1:WufQb+4501Pn15bGwgA1eE6QREDVyecaTILO3GJv/UQ= +github.com/gorilla/mux v1.4.1-0.20170830053917-a659b61323b0/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/hashicorp/go-version v0.0.0-20170202080759-03c5bf6be031 h1:c3Xdf5fTpk+hqhxqCO+ymqjfUXV9+GZqNgTtlnVzDos= github.com/hashicorp/go-version v0.0.0-20170202080759-03c5bf6be031/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/go-version v1.0.0 h1:21MVWPKDphxa7ineQQTrCU5brh7OuVVAzGOCnnCPtE8= +github.com/hashicorp/go-version v1.0.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= github.com/jawher/mow.cli v1.1.0 h1:NdtHXRc0CwZQ507wMvQ/IS+Q3W3x2fycn973/b8Zuk8= github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M= github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20161128210544-1f30fe9094a5 h1:gwcdIpH6NU2iF8CmcqD+CP6+1CkRBOhHaPR+iu6raBY= +github.com/rcrowley/go-metrics v0.0.0-20161128210544-1f30fe9094a5/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/satori/go.uuid v1.2.1-0.20181016170032-d91630c85102 h1:WAQaHPfnpevd8SKXCcy5nk3JzEv2h5Q0kSwvoMqXiZs= +github.com/satori/go.uuid v1.2.1-0.20181016170032-d91630c85102/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2 h1:a07zp0wovcAE2jH+wlD22JLqUH6Rdl8Aon+NiyPxE+0= github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= +github.com/stathat/go v1.0.0 h1:HFIS5YkyaI6tXu7JXIRRZBLRvYstdNZm034zcCeaybI= +github.com/stathat/go v1.0.0/go.mod h1:+9Eg2szqkcOGWv6gfheJmBBsmq9Qf5KDbzy8/aYYR0c= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/testify v1.1.5-0.20170130113145-4d4bfba8f1d1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -60,6 +95,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -75,7 +111,9 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e h1:FDhOuMEY4JVRztM/gsbk+IKUQ8kj74bxZrgw87eMMVc= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -89,6 +127,7 @@ gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNat gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNjB2u4i700xBkIT4e0= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= diff --git a/kafka-bridge.go b/kafka-bridge.go index 0bbf042..e168782 100644 --- a/kafka-bridge.go +++ b/kafka-bridge.go @@ -1,10 +1,12 @@ package main import ( + "errors" "fmt" "net" "net/http" "os" + "reflect" "strings" "time" @@ -31,7 +33,7 @@ const ( proxy = "proxy" ) -func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset string, consumerAutoCommitEnable bool, consumerAuthorizationKey string, topic string, producerAddress string, producerAuth string, producerType string, serviceName string) *BridgeApp { +func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset string, consumerAutoCommitEnable bool, consumerAuthorizationKey string, topic string, producerAddress string, producerAuth string, producerType string, serviceName string, producerEncoding int) *BridgeApp { consumerConfig := consumer.QueueConfig{} consumerConfig.Addrs = strings.Split(consumerAddrs, ",") consumerConfig.Group = consumerGroupID @@ -48,7 +50,7 @@ func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset s var producerInstance producer.MessageProducer switch producerType { case proxy: - producerInstance = producer.NewMessageProducer(producerConfig) + producerInstance = producer.NewMessageProducerWithEncoder(producerConfig, producer.NewEncoder(producerEncoding)) case plainHTTP: producerInstance = newPlainHTTPMessageProducer(producerConfig) default: @@ -154,10 +156,39 @@ func main() { }) logger.InitDefaultLogger(*serviceName) + argument2Encoding := map[string]int { + "base64": producer.Base64E, + "model": producer.CombinedModelE, + } + producerEncoding := app.String(cli.StringOpt{ + Name: "producer_encoding", + Value: reflect.ValueOf(argument2Encoding).MapKeys()[0].String(), + Desc: "Two possible values are accepted: model - if you want to send data encoded in a json mapping the model; or base64 if you want a base64 encoding.", + EnvVar: "PRODUCER_ENCODING", + }) + if _, ok := argument2Encoding[*producerEncoding]; !ok { + var options []string + for k := range argument2Encoding { + options = append(options, k) + } + logger.Fatalf(map[string]interface{}{"valid options": options}, errors.New("producer_encoding"), "invalid argument") + } logger.Infof(nil, "Starting Kafka Bridge") app.Action = func() { - bridgeApp := newBridgeApp(*consumerAddrs, *consumerGroup, *consumerOffset, *consumerAutoCommitEnable, *consumerAuthorizationKey, *topic, *producerAddress, *producerAuth, *producerType, *serviceName) + bridgeApp := newBridgeApp( + *consumerAddrs, + *consumerGroup, + *consumerOffset, + *consumerAutoCommitEnable, + *consumerAuthorizationKey, + *topic, + *producerAddress, + *producerAuth, + *producerType, + *serviceName, + argument2Encoding[*producerEncoding], + ) go bridgeApp.enableHealthchecksAndGTG(bridgeApp.serviceName) bridgeApp.consumeMessages() } From 3ed53e20fb01c23e7e3fd084cd7f079c27874617 Mon Sep 17 00:00:00 2001 From: Alvaro Denis Date: Thu, 29 Jul 2021 11:26:34 +0300 Subject: [PATCH 2/3] Handle some unchecked errors --- go.mod | 2 +- go.sum | 2 ++ healthchecks.go | 1 - healthchecks_test.go | 8 ++++---- kafka-bridge.go | 8 ++++---- message_consumer.go | 8 ++++---- message_forwarder.go | 7 ++++--- message_forwarder_test.go | 6 +++--- plain_http_producer.go | 12 ++++++++---- plain_http_producer_test.go | 23 +++++++++++------------ 10 files changed, 41 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 8e2b7c4..9ad9ecd 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/Financial-Times/go-fthealth v0.0.0-20171204124831-1b007e2b37b7 github.com/Financial-Times/go-logger v0.0.0-20170914081945-83fc3e64dc55 - github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728120155-bd0f255f821a + github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210805085551-b267a0f9582d github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566 github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9 diff --git a/go.sum b/go.sum index b3bd41f..a3dd9c2 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728114901-dd8 github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728114901-dd89c7140d11/go.mod h1:O+R/PyICNDU76FSWeusXl/crkwUBo1X6J9Te8Q6fQ0I= github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728120155-bd0f255f821a h1:6hmJtu00PKdlpERIGHPJ4BgzcAOqrITw4SUmVV4ROrg= github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728120155-bd0f255f821a/go.mod h1:O+R/PyICNDU76FSWeusXl/crkwUBo1X6J9Te8Q6fQ0I= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210805085551-b267a0f9582d h1:H4YIluCZesW3nUwW1bGpcg76dlFHR8c+umFvulnsa8Y= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210805085551-b267a0f9582d/go.mod h1:O+R/PyICNDU76FSWeusXl/crkwUBo1X6J9Te8Q6fQ0I= github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566 h1:RZ8LXUuj6nDvGoXKF5m+TmHlR0PxG8hU85PLHV8GRBE= github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566/go.mod h1:A88i3psx3Zm80Ai2OYTrwzKkZGKj+x5KL02z+YrRd10= github.com/Financial-Times/post-publication-combiner/v2 v2.0.0-20200429053931-78b45d72d0ff h1:1CcRc4KcKU5ouy9VCoRclnX0+wBSUqtiU6C/uyv9zag= diff --git a/healthchecks.go b/healthchecks.go index 22c930a..d1388bf 100644 --- a/healthchecks.go +++ b/healthchecks.go @@ -35,7 +35,6 @@ func (hc HealthCheck) Health(serviceName string) func(w http.ResponseWriter, r * if hc.producerType == proxy { description = "Services: source-kafka-proxy, destination-kafka-proxy" checks = []fthealth.Check{hc.consumeHealthcheck(), hc.proxyForwarderHealthcheck()} - } healthCheck := fthealth.TimedHealthCheck{ diff --git a/healthchecks_test.go b/healthchecks_test.go index afadb60..c5517f8 100644 --- a/healthchecks_test.go +++ b/healthchecks_test.go @@ -30,7 +30,7 @@ func (p *mockProducerInstance) ConnectivityCheck() (string, error) { return "", nil } - return "", errors.New("Error connecting to the queue") + return "", errors.New("error connecting to the queue") } func (c *mockConsumerInstance) Start() { @@ -44,7 +44,7 @@ func (c *mockConsumerInstance) ConnectivityCheck() (string, error) { return "", nil } - return "", errors.New("Error connecting to the queue") + return "", errors.New("error connecting to the queue") } func initializeHealthcheck(isProducerConnectionHealthy bool, isConsumerConnectionHealthy bool, producerType string) HealthCheck { @@ -81,7 +81,7 @@ func TestGTGBrokenConsumer(t *testing.T) { status := hc.GTG() assert.False(t, status.GoodToGo) - assert.Equal(t, "Error connecting to the queue", status.Message) + assert.Equal(t, "error connecting to the queue", status.Message) } func TestGTGCheckBrokenProducer(t *testing.T) { @@ -89,7 +89,7 @@ func TestGTGCheckBrokenProducer(t *testing.T) { status := hc.GTG() assert.False(t, status.GoodToGo) - assert.Equal(t, "Error connecting to the queue", status.Message) + assert.Equal(t, "error connecting to the queue", status.Message) } func TestHealthHappyFlow(t *testing.T) { diff --git a/kafka-bridge.go b/kafka-bridge.go index e168782..da5ffbc 100644 --- a/kafka-bridge.go +++ b/kafka-bridge.go @@ -54,7 +54,7 @@ func newBridgeApp(consumerAddrs string, consumerGroupID string, consumerOffset s case plainHTTP: producerInstance = newPlainHTTPMessageProducer(producerConfig) default: - logger.Fatalf(nil, fmt.Errorf("Unknown producer type %s", producerType), "The provided producer type '%v' is invalid", producerType) + logger.Fatalf(nil, fmt.Errorf("unknown producer type %s", producerType), "The provided producer type '%v' is invalid", producerType) } httpClient := &http.Client{ @@ -156,9 +156,9 @@ func main() { }) logger.InitDefaultLogger(*serviceName) - argument2Encoding := map[string]int { + argument2Encoding := map[string]int{ "base64": producer.Base64E, - "model": producer.CombinedModelE, + "model": producer.CombinedModelE, } producerEncoding := app.String(cli.StringOpt{ Name: "producer_encoding", @@ -188,7 +188,7 @@ func main() { *producerType, *serviceName, argument2Encoding[*producerEncoding], - ) + ) go bridgeApp.enableHealthchecksAndGTG(bridgeApp.serviceName) bridgeApp.consumeMessages() } diff --git a/message_consumer.go b/message_consumer.go index 742ca6c..4a1b1b7 100644 --- a/message_consumer.go +++ b/message_consumer.go @@ -12,10 +12,10 @@ import ( queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer" ) -func (bridge BridgeApp) consumeMessages() { - consumerConfig := bridge.consumerConfig +func (bridgeApp BridgeApp) consumeMessages() { + consumerConfig := bridgeApp.consumerConfig - consumer := queueConsumer.NewAgeingConsumer(*consumerConfig, bridge.forwardMsg, queueConsumer.AgeingClient{ + consumer := queueConsumer.NewAgeingConsumer(*consumerConfig, bridgeApp.forwardMsg, queueConsumer.AgeingClient{ Client: &http.Client{ Timeout: 60 * time.Second, Transport: &http.Transport{ @@ -36,7 +36,7 @@ func (bridge BridgeApp) consumeMessages() { wg.Done() }() - ch := make(chan os.Signal) + ch := make(chan os.Signal, 2) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) <-ch consumer.Stop() diff --git a/message_forwarder.go b/message_forwarder.go index a1e4c51..a7bc521 100644 --- a/message_forwarder.go +++ b/message_forwarder.go @@ -2,6 +2,7 @@ package main import ( "errors" + "github.com/Financial-Times/go-logger" queueProducer "github.com/Financial-Times/message-queue-go-producer/producer" queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer" @@ -10,14 +11,14 @@ import ( const tidValidRegexp = "(tid|SYNTHETIC-REQ-MON)[a-zA-Z0-9_-]*$" -func (bridge BridgeApp) forwardMsg(msg queueConsumer.Message) { +func (bridgeApp BridgeApp) forwardMsg(msg queueConsumer.Message) { tid, err := extractTID(msg.Headers) if err != nil { tid = "tid_" + uniuri.NewLen(10) + "_kafka_bridge" logger.NewEntry(tid).Info("Couldn't extract transaction id, due to %s. TID was generated.", err.Error()) } msg.Headers["X-Request-Id"] = tid - err = bridge.producerInstance.SendMessage("", queueProducer.Message{Headers: msg.Headers, Body: msg.Body}) + err = bridgeApp.producerInstance.SendMessage("", queueProducer.Message{Headers: msg.Headers, Body: msg.Body}) if err != nil { logger.NewMonitoringEntry("Forwarding", tid, "").Error("Error happened during message forwarding: " + err.Error()) } else { @@ -28,7 +29,7 @@ func (bridge BridgeApp) forwardMsg(msg queueConsumer.Message) { func extractTID(headers map[string]string) (string, error) { header := headers["X-Request-Id"] if header == "" { - return "", errors.New("X-Request-Id header could not be found.") + return "", errors.New("header X-Request-Id could not be found") } return header, nil } diff --git a/message_forwarder_test.go b/message_forwarder_test.go index 54eb3ea..e850704 100644 --- a/message_forwarder_test.go +++ b/message_forwarder_test.go @@ -1,10 +1,11 @@ package main import ( - queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer" "regexp" "strings" "testing" + + queueConsumer "github.com/Financial-Times/message-queue-gonsumer/consumer" ) func TestExtractTID(t *testing.T) { @@ -37,7 +38,7 @@ func TestExtractTID(t *testing.T) { }, Body: `{"uuid":"7543220a-2389-11e5-bd83-71cb60e8f08c","type":"EOM::CompoundStory","value":"test"}`}, "", - "X-Request-Id header could not be found", + "header X-Request-Id could not be found", }, { queueConsumer.Message{ @@ -83,7 +84,6 @@ func TestExtractTID_TIDRegexp(t *testing.T) { } validRegexp := regexp.MustCompile(tidValidRegexp) for _, test := range tests { - actualTID := validRegexp.FindString(test.header) if actualTID != test.tid { t.Errorf("\nHeader: %s\nExpectedTID: %s\nActualTID: %s\n", test.header, test.tid, actualTID) diff --git a/plain_http_producer.go b/plain_http_producer.go index b27d2cc..3713140 100644 --- a/plain_http_producer.go +++ b/plain_http_producer.go @@ -16,10 +16,10 @@ import ( type plainHTTPMessageProducer struct { config queueProducer.MessageProducerConfig - client plainHttpClient + client plainHTTPClient } -type plainHttpClient interface { +type plainHTTPClient interface { Do(req *http.Request) (resp *http.Response, err error) } @@ -76,7 +76,9 @@ func (c *plainHTTPMessageProducer) SendMessage(uuid string, message queueProduce return errors.New(errMsg) } defer func() { - io.Copy(ioutil.Discard, resp.Body) + if n, err := io.Copy(ioutil.Discard, resp.Body); err != nil { + logger.Fatalf(map[string]interface{}{"written": n, "body": resp.Body}, err, "discarding body") + } resp.Body.Close() }() @@ -100,7 +102,9 @@ func (c *plainHTTPMessageProducer) ConnectivityCheck() (string, error) { } defer func() { - io.Copy(ioutil.Discard, resp.Body) + if n, err := io.Copy(ioutil.Discard, resp.Body); err != nil { + logger.Fatalf(map[string]interface{}{"written": n, "body": resp.Body}, err, "discarding body") + } resp.Body.Close() }() diff --git a/plain_http_producer_test.go b/plain_http_producer_test.go index d7709ca..b32143e 100644 --- a/plain_http_producer_test.go +++ b/plain_http_producer_test.go @@ -41,7 +41,7 @@ func TestSendMessage(t *testing.T) { "X-Request-Id": "t9happe59y", "Authorization": "authorizationkey", "Message-Timestamp": "2015-07-06T07:03:09.362Z", - "Content-Type": "application/json", + "Content-Type": "application/json", }, }, { //missing content-type @@ -86,7 +86,7 @@ func TestSendMessage(t *testing.T) { "X-Request-Id": "t9happe59y", "Authorization": "", "Message-Timestamp": "2015-07-06T07:03:09.362Z", - "Content-Type": "application/json", + "Content-Type": "application/json", }, }, { //host header (queue) is missing @@ -109,7 +109,7 @@ func TestSendMessage(t *testing.T) { "X-Request-Id": "t9happe59y", "Authorization": "", "Message-Timestamp": "2015-07-06T07:03:09.362Z", - "Content-Type": "application/json", + "Content-Type": "application/json", }, }, { //origin system id is missing @@ -131,7 +131,7 @@ func TestSendMessage(t *testing.T) { "X-Request-Id": "t9happe59y", "Authorization": "", "Message-Timestamp": "2015-07-06T07:03:09.362Z", - "Content-Type": "application/json", + "Content-Type": "application/json", }, }, { // origin system id is invalid (but the bridge shouldn't care) @@ -155,7 +155,7 @@ func TestSendMessage(t *testing.T) { "X-Request-Id": "t9happe59y", "Authorization": "authorizationkey", "Message-Timestamp": "2015-07-06T07:03:09.362Z", - "Content-Type": "application/json", + "Content-Type": "application/json", }, }, { //Message-Timestamp is missing @@ -177,7 +177,7 @@ func TestSendMessage(t *testing.T) { "X-Request-Id": "t9happe59y", "Authorization": "", "Message-Timestamp": "", - "Content-Type": "application/json", + "Content-Type": "application/json", }, }, { //native-hash forward @@ -203,7 +203,7 @@ func TestSendMessage(t *testing.T) { "Authorization": "authorizationkey", "Message-Timestamp": "2015-07-06T07:03:09.362Z", "X-Native-Hash": "27f79e6d884acdd642d1758c4fd30d43074f8384d552d1ebb1959345", - "Content-Type": "application/json", + "Content-Type": "application/json", }, }, } @@ -211,7 +211,7 @@ func TestSendMessage(t *testing.T) { for _, test := range tests { cmsNotifierTest := &plainHTTPMessageProducer{ test.config, - &dummyHttpClient{ + &dummyHTTPClient{ assert: assert.New(t), address: test.config.Addr, headers: test.expectedHeaders, @@ -226,10 +226,9 @@ func TestSendMessage(t *testing.T) { t.Errorf("\nExpected error was nil \nActual: %s", err.Error()) } } - } -type dummyHttpClient struct { +type dummyHTTPClient struct { assert *assert.Assertions address string headers map[string]string @@ -237,9 +236,9 @@ type dummyHttpClient struct { host string } -func (d *dummyHttpClient) Do(req *http.Request) (resp *http.Response, err error) { +func (d *dummyHTTPClient) Do(req *http.Request) (resp *http.Response, err error) { // Check url - d.assert.Contains(req.URL.String(), fmt.Sprintf("%s/notify", d.address), fmt.Sprintf("Expected URL incorrect")) + d.assert.Contains(req.URL.String(), fmt.Sprintf("%s/notify", d.address), "Expected URL incorrect") // Check that the correct headers were set for key, value := range d.headers { From 701dab8825eca9f69247b10beb675a87a755839a Mon Sep 17 00:00:00 2001 From: Alvaro Denis Date: Thu, 5 Aug 2021 12:57:49 +0300 Subject: [PATCH 3/3] Fix producer_encoding cli arg validation ref #10 --- go.mod | 4 ++-- go.sum | 8 ++++++++ kafka-bridge.go | 19 ++++++++++--------- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 9ad9ecd..3f65366 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/Financial-Times/go-fthealth v0.0.0-20171204124831-1b007e2b37b7 github.com/Financial-Times/go-logger v0.0.0-20170914081945-83fc3e64dc55 - github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210805085551-b267a0f9582d + github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210805140503-941e89a60d7f github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566 github.com/Financial-Times/service-status-go v0.0.0-20160323111542-3f5199736a3d github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9 @@ -14,7 +14,7 @@ require ( github.com/onsi/ginkgo v1.14.2 // indirect github.com/onsi/gomega v1.10.3 // indirect github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2 // indirect - github.com/stretchr/testify v1.3.0 + github.com/stretchr/testify v1.7.0 gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect ) diff --git a/go.sum b/go.sum index a3dd9c2..671ce51 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728120155-bd0 github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210728120155-bd0f255f821a/go.mod h1:O+R/PyICNDU76FSWeusXl/crkwUBo1X6J9Te8Q6fQ0I= github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210805085551-b267a0f9582d h1:H4YIluCZesW3nUwW1bGpcg76dlFHR8c+umFvulnsa8Y= github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210805085551-b267a0f9582d/go.mod h1:O+R/PyICNDU76FSWeusXl/crkwUBo1X6J9Te8Q6fQ0I= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210805140503-941e89a60d7f h1:XudPZs2dqW7idLZ3wQcsPTzUP8WqoLK2CblQ/w8MHN4= +github.com/Financial-Times/message-queue-go-producer v0.1.1-0.20210805140503-941e89a60d7f/go.mod h1:f0BOegIQhC4zQyokEaH40dEQ8CG5I3qzalmWz7uWIPA= github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566 h1:RZ8LXUuj6nDvGoXKF5m+TmHlR0PxG8hU85PLHV8GRBE= github.com/Financial-Times/message-queue-gonsumer v0.0.0-20180518165041-cd41937c7566/go.mod h1:A88i3psx3Zm80Ai2OYTrwzKkZGKj+x5KL02z+YrRd10= github.com/Financial-Times/post-publication-combiner/v2 v2.0.0-20200429053931-78b45d72d0ff h1:1CcRc4KcKU5ouy9VCoRclnX0+wBSUqtiU6C/uyv9zag= @@ -44,6 +46,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/handlers v1.4.0 h1:XulKRWSQK5uChr4pEgSE4Tc/OcmnU9GJuSwdog/tZsA= @@ -88,6 +91,8 @@ github.com/stretchr/testify v1.1.5-0.20170130113145-4d4bfba8f1d1/go.mod h1:a8OnR github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= @@ -138,3 +143,6 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka-bridge.go b/kafka-bridge.go index da5ffbc..a105682 100644 --- a/kafka-bridge.go +++ b/kafka-bridge.go @@ -166,16 +166,16 @@ func main() { Desc: "Two possible values are accepted: model - if you want to send data encoded in a json mapping the model; or base64 if you want a base64 encoding.", EnvVar: "PRODUCER_ENCODING", }) - if _, ok := argument2Encoding[*producerEncoding]; !ok { - var options []string - for k := range argument2Encoding { - options = append(options, k) - } - logger.Fatalf(map[string]interface{}{"valid options": options}, errors.New("producer_encoding"), "invalid argument") - } - logger.Infof(nil, "Starting Kafka Bridge") - app.Action = func() { + if _, ok := argument2Encoding[*producerEncoding]; !ok { + var options []string + for k := range argument2Encoding { + options = append(options, k) + } + logger.Errorf(map[string]interface{}{"valid options": options}, errors.New("producer_encoding"), "invalid argument") + app.PrintHelp() + cli.Exit(-1) + } bridgeApp := newBridgeApp( *consumerAddrs, *consumerGroup, @@ -192,6 +192,7 @@ func main() { go bridgeApp.enableHealthchecksAndGTG(bridgeApp.serviceName) bridgeApp.consumeMessages() } + logger.Infof(nil, "Starting Kafka Bridge") err := app.Run(os.Args) if err != nil {