diff --git a/.github/workflows/build-push-cauldron-github-comsumer.yml b/.github/workflows/build-push-cauldron-github-group-consumer.yml similarity index 88% rename from .github/workflows/build-push-cauldron-github-comsumer.yml rename to .github/workflows/build-push-cauldron-github-group-consumer.yml index 0b3701c..3756c55 100644 --- a/.github/workflows/build-push-cauldron-github-comsumer.yml +++ b/.github/workflows/build-push-cauldron-github-group-consumer.yml @@ -1,4 +1,4 @@ -name: Build and push Cauldron GitHub Consumer +name: Build and push Cauldron GitHub Group Consumer on: workflow_dispatch: @@ -24,7 +24,7 @@ jobs: uses: docker/build-push-action@v6 with: context: . - file: Dockerfile.github-consumer + file: Dockerfile.github-consumer-group platforms: linux/amd64,linux/arm64 push: true provenance: false diff --git a/.golangci.yml b/.golangci.yml index d176115..4bedcf7 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -35,6 +35,7 @@ linters-settings: - h - d - p + - l # --------------------------------------------------------------------------- errcheck: check-type-assertions: true diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 995344e..a0c71c2 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -71,7 +71,8 @@ bundle |:---------|:------------|---------| | `LOG_LEVEL` | Logging level, Valid values are: `"DEBUG"`, `"INFO"`, `"WARN"`, `"ERROR"` | `"INFO"` | | `KCP_BROKERS` | Kafka consumer/producer brokers list, comma separated | `"127.0.0.1:9094"` | -| `KC_TOPIC_GITHUB` | Topic name for GitHub webhook consumer | `github` | +| `KC_TOPIC_GITHUB` | Topic name for GitHub webhook consumer | `""` | +| `KCG_NAME` | Kafka consumer group name | `""` | | `KC_PARTITION` | Consumer partition number | `0` | | `KC_DIAL_TIMEOUT` | Initial connection timeout used by broker (shared with consumer) | "`30s`" (seconds) | | `KC_READ_TIMEOUT` | Response timeout used by broker (shared with consumer) | "`30s`" (seconds) | @@ -154,8 +155,11 @@ export KP_GITHUB_MESSAGE_QUEUE_SIZE=100 
# export KP_BACKOFF="2s" # export KP_MAX_RETRIES="10" -# kafka github consumer optional values. -# export KC_TOPIC_GITHUB="github" +# kafka github consumer group values. +export KC_TOPIC_GITHUB="github" +export KCG_NAME="github-group" + +# kafka github consumer group optional values. # export KC_PARTITION="0" # export KC_DIAL_TIMEOUT="30s" # export KC_READ_TIMEOUT="30s" @@ -212,31 +216,27 @@ of `rake tasks`: ```bash rake -T -rake db:init # init database -rake db:migrate # runs rake db:migrate up (shortcut) -rake db:migrate:down # run migrate down -rake db:migrate:goto[index] # go to migration -rake db:migrate:up # run migrate up -rake db:psql # connect local db with psql -rake db:reset # reset database (drop and create) -rake default # default task, runs server -rake docker:build:github_consumer # build github consumer -rake docker:build:migrator # build migrator -rake docker:build:server # build server -rake docker:compose:infra:down # stop the infra with all components -rake docker:compose:infra:up # run the infra with all components -rake docker:compose:kafka:down # stop the kafka and kafka-ui only -rake docker:compose:kafka:up # run the kafka and kafka-ui only -rake docker:run:github_consumer # run github consumer -rake docker:run:migrator # run migrator -rake docker:run:server # run server -rake lint # run golang-ci linter -rake rubocop:autofix # lint ruby and autofix -rake rubocop:lint # lint ruby -rake run:kafka:github:consumer # run kafka github consumer -rake run:server # run server -rake test # runs tests (shortcut) -rake test:coverage # run tests and show coverage +rake db:init # init database +rake db:migrate # runs rake db:migrate up (shortcut) +rake db:migrate:down # run migrate down +rake db:migrate:goto[index] # go to migration +rake db:migrate:up # run migrate up +rake db:psql # connect local db with psql +rake db:reset # reset database (drop and create) +rake default # default task, runs server +rake docker:compose:infra:down # stop the infra with 
all components +rake docker:compose:infra:up # run the infra with all components +rake docker:compose:kafka:down # stop the kafka and kafka-ui only +rake docker:compose:kafka:up # run the kafka and kafka-ui only +rake lint # run golang-ci linter +rake psql:infra # connect to infra database with psql +rake rubocop:autofix # lint ruby and autofix +rake rubocop:lint # lint ruby +rake run:kafka:github:consumer # run kafka github consumer +rake run:kafka:github:consumer_group # run kafka github consumer group +rake run:server # run server +rake test # runs tests (shortcut) +rake test:coverage # run tests and show coverage ``` You can run tests: @@ -347,24 +347,12 @@ rake rubocop:autofix # lints ruby code and auto fixes. ```bash rake -T "docker:" -rake docker:build:github_consumer # build github consumer -rake docker:build:migrator # build migrator -rake docker:build:server # build server - -rake docker:compose:infra:down # stop the infra with all components -rake docker:compose:infra:up # run the infra with all components -rake docker:compose:kafka:down # stop the kafka and kafka-ui only -rake docker:compose:kafka:up # run the kafka and kafka-ui only - -rake docker:run:github_consumer # run github consumer -rake docker:run:migrator # run migrator -rake docker:run:server # run server +rake docker:compose:infra:down # stop the infra with all components +rake docker:compose:infra:up # run the infra with all components +rake docker:compose:kafka:down # stop the kafka and kafka-ui only +rake docker:compose:kafka:up # run the kafka and kafka-ui only ``` -- `docker:build:*`: builds images locally, testing purposes. -- `docker:run:*`: runs containers locally, testing purposes. -- `docker:compose:*`: ups or downs whole infrastructure with services. 
- --- ## Infrastructure Diagram @@ -406,10 +394,10 @@ Now you can access: - Kafka UI: `http://127.0.0.1:8080/` - Ngrok: `http://127.0.0.1:4040` -- PostgreSQL: `PGPASSWORD="${POSTGRES_PASSWORD}" psql -h localhost -p 5433 -U postgres -d devchain_webhook` +- PostgreSQL: `PGOPTIONS="--search_path=cauldron,public" PGPASSWORD="${POSTGRES_PASSWORD}" psql -h localhost -p 5433 -U postgres -d devchain_webhook` For PostgreSQL, `5433` is exposed in container to avoid conflicts with the -local PostgreSQL instance. +local PostgreSQL instance. Use `rake psql:infra` to connect your infra database. Logging for **kafka** and **kafka-ui** is set to `error` only. Due to development purposes, both were producing too much information, little clean up required. diff --git a/Dockerfile.github-consumer-group b/Dockerfile.github-consumer-group new file mode 100644 index 0000000..96ec92b --- /dev/null +++ b/Dockerfile.github-consumer-group @@ -0,0 +1,31 @@ +FROM golang:1.23-alpine AS builder + +WORKDIR /build +COPY . . 
+ +ARG GOOS +ARG GOARCH +RUN CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} go build -o consumer cmd/githubconsumergroup/main.go + +FROM alpine:latest AS certs +RUN apk add --update --no-cache ca-certificates + +FROM busybox:latest +ARG UID=10001 +RUN adduser \ + --disabled-password \ + --gecos "" \ + --home "/nonexistent" \ + --shell "/sbin/nologin" \ + --no-create-home \ + --uid "${UID}" \ + appuser +USER appuser +COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=builder /build/consumer /consumer + +ENTRYPOINT ["/consumer"] + +LABEL org.opencontainers.image.authors="Uğur vigo Özyılmazel " +LABEL org.opencontainers.image.licenses="MIT" +LABEL org.opencontainers.image.source="https://github.com/devchain-network/cauldron" \ No newline at end of file diff --git a/Rakefile b/Rakefile index af5b31e..0ce43c7 100644 --- a/Rakefile +++ b/Rakefile @@ -36,6 +36,7 @@ namespace :run do namespace :kafka do namespace :github do + desc 'run kafka github consumer' task :consumer do run = %{ go run -race cmd/githubconsumer/main.go } @@ -47,77 +48,25 @@ namespace :run do Process.kill('KILL', pid) 0 end - end - end -end - -namespace :docker do - namespace :run do - desc 'run server' - task :server do - system %{ - docker run \ - --env GITHUB_HMAC_SECRET=${GITHUB_HMAC_SECRET} \ - -p 8000:8000 \ - devchain-server:latest - } - $CHILD_STATUS&.exitstatus || 1 - rescue Interrupt - 0 - end - - desc 'run github consumer' - task :github_consumer do - system %{ - docker run \ - --env KC_TOPIC=${KC_TOPIC} \ - devchain-gh-consumer:latest - } - $CHILD_STATUS&.exitstatus || 1 - rescue Interrupt - 0 - end - desc 'run migrator' - task :migrator do - system %{ - docker run \ - --env DATABASE_URL=${DATABASE_URL_DOCKER_TO_HOST} \ - devchain-migrator:latest - } - $CHILD_STATUS&.exitstatus || 1 - rescue Interrupt - 0 - end - end - namespace :build do - desc 'build server' - task :server do - system %{ docker build -f Dockerfile.server -t 
devchain-server:latest . } - $CHILD_STATUS&.exitstatus || 1 - rescue Interrupt - 0 - end - - desc 'build github consumer' - task :github_consumer do - system %{ docker build -f Dockerfile.github-consumer -t devchain-gh-consumer:latest . } - $CHILD_STATUS&.exitstatus || 1 - rescue Interrupt - 0 - end + desc 'run kafka github consumer group' + task :consumer_group do + run = %{ go run -race cmd/githubconsumergroup/main.go } + pid = Process.spawn(run) + Process.wait(pid) + $CHILD_STATUS&.exitstatus || 1 + rescue Interrupt + Process.getpgid(pid) + Process.kill('KILL', pid) + 0 + end - desc 'build migrator' - task :migrator do - system %{ docker build -f Dockerfile.migrator -t devchain-migrator:latest . } - $CHILD_STATUS&.exitstatus || 1 - rescue Interrupt - 0 end end +end +namespace :docker do namespace :compose do - namespace :kafka do desc 'run the kafka and kafka-ui only' task :up do @@ -295,3 +244,19 @@ end desc 'runs tests (shortcut)' task test: 'test:test_all' + +INFRA_POSTGRES_PASSWORD = ENV['POSTGRES_PASSWORD'] || nil +namespace :psql do + desc 'connect to infra database with psql' + task :infra do + abort 'infra POSTGRES_PASSWORD environment variable is not set' if INFRA_POSTGRES_PASSWORD.nil? 
+ system %{ + PGPASSWORD="#{INFRA_POSTGRES_PASSWORD}" \ + PGOPTIONS="--search_path=cauldron,public" \ + psql -h localhost -p 5433 -U postgres -d #{DATABASE_NAME} + } + $CHILD_STATUS&.exitstatus || 1 + rescue Interrupt + 0 + end +end diff --git a/cmd/githubconsumer/main.go b/cmd/githubconsumer/main.go index dfd6082..b36ee63 100644 --- a/cmd/githubconsumer/main.go +++ b/cmd/githubconsumer/main.go @@ -5,6 +5,7 @@ import ( "fmt" "log" + "github.com/IBM/sarama" "github.com/devchain-network/cauldron/internal/kafkacp" "github.com/devchain-network/cauldron/internal/kafkacp/kafkaconsumer" "github.com/devchain-network/cauldron/internal/slogger" @@ -13,16 +14,22 @@ import ( "github.com/vigo/getenv" ) -const ( - defaultKafkaConsumerTopic = "github" -) +func storeMessage(strg storage.PingStorer) kafkaconsumer.ProcessMessageFunc { + return func(ctx context.Context, msg *sarama.ConsumerMessage) error { + if err := strg.MessageStore(ctx, msg); err != nil { + return fmt.Errorf("message store error: [%w]", err) + } + + return nil + } +} // Run runs kafa github consumer. 
func Run() error { logLevel := getenv.String("LOG_LEVEL", slogger.DefaultLogLevel) brokersList := getenv.String("KCP_BROKERS", kafkacp.DefaultKafkaBrokers) - kafkaTopic := getenv.String("KC_TOPIC_GITHUB", defaultKafkaConsumerTopic) + kafkaTopic := getenv.String("KC_TOPIC_GITHUB", "") kafkaPartition := getenv.Int("KC_PARTITION", kafkaconsumer.DefaultPartition) kafkaDialTimeout := getenv.Duration("KC_DIAL_TIMEOUT", kafkaconsumer.DefaultDialTimeout) kafkaReadTimeout := getenv.Duration("KC_READ_TIMEOUT", kafkaconsumer.DefaultReadTimeout) @@ -64,7 +71,7 @@ func Run() error { kafkaGitHubConsumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(db), + kafkaconsumer.WithProcessMessageFunc(storeMessage(db)), kafkaconsumer.WithKafkaBrokers(*brokersList), kafkaconsumer.WithDialTimeout(*kafkaDialTimeout), kafkaconsumer.WithReadTimeout(*kafkaReadTimeout), diff --git a/cmd/githubconsumergroup/main.go b/cmd/githubconsumergroup/main.go new file mode 100644 index 0000000..792aa5e --- /dev/null +++ b/cmd/githubconsumergroup/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "context" + "fmt" + "log" + + "github.com/IBM/sarama" + "github.com/devchain-network/cauldron/internal/kafkacp" + "github.com/devchain-network/cauldron/internal/kafkacp/kafkaconsumergroup" + "github.com/devchain-network/cauldron/internal/slogger" + "github.com/devchain-network/cauldron/internal/storage" + "github.com/devchain-network/cauldron/internal/storage/githubstorage" + "github.com/vigo/getenv" +) + +func storeMessage(strg storage.PingStorer) kafkaconsumergroup.ProcessMessageFunc { + return func(ctx context.Context, msg *sarama.ConsumerMessage) error { + if err := strg.MessageStore(ctx, msg); err != nil { + return fmt.Errorf("message store error: [%w]", err) + } + + return nil + } +} + +// Run runs kafa github consumer group. 
+func Run() error { + logLevel := getenv.String("LOG_LEVEL", slogger.DefaultLogLevel) + brokersList := getenv.String("KCP_BROKERS", kafkacp.DefaultKafkaBrokers) + kafkaTopic := getenv.String("KC_TOPIC_GITHUB", "") + kafkaConsumerGroup := getenv.String("KCG_NAME", "") + kafkaDialTimeout := getenv.Duration("KC_DIAL_TIMEOUT", kafkaconsumergroup.DefaultDialTimeout) + kafkaReadTimeout := getenv.Duration("KC_READ_TIMEOUT", kafkaconsumergroup.DefaultReadTimeout) + kafkaWriteTimeout := getenv.Duration("KC_WRITE_TIMEOUT", kafkaconsumergroup.DefaultWriteTimeout) + kafkaBackoff := getenv.Duration("KC_BACKOFF", kafkaconsumergroup.DefaultBackoff) + kafkaMaxRetries := getenv.Int("KC_MAX_RETRIES", kafkaconsumergroup.DefaultMaxRetries) + databaseURL := getenv.String("DATABASE_URL", "") + + if err := getenv.Parse(); err != nil { + return fmt.Errorf("environment variable parse error: [%w]", err) + } + + logger, err := slogger.New( + slogger.WithLogLevelName(*logLevel), + ) + if err != nil { + return fmt.Errorf("logger instantiate error: [%w]", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + db, err := githubstorage.New( + ctx, + githubstorage.WithDatabaseDSN(*databaseURL), + githubstorage.WithLogger(logger), + ) + if err != nil { + return fmt.Errorf("github storage instantiate error: [%w]", err) + } + + if err = db.Ping(ctx, storage.DefaultDBPingMaxRetries, storage.DefaultDBPingBackoff); err != nil { + return fmt.Errorf("github storage ping error: [%w]", err) + } + defer func() { + logger.Info("github storage - closing pgx pool") + db.Pool.Close() + }() + + kafkaGitHubConsumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(storeMessage(db)), + kafkaconsumergroup.WithKafkaBrokers(*brokersList), + kafkaconsumergroup.WithDialTimeout(*kafkaDialTimeout), + kafkaconsumergroup.WithReadTimeout(*kafkaReadTimeout), + 
kafkaconsumergroup.WithWriteTimeout(*kafkaWriteTimeout), + kafkaconsumergroup.WithBackoff(*kafkaBackoff), + kafkaconsumergroup.WithMaxRetries(*kafkaMaxRetries), + kafkaconsumergroup.WithTopic(*kafkaTopic), + kafkaconsumergroup.WithKafkaGroupName(*kafkaConsumerGroup), + ) + if err != nil { + return fmt.Errorf("github kafka group consumer instantiate error: [%w]", err) + } + + defer func() { _ = kafkaGitHubConsumer.SaramaConsumerGroup.Close() }() + + if err = kafkaGitHubConsumer.StartConsume(); err != nil { + return fmt.Errorf("github kafka group consumer start consume error: [%w]", err) + } + + return nil +} + +func main() { + if err := Run(); err != nil { + log.Fatal(err) + } +} diff --git a/cmd/server/main.go b/cmd/server/main.go index 8f1bd27..9f9257a 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -55,12 +55,9 @@ func Run() error { return fmt.Errorf("logger instantiate error: [%w]", err) } - var kafkaBrokers kafkacp.KafkaBrokers - kafkaBrokers.AddFromString(*brokersList) - kafkaProducer, err := kafkaproducer.New( kafkaproducer.WithLogger(logger), - kafkaproducer.WithKafkaBrokers(kafkaBrokers), + kafkaproducer.WithKafkaBrokers(*brokersList), kafkaproducer.WithMaxRetries(*kafkaProducerMaxRetries), kafkaproducer.WithBackoff(*kafkaProducerBackoff), kafkaproducer.WithDialTimeout(*kafkaProducerDialTimeout), @@ -73,7 +70,7 @@ func Run() error { defer kafkaProducer.AsyncClose() - logger.Info("connected to kafka brokers", "addrs", kafkaBrokers) + logger.Info("connected to kafka brokers", "addrs", *brokersList) githubWebhookMessageQueue := make(chan *sarama.ProducerMessage, *kafkaProducerGithubWebhookMessageQueueSize) @@ -93,7 +90,7 @@ func Run() error { githubWebhookHandler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret(*githubHMACSecret), 
githubwebhookhandler.WithProducerGitHubMessageQueue(githubWebhookMessageQueue), ) @@ -107,8 +104,8 @@ func Run() error { apiserver.WithReadTimeout(*serverReadTimeout), apiserver.WithWriteTimeout(*serverWriteTimeout), apiserver.WithIdleTimeout(*serverIdleTimeout), - apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub), - apiserver.WithKafkaBrokers(kafkaBrokers), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + apiserver.WithKafkaBrokers(*brokersList), apiserver.WithHTTPHandler(fasthttp.MethodGet, "/healthz", healthCheckHandler.Handle), apiserver.WithHTTPHandler(fasthttp.MethodPost, "/v1/webhook/github", githubWebhookHandler.Handle), ) diff --git a/docker-compose.infra.yml b/docker-compose.infra.yml index ac57d2b..472c8f1 100644 --- a/docker-compose.infra.yml +++ b/docker-compose.infra.yml @@ -100,9 +100,10 @@ services: github-consumer: build: context: . - dockerfile: Dockerfile.github-consumer + dockerfile: Dockerfile.github-consumer-group environment: - KC_TOPIC: "github" + KC_TOPIC_GITHUB: "github" + KCG_NAME: "github-group" KCP_BROKERS: "kafka:9092" DATABASE_URL: "${DATABASE_URL_INFRA}" depends_on: diff --git a/go.mod b/go.mod index 7ad02b9..ee5d343 100644 --- a/go.mod +++ b/go.mod @@ -46,5 +46,3 @@ require ( golang.org/x/text v0.21.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/go-playground/webhooks/v6 => github.com/devchain-network/webhooks/v6 v6.0.0-20250113163359-43c20e82b17e diff --git a/internal/apiserver/apiserver.go b/internal/apiserver/apiserver.go index b02235b..6f7cfce 100644 --- a/internal/apiserver/apiserver.go +++ b/internal/apiserver/apiserver.go @@ -5,6 +5,7 @@ import ( _ "embed" "fmt" "log/slog" + "slices" "time" "github.com/devchain-network/cauldron/internal/cerrors" @@ -53,11 +54,25 @@ type Server struct { IdleTimeout time.Duration } +func validHTTPMethods() []string { + return []string{ + fasthttp.MethodGet, + fasthttp.MethodHead, + fasthttp.MethodPost, + 
fasthttp.MethodPut, + fasthttp.MethodPatch, + fasthttp.MethodDelete, + fasthttp.MethodConnect, + fasthttp.MethodOptions, + fasthttp.MethodTrace, + } +} + // Start starts the fast http server. func (s *Server) Start() error { s.Logger.Info("start listening at", "addr", s.ListenAddr, "version", ServerVersion) if err := s.FastHTTP.ListenAndServe(s.ListenAddr); err != nil { - return fmt.Errorf("fast http listen and serve error: [%w]", err) + return fmt.Errorf("[apiserver.Start][ListenAndServe] error: [%w]", err) } return nil @@ -69,7 +84,7 @@ func (s *Server) Stop() error { if err := s.FastHTTP.ShutdownWithContext(context.Background()); err != nil { s.Logger.Error("fast http shutdown with context error", "error", err) - return fmt.Errorf("fast http shutdown with context error: [%w]", err) + return fmt.Errorf("[apiserver.Start][ShutdownWithContext] error: [%w]", err) } return nil @@ -77,15 +92,18 @@ func (s *Server) Stop() error { func (s Server) checkRequired() error { if s.Logger == nil { - return fmt.Errorf("api server check required, Logger error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf("[apiserver.checkRequired] Logger error: [%w, 'nil' received]", cerrors.ErrValueRequired) } if s.Handlers == nil { - return fmt.Errorf("api server check required, Handlers error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf("[apiserver.checkRequired] Handlers error: [%w, 'nil' received]", cerrors.ErrValueRequired) } if !s.KafkaGitHubTopic.Valid() { - return fmt.Errorf("api server check required, KafkaGitHubTopic error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[apiserver.checkRequired] KafkaGitHubTopic error: [%w, '%s' received]", + cerrors.ErrInvalid, s.KafkaGitHubTopic, + ) } return nil @@ -95,7 +113,10 @@ func (s Server) checkRequired() error { func WithLogger(l *slog.Logger) Option { return func(server *Server) error { if l == nil { - return fmt.Errorf("api server WithLogger error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + 
"[apiserver.WithLogger] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } server.Logger = l @@ -107,13 +128,30 @@ func WithLogger(l *slog.Logger) Option { func WithHTTPHandler(method, path string, handler fasthttp.RequestHandler) Option { return func(server *Server) error { if method == "" { - return fmt.Errorf("api server WithHTTPHandler method error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[apiserver.WithHTTPHandler] method error: [%w, empty string]", + cerrors.ErrValueRequired, + ) } + + if !slices.Contains(validHTTPMethods(), method) { + return fmt.Errorf( + "[apiserver.WithHTTPHandler] method error: ['%s' is %w]", + method, cerrors.ErrInvalid, + ) + } + if path == "" { - return fmt.Errorf("api server WithHTTPHandler path error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[apiserver.WithHTTPHandler] path error: [%w, empty string]", + cerrors.ErrValueRequired, + ) } if handler == nil { - return fmt.Errorf("api server WithHTTPHandler http handler error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[apiserver.WithHTTPHandler] handler error: [%w, empty string]", + cerrors.ErrValueRequired, + ) } if server.Handlers == nil { @@ -129,11 +167,17 @@ func WithHTTPHandler(method, path string, handler fasthttp.RequestHandler) Optio func WithListenAddr(addr string) Option { return func(server *Server) error { if addr == "" { - return fmt.Errorf("api server WithListenAddr addr error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[apiserver.WithListenAddr] error: [%w, empty string]", + cerrors.ErrValueRequired, + ) } if _, err := getenv.ValidateTCPNetworkAddress(addr); err != nil { - return fmt.Errorf("api server WithListenAddr tcp addr error: [%w] [%w]", err, cerrors.ErrInvalid) + return fmt.Errorf( + "[apiserver.WithListenAddr] error: [%w] ['%s' %w]", + err, addr, cerrors.ErrInvalid, + ) } server.ListenAddr = addr @@ -146,7 +190,10 @@ func WithListenAddr(addr string) Option { func WithReadTimeout(d 
time.Duration) Option { return func(server *Server) error { if d < 0 { - return fmt.Errorf("api server WithReadTimeout error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[apiserver.WithReadTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) } server.ReadTimeout = d @@ -159,7 +206,10 @@ func WithReadTimeout(d time.Duration) Option { func WithWriteTimeout(d time.Duration) Option { return func(server *Server) error { if d < 0 { - return fmt.Errorf("api server WithWriteTimeout error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[apiserver.WithWriteTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) } server.WriteTimeout = d @@ -171,7 +221,10 @@ func WithWriteTimeout(d time.Duration) Option { func WithIdleTimeout(d time.Duration) Option { return func(server *Server) error { if d < 0 { - return fmt.Errorf("api server WithIdleTimeout error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[apiserver.WithIdleTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) } server.IdleTimeout = d @@ -180,25 +233,36 @@ func WithIdleTimeout(d time.Duration) Option { } // WithKafkaBrokers sets kafka brokers list. -func WithKafkaBrokers(brokers kafkacp.KafkaBrokers) Option { +func WithKafkaBrokers(brokers string) Option { return func(server *Server) error { - if !brokers.Valid() { - return fmt.Errorf("api server WithKafkaBrokers error: [%w]", cerrors.ErrInvalid) + var kafkaBrokers kafkacp.KafkaBrokers + kafkaBrokers.AddFromString(brokers) + + if !kafkaBrokers.Valid() { + return fmt.Errorf( + "[apiserver.WithKafkaBrokers] error: [%w, '%s' received]", + cerrors.ErrInvalid, brokers, + ) } - server.KafkaBrokers = brokers + server.KafkaBrokers = kafkaBrokers return nil } } // WithKafkaGitHubTopic sets kafka topic name for github webhooks. 
-func WithKafkaGitHubTopic(s kafkacp.KafkaTopicIdentifier) Option { +func WithKafkaGitHubTopic(s string) Option { return func(server *Server) error { - if !s.Valid() { - return fmt.Errorf("api server WithKafkaGitHubTopic error: [%w]", cerrors.ErrInvalid) + topic := kafkacp.KafkaTopicIdentifier(s) + + if !topic.Valid() { + return fmt.Errorf( + "[apiserver.WithKafkaGitHubTopic] error: [%w, '%s' received]", + cerrors.ErrInvalid, s, + ) } - server.KafkaGitHubTopic = s + server.KafkaGitHubTopic = topic return nil } @@ -218,7 +282,7 @@ func New(options ...Option) (*Server, error) { for _, option := range options { if err := option(server); err != nil { - return nil, fmt.Errorf("api server option error: [%w]", err) + return nil, err } } diff --git a/internal/apiserver/apiserver_test.go b/internal/apiserver/apiserver_test.go index e44860d..020b7cd 100644 --- a/internal/apiserver/apiserver_test.go +++ b/internal/apiserver/apiserver_test.go @@ -1,36 +1,18 @@ package apiserver_test import ( - "context" - "log/slog" + "sync" "testing" "time" "github.com/devchain-network/cauldron/internal/apiserver" "github.com/devchain-network/cauldron/internal/cerrors" "github.com/devchain-network/cauldron/internal/kafkacp" + "github.com/devchain-network/cauldron/internal/slogger/mockslogger" "github.com/stretchr/testify/assert" "github.com/valyala/fasthttp" ) -type mockLogger struct{} - -func (h *mockLogger) Enabled(_ context.Context, _ slog.Level) bool { - return true -} - -func (h *mockLogger) Handle(_ context.Context, record slog.Record) error { - return nil -} - -func (h *mockLogger) WithAttrs(attrs []slog.Attr) slog.Handler { - return h -} - -func (h *mockLogger) WithGroup(name string) slog.Handler { - return h -} - func TestNew_NoParams(t *testing.T) { server, err := apiserver.New() @@ -48,7 +30,7 @@ func TestNew_NilLogger(t *testing.T) { } func TestNew_EmptyListenAddr(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( 
apiserver.WithLogger(logger), @@ -60,7 +42,7 @@ func TestNew_EmptyListenAddr(t *testing.T) { } func TestNew_InvalidListenAddr(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), @@ -72,11 +54,11 @@ func TestNew_InvalidListenAddr(t *testing.T) { } func TestNew_InvalidKafkaTopic(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), - apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifier("foo")), + apiserver.WithKafkaGitHubTopic("foo"), ) assert.ErrorIs(t, err, cerrors.ErrInvalid) @@ -84,14 +66,11 @@ func TestNew_InvalidKafkaTopic(t *testing.T) { } func TestNew_InvalidBrokers(t *testing.T) { - logger := slog.New(new(mockLogger)) - - var kafkaBrokers kafkacp.KafkaBrokers - kafkaBrokers.AddFromString("foo") + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), - apiserver.WithKafkaBrokers(kafkaBrokers), + apiserver.WithKafkaBrokers("foo"), ) assert.ErrorIs(t, err, cerrors.ErrInvalid) @@ -99,7 +78,7 @@ func TestNew_InvalidBrokers(t *testing.T) { } func TestNew_InvalidReadTimeout(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), @@ -111,7 +90,7 @@ func TestNew_InvalidReadTimeout(t *testing.T) { } func TestNew_InvalidWriteTimeout(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), @@ -123,7 +102,7 @@ func TestNew_InvalidWriteTimeout(t *testing.T) { } func TestNew_InvalidIdleTimeout(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), @@ -135,10 +114,7 @@ func TestNew_InvalidIdleTimeout(t *testing.T) { } func TestNew_NilHTTPHandler(t *testing.T) { - logger := 
slog.New(new(mockLogger)) - - var kafkaBrokers kafkacp.KafkaBrokers - kafkaBrokers.AddFromString("localhost:9194") + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), @@ -146,8 +122,8 @@ func TestNew_NilHTTPHandler(t *testing.T) { apiserver.WithReadTimeout(5*time.Second), apiserver.WithWriteTimeout(5*time.Second), apiserver.WithIdleTimeout(5*time.Second), - apiserver.WithKafkaBrokers(kafkaBrokers), - apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub), + apiserver.WithKafkaBrokers("localhost:9194"), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), ) assert.ErrorIs(t, err, cerrors.ErrValueRequired) @@ -155,10 +131,7 @@ func TestNew_NilHTTPHandler(t *testing.T) { } func TestNew_InvalidKafkaTopic_check(t *testing.T) { - logger := slog.New(new(mockLogger)) - - var kafkaBrokers kafkacp.KafkaBrokers - kafkaBrokers.AddFromString("localhost:9194") + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), @@ -166,7 +139,7 @@ func TestNew_InvalidKafkaTopic_check(t *testing.T) { apiserver.WithReadTimeout(5*time.Second), apiserver.WithWriteTimeout(5*time.Second), apiserver.WithIdleTimeout(5*time.Second), - apiserver.WithKafkaBrokers(kafkaBrokers), + apiserver.WithKafkaBrokers("localhost:9194"), apiserver.WithHTTPHandler( fasthttp.MethodGet, "/test", @@ -179,7 +152,7 @@ func TestNew_InvalidKafkaTopic_check(t *testing.T) { } func TestNew_MissingArgsHTTPHandler_method(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), @@ -187,7 +160,7 @@ func TestNew_MissingArgsHTTPHandler_method(t *testing.T) { apiserver.WithReadTimeout(5*time.Second), apiserver.WithWriteTimeout(5*time.Second), apiserver.WithIdleTimeout(5*time.Second), - apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), 
apiserver.WithHTTPHandler( "", "/test", @@ -199,8 +172,29 @@ func TestNew_MissingArgsHTTPHandler_method(t *testing.T) { assert.Nil(t, server) } +func TestNew_InvalidArgsHTTPHandler_method(t *testing.T) { + logger := mockslogger.New() + + server, err := apiserver.New( + apiserver.WithLogger(logger), + apiserver.WithListenAddr(":9000"), + apiserver.WithReadTimeout(5*time.Second), + apiserver.WithWriteTimeout(5*time.Second), + apiserver.WithIdleTimeout(5*time.Second), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + apiserver.WithHTTPHandler( + "FOO", + "/test", + func(ctx *fasthttp.RequestCtx) { ctx.SetStatusCode(fasthttp.StatusOK) }, + ), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, server) +} + func TestNew_MissingArgsHTTPHandler_path(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), @@ -208,7 +202,7 @@ func TestNew_MissingArgsHTTPHandler_path(t *testing.T) { apiserver.WithReadTimeout(5*time.Second), apiserver.WithWriteTimeout(5*time.Second), apiserver.WithIdleTimeout(5*time.Second), - apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), apiserver.WithHTTPHandler( fasthttp.MethodGet, "", @@ -221,7 +215,7 @@ func TestNew_MissingArgsHTTPHandler_path(t *testing.T) { } func TestNew_MissingArgsHTTPHandler_handler(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), @@ -229,7 +223,7 @@ func TestNew_MissingArgsHTTPHandler_handler(t *testing.T) { apiserver.WithReadTimeout(5*time.Second), apiserver.WithWriteTimeout(5*time.Second), apiserver.WithIdleTimeout(5*time.Second), - apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), apiserver.WithHTTPHandler( 
fasthttp.MethodGet, "/test", @@ -242,11 +236,11 @@ func TestNew_MissingArgsHTTPHandler_handler(t *testing.T) { } func TestHttpRouter_NotFound(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), - apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), apiserver.WithHTTPHandler( fasthttp.MethodGet, "/existing-path", @@ -265,10 +259,10 @@ func TestHttpRouter_NotFound(t *testing.T) { } func TestHttpRouter_MethodNotAllowed(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), - apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), apiserver.WithHTTPHandler( fasthttp.MethodGet, "/existing-path", @@ -287,10 +281,10 @@ func TestHttpRouter_MethodNotAllowed(t *testing.T) { } func TestHttpRouter_ValidRouteAndMethod(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() server, err := apiserver.New( apiserver.WithLogger(logger), - apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), apiserver.WithHTTPHandler( fasthttp.MethodGet, "/existing-path", @@ -311,3 +305,38 @@ func TestHttpRouter_ValidRouteAndMethod(t *testing.T) { assert.Equal(t, fasthttp.StatusOK, ctx.Response.StatusCode()) assert.Equal(t, "success", string(ctx.Response.Body())) } + +func TestServer_Start(t *testing.T) { + logger := mockslogger.New() + server, err := apiserver.New( + apiserver.WithLogger(logger), + apiserver.WithListenAddr(":0"), + apiserver.WithKafkaGitHubTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + apiserver.WithHTTPHandler( + fasthttp.MethodGet, + "/existing-path", + func(ctx *fasthttp.RequestCtx) { + 
ctx.SetStatusCode(fasthttp.StatusOK) + ctx.SetBody([]byte("success")) + }, + ), + ) + assert.NoError(t, err) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + err = server.Start() + assert.NoError(t, err) + }() + + time.Sleep(500 * time.Millisecond) + + err = server.Stop() + assert.NoError(t, err) + + wg.Wait() +} diff --git a/internal/kafkacp/kafkaconsumer/kafkaconsumer.go b/internal/kafkacp/kafkaconsumer/kafkaconsumer.go index 2173533..440f65b 100644 --- a/internal/kafkacp/kafkaconsumer/kafkaconsumer.go +++ b/internal/kafkacp/kafkaconsumer/kafkaconsumer.go @@ -14,7 +14,6 @@ import ( "github.com/IBM/sarama" "github.com/devchain-network/cauldron/internal/cerrors" "github.com/devchain-network/cauldron/internal/kafkacp" - "github.com/devchain-network/cauldron/internal/storage" ) // defaults. @@ -27,6 +26,8 @@ const ( DefaultMaxRetries = 10 ) +var _ KafkaConsumer = (*Consumer)(nil) // compile time proof + // KafkaConsumer defines kafka consumer behaviours. type KafkaConsumer interface { Consume() error @@ -35,13 +36,16 @@ type KafkaConsumer interface { // SaramaConsumerFactoryFunc is a factory function. type SaramaConsumerFactoryFunc func([]string, *sarama.Config) (sarama.Consumer, error) +// ProcessMessageFunc is a factory function for callers. +type ProcessMessageFunc func(ctx context.Context, msg *sarama.ConsumerMessage) error + // Consumer represents kafa consumer setup. 
type Consumer struct { Topic kafkacp.KafkaTopicIdentifier Logger *slog.Logger - Storage storage.PingStorer SaramaConsumer sarama.Consumer SaramaConsumerFactoryFunc SaramaConsumerFactoryFunc + ProcessMessageFunc ProcessMessageFunc KafkaBrokers kafkacp.KafkaBrokers DialTimeout time.Duration ReadTimeout time.Duration @@ -55,25 +59,34 @@ type Consumer struct { func (c *Consumer) checkRequired() error { if c.Logger == nil { - return fmt.Errorf("kafka consumer check required, Logger error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[kafkaconsumer.checkRequired] Logger error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } - if c.Storage == nil { - return fmt.Errorf("kafka consumer check required, Storage error: [%w]", cerrors.ErrValueRequired) + if c.ProcessMessageFunc == nil { + return fmt.Errorf( + "[kafkaconsumer.checkRequired] ProcessMessageFunc error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } if !c.Topic.Valid() { - return fmt.Errorf("kafka consumer check required, Topic error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaconsumer.checkRequired] Topic error: [%w, false received]", + cerrors.ErrInvalid, + ) } return nil } -// Consume consumes message and stores it to database. +// Consume consumes kafka message with using partition consumer. 
func (c Consumer) Consume() error { partitionConsumer, err := c.SaramaConsumer.ConsumePartition(c.Topic.String(), c.Partition, sarama.OffsetNewest) if err != nil { - return fmt.Errorf("kafka consumer partition consumer instantiation error: [%w]", err) + return fmt.Errorf("[kafkaconsumer.Consume][SaramaConsumer.ConsumePartition] error: [%w]", err) } defer func() { _ = partitionConsumer.Close() }() @@ -110,7 +123,7 @@ func (c Consumer) Consume() error { }() for msg := range messagesQueue { - if err = c.Storage.MessageStore(ctx, msg); err != nil { + if err = c.ProcessMessageFunc(ctx, msg); err != nil { c.Logger.Error("kafka consumer message store", "error", err, "worker", i) continue @@ -160,7 +173,10 @@ type Option func(*Consumer) error func WithLogger(l *slog.Logger) Option { return func(c *Consumer) error { if l == nil { - return fmt.Errorf("kafka consumer WithLogger error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[kafkaconsumer.WithLogger] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } c.Logger = l @@ -168,24 +184,15 @@ func WithLogger(l *slog.Logger) Option { } } -// WithStorage sets storage value. -func WithStorage(st storage.PingStorer) Option { - return func(c *Consumer) error { - if st == nil { - return fmt.Errorf("kafka consumer WithStorage error: [%w]", cerrors.ErrValueRequired) - } - c.Storage = st - - return nil - } -} - // WithTopic sets topic name to consume. 
func WithTopic(s string) Option { return func(c *Consumer) error { kt := kafkacp.KafkaTopicIdentifier(s) if !kt.Valid() { - return fmt.Errorf("kafka consumer WithTopic error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaconsumer.WithTopic] error: [%w, '%s' received]", + cerrors.ErrInvalid, s, + ) } c.Topic = kt @@ -197,7 +204,10 @@ func WithTopic(s string) Option { func WithPartition(i int) Option { return func(c *Consumer) error { if i < 0 || i > math.MaxInt32 { - return fmt.Errorf("kafka consumer WithPartition error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaconsumer.WithPartition] error: [%w, '%d' received, must > 0 or must < %d ]", + cerrors.ErrInvalid, i, math.MaxInt32, + ) } c.Partition = int32(i) @@ -211,7 +221,10 @@ func WithKafkaBrokers(brokers string) Option { var kafkaBrokers kafkacp.KafkaBrokers kafkaBrokers.AddFromString(brokers) if !kafkaBrokers.Valid() { - return fmt.Errorf("kafka consumer WithKafkaBrokers error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaconsumer.WithKafkaBrokers] error: [%w, '%s' received]", + cerrors.ErrInvalid, brokers, + ) } c.KafkaBrokers = kafkaBrokers @@ -224,7 +237,10 @@ func WithKafkaBrokers(brokers string) Option { func WithDialTimeout(d time.Duration) Option { return func(c *Consumer) error { if d < 0 { - return fmt.Errorf("kafka consumer WithDialTimeout error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaconsumer.WithDialTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) } c.DialTimeout = d @@ -236,7 +252,10 @@ func WithDialTimeout(d time.Duration) Option { func WithReadTimeout(d time.Duration) Option { return func(c *Consumer) error { if d < 0 { - return fmt.Errorf("kafka consumer WithReadTimeout error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaconsumer.WithReadTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) } c.ReadTimeout = d @@ -248,7 +267,10 @@ func WithReadTimeout(d time.Duration) Option { func 
WithWriteTimeout(d time.Duration) Option { return func(c *Consumer) error { if d < 0 { - return fmt.Errorf("kafka consumer WithWriteTimeout error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaconsumer.WithWriteTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) } c.WriteTimeout = d @@ -260,11 +282,17 @@ func WithWriteTimeout(d time.Duration) Option { func WithBackoff(d time.Duration) Option { return func(c *Consumer) error { if d == 0 { - return fmt.Errorf("kafka consumer WithBackoff error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[kafkaconsumer.WithBackoff] error: [%w, '%s' received, 0 is not allowed]", + cerrors.ErrValueRequired, d, + ) } if d < 0 || d > time.Minute { - return fmt.Errorf("kafka consumer WithBackoff error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaconsumer.WithBackoff] error: [%w, '%s' received, must > 0 or < minute]", + cerrors.ErrInvalid, d, + ) } c.Backoff = d @@ -277,7 +305,10 @@ func WithBackoff(d time.Duration) Option { func WithMaxRetries(i int) Option { return func(c *Consumer) error { if i > math.MaxUint8 || i < 0 { - return fmt.Errorf("kafka consumer WithMaxRetries error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaconsumer.WithMaxRetries] error: [%w, '%d' received, must < %d or > 0]", + cerrors.ErrInvalid, i, math.MaxUint8, + ) } c.MaxRetries = uint8(i) @@ -286,12 +317,30 @@ func WithMaxRetries(i int) Option { } // WithSaramaConsumerFactoryFunc sets a custom factory function for creating Sarama consumers. -func WithSaramaConsumerFactoryFunc(factory SaramaConsumerFactoryFunc) Option { +func WithSaramaConsumerFactoryFunc(fn SaramaConsumerFactoryFunc) Option { + return func(c *Consumer) error { + if fn == nil { + return fmt.Errorf( + "[kafkaconsumer.WithSaramaConsumerFactoryFunc] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) + } + c.SaramaConsumerFactoryFunc = fn + + return nil + } +} + +// WithProcessMessageFunc sets the message processor. 
+func WithProcessMessageFunc(fn ProcessMessageFunc) Option { return func(c *Consumer) error { - if factory == nil { - return fmt.Errorf("kafka consumer WithSaramaConsumerFactoryFunc error: [%w]", cerrors.ErrValueRequired) + if fn == nil { + return fmt.Errorf( + "[kafkaconsumer.WithProcessMessageFunc] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } - c.SaramaConsumerFactoryFunc = factory + c.ProcessMessageFunc = fn return nil } @@ -316,7 +365,7 @@ func New(options ...Option) (*Consumer, error) { for _, option := range options { if err := option(consumer); err != nil { - return nil, fmt.Errorf("kafka consumer option error: [%w]", err) + return nil, err } } @@ -335,7 +384,10 @@ func New(options ...Option) (*Consumer, error) { backoff := consumer.Backoff for i := range consumer.MaxRetries { - sconsumer, sconsumerErr = consumer.SaramaConsumerFactoryFunc(consumer.KafkaBrokers.ToStringSlice(), config) + sconsumer, sconsumerErr = consumer.SaramaConsumerFactoryFunc( + consumer.KafkaBrokers.ToStringSlice(), + config, + ) if sconsumerErr == nil { break } @@ -352,7 +404,10 @@ func New(options ...Option) (*Consumer, error) { } if sconsumerErr != nil { - return nil, fmt.Errorf("kafka consumer NewConsumer error: [%w]", sconsumerErr) + return nil, fmt.Errorf( + "[kafkaconsumer.New][SaramaConsumerFactoryFunc] error: [%w]", + sconsumerErr, + ) } consumer.Logger.Info("successfully connected to", "broker", consumer.KafkaBrokers) diff --git a/internal/kafkacp/kafkaconsumer/kafkaconsumer_test.go b/internal/kafkacp/kafkaconsumer/kafkaconsumer_test.go index d4ec10f..620ae4c 100644 --- a/internal/kafkacp/kafkaconsumer/kafkaconsumer_test.go +++ b/internal/kafkacp/kafkaconsumer/kafkaconsumer_test.go @@ -2,8 +2,8 @@ package kafkaconsumer_test import ( "context" - "log/slog" "os" + "sync" "syscall" "testing" "time" @@ -11,7 +11,9 @@ import ( "github.com/IBM/sarama" "github.com/IBM/sarama/mocks" "github.com/devchain-network/cauldron/internal/cerrors" + 
"github.com/devchain-network/cauldron/internal/kafkacp" "github.com/devchain-network/cauldron/internal/kafkacp/kafkaconsumer" + "github.com/devchain-network/cauldron/internal/slogger/mockslogger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -25,36 +27,10 @@ func (m *mockConsumerFactory) NewConsumer(brokers []string, config *sarama.Confi return args.Get(0).(sarama.Consumer), args.Error(1) } -type mockLogger struct{} - -func (h *mockLogger) Enabled(_ context.Context, _ slog.Level) bool { - return true -} - -func (h *mockLogger) Handle(_ context.Context, record slog.Record) error { - return nil -} - -func (h *mockLogger) WithAttrs(attrs []slog.Attr) slog.Handler { - return h -} - -func (h *mockLogger) WithGroup(name string) slog.Handler { - return h -} - -type mockStorage struct { - mock.Mock -} - -func (m *mockStorage) MessageStore(ctx context.Context, msg *sarama.ConsumerMessage) error { - args := m.Called(ctx, msg) - return args.Error(0) -} - -func (m *mockStorage) Ping(ctx context.Context, maxRetries uint8, backoff time.Duration) error { - args := m.Called(ctx, maxRetries, backoff) - return args.Error(0) +func mockProcessMessageFunc() kafkaconsumer.ProcessMessageFunc { + return func(ctx context.Context, msg *sarama.ConsumerMessage) error { + return nil + } } func TestNew_MissingRequiredFields(t *testing.T) { @@ -73,8 +49,8 @@ func TestNew_NilLogger(t *testing.T) { assert.Nil(t, consumer) } -func TestNew_NoStorage(t *testing.T) { - logger := slog.New(new(mockLogger)) +func TestNew_NoProcessMessageFunc(t *testing.T) { + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), @@ -84,12 +60,12 @@ func TestNew_NoStorage(t *testing.T) { assert.Nil(t, consumer) } -func TestNew_NilStorage(t *testing.T) { - logger := slog.New(new(mockLogger)) +func TestNew_NilProcessMessageFunc(t *testing.T) { + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - 
kafkaconsumer.WithStorage(nil), + kafkaconsumer.WithProcessMessageFunc(nil), ) assert.ErrorIs(t, err, cerrors.ErrValueRequired) @@ -97,12 +73,11 @@ func TestNew_NilStorage(t *testing.T) { } func TestNew_EmptyTopic(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), ) assert.ErrorIs(t, err, cerrors.ErrInvalid) @@ -110,12 +85,11 @@ func TestNew_EmptyTopic(t *testing.T) { } func TestNew_InvalidTopic(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), kafkaconsumer.WithTopic("invalid"), ) @@ -124,13 +98,12 @@ func TestNew_InvalidTopic(t *testing.T) { } func TestNew_InvalidPartition(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithPartition(2147483648), ) @@ -139,13 +112,12 @@ func TestNew_InvalidPartition(t *testing.T) { } func TestNew_InvalidBrokers(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), 
kafkaconsumer.WithKafkaBrokers("invalid"), ) @@ -154,13 +126,12 @@ func TestNew_InvalidBrokers(t *testing.T) { } func TestNew_InvalidDialTimeout(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithKafkaBrokers("127.0.0.1:9094"), kafkaconsumer.WithDialTimeout(-1*time.Second), ) @@ -170,13 +141,12 @@ func TestNew_InvalidDialTimeout(t *testing.T) { } func TestNew_InvalidReadTimeout(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithKafkaBrokers("127.0.0.1:9094"), kafkaconsumer.WithReadTimeout(-1*time.Second), ) @@ -186,13 +156,12 @@ func TestNew_InvalidReadTimeout(t *testing.T) { } func TestNew_InvalidWriteTimeout(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithKafkaBrokers("127.0.0.1:9094"), kafkaconsumer.WithWriteTimeout(-1*time.Second), ) @@ -202,13 +171,12 @@ func TestNew_InvalidWriteTimeout(t *testing.T) { } func TestNew_ZeroBackoff(t *testing.T) { - logger := slog.New(new(mockLogger)) - 
storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithKafkaBrokers("127.0.0.1:9094"), kafkaconsumer.WithBackoff(0), ) @@ -218,13 +186,12 @@ func TestNew_ZeroBackoff(t *testing.T) { } func TestNew_InvalidBackoff(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithKafkaBrokers("127.0.0.1:9094"), kafkaconsumer.WithBackoff(2*time.Minute), ) @@ -234,13 +201,12 @@ func TestNew_InvalidBackoff(t *testing.T) { } func TestNew_InvalidMaxRetries(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithKafkaBrokers("127.0.0.1:9094"), kafkaconsumer.WithMaxRetries(256), ) @@ -249,14 +215,13 @@ func TestNew_InvalidMaxRetries(t *testing.T) { assert.Nil(t, consumer) } -func TestNew_NilSaramaConsumerFactor(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) +func TestNew_NilSaramaConsumerFactoryFunc(t *testing.T) { + logger := mockslogger.New() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - 
kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithKafkaBrokers("127.0.0.1:9094"), kafkaconsumer.WithSaramaConsumerFactoryFunc(nil), ) @@ -266,8 +231,7 @@ func TestNew_NilSaramaConsumerFactor(t *testing.T) { } func TestNew_WithSaramaConsumerFactoryFunc_Error(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() mockConfig := mocks.NewTestConfig() mockSarama := mocks.NewConsumer(t, mockConfig) @@ -277,8 +241,8 @@ func TestNew_WithSaramaConsumerFactoryFunc_Error(t *testing.T) { consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithBackoff(100*time.Millisecond), kafkaconsumer.WithSaramaConsumerFactoryFunc(mockFactory.NewConsumer), kafkaconsumer.WithMaxRetries(1), @@ -292,8 +256,7 @@ func TestNew_WithSaramaConsumerFactoryFunc_Error(t *testing.T) { } func TestNew_WithSaramaConsumerFactoryFunc_Success(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() mockConfig := mocks.NewTestConfig() mockSarama := mocks.NewConsumer(t, mockConfig) @@ -304,8 +267,8 @@ func TestNew_WithSaramaConsumerFactoryFunc_Success(t *testing.T) { consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithPartition(0), kafkaconsumer.WithDialTimeout(10*time.Second), kafkaconsumer.WithReadTimeout(10*time.Second), @@ -323,16 
+286,14 @@ func TestNew_WithSaramaConsumerFactoryFunc_Success(t *testing.T) { } func TestConsumer_Consume_Success(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) - storage.On("MessageStore", mock.Anything, mock.Anything).Return(nil) + logger := mockslogger.New() mockConfig := mocks.NewTestConfig() mockSarama := mocks.NewConsumer(t, mockConfig) - mockSarama.ExpectConsumePartition("github", 0, sarama.OffsetNewest).YieldMessage( + mockSarama.ExpectConsumePartition(kafkacp.KafkaTopicIdentifierGitHub.String(), 0, sarama.OffsetNewest).YieldMessage( &sarama.ConsumerMessage{ Value: []byte(`{"test": "message"}`), - Topic: "github", + Topic: kafkacp.KafkaTopicIdentifierGitHub.String(), Partition: 0, }, ) @@ -342,8 +303,8 @@ func TestConsumer_Consume_Success(t *testing.T) { consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithSaramaConsumerFactoryFunc(mockFactory.NewConsumer), kafkaconsumer.WithMaxRetries(1), ) @@ -351,48 +312,62 @@ func TestConsumer_Consume_Success(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, consumer) + var wg sync.WaitGroup + + wg.Add(1) go func() { + defer wg.Done() + err := consumer.Consume() assert.NoError(t, err) }() - time.Sleep(1 * time.Second) + time.Sleep(100 * time.Millisecond) + + process, _ := os.FindProcess(syscall.Getpid()) + _ = process.Signal(os.Interrupt) mockFactory.AssertNumberOfCalls(t, "NewConsumer", 1) mockFactory.AssertExpectations(t) - storage.AssertNumberOfCalls(t, "MessageStore", 1) - storage.AssertExpectations(t) + wg.Wait() } func TestConsumer_Consume_PartitionConsumeError(t *testing.T) { - logger := slog.New(new(mockLogger)) - storage := new(mockStorage) + logger := mockslogger.New() mockConfig := mocks.NewTestConfig() mockSarama := 
mocks.NewConsumer(t, mockConfig) - mockSarama.ExpectConsumePartition("github", 0, sarama.OffsetNewest).YieldError(sarama.ErrOutOfBrokers) + mockSarama.ExpectConsumePartition(kafkacp.KafkaTopicIdentifierGitHub.String(), 0, sarama.OffsetNewest). + YieldError(sarama.ErrOutOfBrokers) mockFactory := &mockConsumerFactory{} mockFactory.On("NewConsumer", mock.Anything, mock.Anything).Return(mockSarama, nil).Once() consumer, err := kafkaconsumer.New( kafkaconsumer.WithLogger(logger), - kafkaconsumer.WithStorage(storage), - kafkaconsumer.WithTopic("github"), + kafkaconsumer.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumer.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), kafkaconsumer.WithSaramaConsumerFactoryFunc(mockFactory.NewConsumer), kafkaconsumer.WithMaxRetries(1), ) assert.NoError(t, err) assert.NotNil(t, consumer) + var wg sync.WaitGroup + + wg.Add(1) go func() { + defer wg.Done() + err = consumer.Consume() assert.NoError(t, err) }() time.Sleep(100 * time.Millisecond) + process, _ := os.FindProcess(syscall.Getpid()) _ = process.Signal(os.Interrupt) mockFactory.AssertNumberOfCalls(t, "NewConsumer", 1) mockFactory.AssertExpectations(t) + wg.Wait() } diff --git a/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup.go b/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup.go new file mode 100644 index 0000000..54afa5d --- /dev/null +++ b/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup.go @@ -0,0 +1,506 @@ +package kafkaconsumergroup + +import ( + "context" + "fmt" + "log/slog" + "math" + "os" + "os/signal" + "runtime" + "sync" + "time" + + "github.com/IBM/sarama" + "github.com/devchain-network/cauldron/internal/cerrors" + "github.com/devchain-network/cauldron/internal/kafkacp" +) + +// defaults. 
+const ( + DefaultDialTimeout = 30 * time.Second + DefaultReadTimeout = 30 * time.Second + DefaultWriteTimeout = 30 * time.Second + DefaultBackoff = 2 * time.Second + DefaultMaxRetries = 10 +) + +var _ sarama.ConsumerGroupHandler = (*Consumer)(nil) // compile time proof + +// Consumer represents kafa group consumer setup. +type Consumer struct { + KafkaGroupName string + Logger *slog.Logger + SaramaConsumerGroupFactoryFunc SaramaConsumerGroupFactoryFunc + ProcessMessageFunc ProcessMessageFunc + MessageQueue chan *sarama.ConsumerMessage + SaramaConsumerGroup sarama.ConsumerGroup + SaramaConsumerGroupHandler sarama.ConsumerGroupHandler + Topic kafkacp.KafkaTopicIdentifier + KafkaBrokers kafkacp.KafkaBrokers + KafkaVersion sarama.KafkaVersion + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + Backoff time.Duration + MaxRetries uint8 + MessageBufferSize int + NumberOfWorkers int +} + +// SaramaConsumerGroupFactoryFunc is a factory function. +type SaramaConsumerGroupFactoryFunc func([]string, string, *sarama.Config) (sarama.ConsumerGroup, error) + +// ProcessMessageFunc is a factory function for callers. 
+type ProcessMessageFunc func(ctx context.Context, msg *sarama.ConsumerMessage) error + +func (c *Consumer) checkRequired() error { + if c.Logger == nil { + return fmt.Errorf( + "[kafkaconsumergroup.checkRequired] Logger error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) + } + + if c.ProcessMessageFunc == nil { + return fmt.Errorf( + "[kafkaconsumergroup.checkRequired] ProcessMessageFunc error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) + } + + if c.KafkaGroupName == "" { + return fmt.Errorf( + "[kafkaconsumergroup.checkRequired] KafkaGroupName error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) + } + + if !c.Topic.Valid() { + return fmt.Errorf( + "[kafkaconsumergroup.checkRequired] Topic error: [%w, false received]", + cerrors.ErrInvalid, + ) + } + + return nil +} + +// Setup implements sarama ConsumerGroupHandler interface. +func (Consumer) Setup(_ sarama.ConsumerGroupSession) error { return nil } + +// Cleanup implements sarama ConsumerGroupHandler interface. +func (Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } + +// ConsumeClaim implements sarama ConsumerGroupHandler interface. +func (c Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + c.MessageQueue <- msg + + sess.MarkMessage(msg, "") + } + + return nil +} + +// StartConsume consumes message from kafka. 
+func (c Consumer) StartConsume() error { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer cancel() + + topics := []string{c.Topic.String()} + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case err, ok := <-c.SaramaConsumerGroup.Errors(): + if !ok { + c.Logger.Info("error chan closed, exiting error handler") + + return + } + c.Logger.Error("kafka consumer group error", "error", err) + case <-ctx.Done(): + c.Logger.Info("context canceled, stopping error handler") + + return + } + } + }() + + c.Logger.Info("starting workers", "count", c.NumberOfWorkers) + for i := range c.NumberOfWorkers { + wg.Add(1) + + go func() { + defer func() { + wg.Done() + c.Logger.Info("worker stopped", "id", i) + }() + + for { + select { + case msg, ok := <-c.MessageQueue: + if !ok { + return + } + + if err := c.ProcessMessageFunc(ctx, msg); err != nil { + c.Logger.Error("kafka consumer group process message", "error", err, "worker", i) + + continue + } + + c.Logger.Info( + "message is stored to database", + "worker", i, + "topic", msg.Topic, + "partition", msg.Partition, + "offset", msg.Offset, + "key", string(msg.Key), + ) + + case <-ctx.Done(): + return + } + } + }() + } + + wg.Add(1) + go func() { + defer func() { + wg.Done() + close(c.MessageQueue) + }() + + for { + if ctx.Err() != nil { + c.Logger.Info("context canceled, stopping consumer loop") + + return + } + + if err := c.SaramaConsumerGroup.Consume(ctx, topics, c.SaramaConsumerGroupHandler); err != nil { + if ctx.Err() != nil { + c.Logger.Info("consume stopped due to context cancellation") + + return + } + + c.Logger.Error("kafka consume group consume", "error", err) + } + } + }() + + <-ctx.Done() + wg.Wait() + + if err := c.SaramaConsumerGroup.Close(); err != nil { + return fmt.Errorf( + "[kafkaconsumergroup.StartConsume][SaramaConsumerGroup.Close] error: [%w]", + err, + ) + } + + c.Logger.Info("all workers are stopped") + + return nil +} + 
+// Option represents option function type.
+type Option func(*Consumer) error
+
+// WithLogger sets the slog logger; a nil logger is rejected.
+func WithLogger(l *slog.Logger) Option {
+	return func(c *Consumer) error {
+		if l == nil {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithLogger] error: [%w, 'nil' received]",
+				cerrors.ErrValueRequired,
+			)
+		}
+		c.Logger = l
+
+		return nil
+	}
+}
+
+// WithTopic sets the topic to consume; the name must be a valid kafkacp topic identifier.
+func WithTopic(s string) Option {
+	return func(c *Consumer) error {
+		kt := kafkacp.KafkaTopicIdentifier(s)
+		if !kt.Valid() {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithTopic] error: [%w, '%s' received]",
+				cerrors.ErrInvalid, s,
+			)
+		}
+		c.Topic = kt
+
+		return nil
+	}
+}
+
+// WithKafkaVersion parses and sets the kafka protocol version (e.g. "3.9.0").
+func WithKafkaVersion(s string) Option {
+	return func(c *Consumer) error {
+		version, err := sarama.ParseKafkaVersion(s)
+		if err != nil {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithKafkaVersion] error: [(%w) %w, '%s' received]",
+				err, cerrors.ErrInvalid, s,
+			)
+		}
+
+		c.KafkaVersion = version
+
+		return nil
+	}
+}
+
+// WithKafkaBrokers sets the kafka brokers list from a comma separated string.
+func WithKafkaBrokers(brokers string) Option {
+	return func(c *Consumer) error {
+		var kafkaBrokers kafkacp.KafkaBrokers
+		kafkaBrokers.AddFromString(brokers)
+		if !kafkaBrokers.Valid() {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithKafkaBrokers] error: [%w, '%s' received]",
+				cerrors.ErrInvalid, brokers,
+			)
+		}
+
+		c.KafkaBrokers = kafkaBrokers
+
+		return nil
+	}
+}
+
+// WithDialTimeout sets dial timeout. NOTE(review): 0 passes the check below
+// although the error text says "must > 0" — confirm whether 0 should be rejected.
+func WithDialTimeout(d time.Duration) Option {
+	return func(c *Consumer) error {
+		if d < 0 {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithDialTimeout] error: [%w, '%s' received, must > 0]",
+				cerrors.ErrInvalid, d,
+			)
+		}
+		c.DialTimeout = d
+
+		return nil
+	}
+}
+
+// WithReadTimeout sets read timeout; negative values are rejected (0 is accepted, see WithDialTimeout).
+func WithReadTimeout(d time.Duration) Option {
+	return func(c *Consumer) error {
+		if d < 0 {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithReadTimeout] error: [%w, '%s' received, must > 0]",
+				cerrors.ErrInvalid, d,
+			)
+		}
+		c.ReadTimeout = d
+
+		return nil
+	}
+}
+
+// WithWriteTimeout sets write timeout; negative values are rejected (0 is accepted, see WithDialTimeout).
+func WithWriteTimeout(d time.Duration) Option {
+	return func(c *Consumer) error {
+		if d < 0 {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithWriteTimeout] error: [%w, '%s' received, must > 0]",
+				cerrors.ErrInvalid, d,
+			)
+		}
+		c.WriteTimeout = d
+
+		return nil
+	}
+}
+
+// WithBackoff sets the initial retry backoff; must be greater than 0 and at most one minute.
+func WithBackoff(d time.Duration) Option {
+	return func(c *Consumer) error {
+		if d == 0 {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithBackoff] error: [%w, '%s' received, 0 is not allowed]",
+				cerrors.ErrValueRequired, d,
+			)
+		}
+
+		if d < 0 || d > time.Minute {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithBackoff] error: [%w, '%s' received, must > 0 or < minute]",
+				cerrors.ErrInvalid, d,
+			)
+		}
+
+		c.Backoff = d
+
+		return nil
+	}
+}
+
+// WithMaxRetries sets the connection retry count; must fit in uint8 (0..255).
+func WithMaxRetries(i int) Option {
+	return func(c *Consumer) error {
+		if i > math.MaxUint8 || i < 0 {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithMaxRetries] error: [%w, '%d' received, must < %d or > 0]",
+				cerrors.ErrInvalid, i, math.MaxUint8,
+			)
+		}
+		c.MaxRetries = uint8(i)
+
+		return nil
+	}
+}
+
+// WithKafkaGroupName sets the kafka consumer group name; empty names are rejected.
+func WithKafkaGroupName(s string) Option {
+	return func(c *Consumer) error {
+		if s == "" {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithKafkaGroupName] error: [%w, empty string received]",
+				cerrors.ErrValueRequired,
+			)
+		}
+		c.KafkaGroupName = s
+
+		return nil
+	}
+}
+
+// WithSaramaConsumerGroupFactoryFunc sets a custom factory function for Sarama consumer group.
+func WithSaramaConsumerGroupFactoryFunc(fn SaramaConsumerGroupFactoryFunc) Option {
+	return func(c *Consumer) error {
+		if fn == nil {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc] error: [%w, 'nil' received]",
+				cerrors.ErrValueRequired,
+			)
+		}
+
+		c.SaramaConsumerGroupFactoryFunc = fn
+
+		return nil
+	}
+}
+
+// WithProcessMessageFunc sets the callback that workers run for each consumed message.
+func WithProcessMessageFunc(fn ProcessMessageFunc) Option {
+	return func(c *Consumer) error {
+		if fn == nil {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithProcessMessageFunc] error: [%w, 'nil' received]",
+				cerrors.ErrValueRequired,
+			)
+		}
+		c.ProcessMessageFunc = fn
+
+		return nil
+	}
+}
+
+// WithSaramaConsumerGroupHandler overrides the sarama consumer group handler;
+// when unset, New defaults it to the consumer itself.
+func WithSaramaConsumerGroupHandler(handler sarama.ConsumerGroupHandler) Option {
+	return func(c *Consumer) error {
+		if handler == nil {
+			return fmt.Errorf(
+				"[kafkaconsumergroup.WithSaramaConsumerGroupHandler] error: [%w, 'nil' received]",
+				cerrors.ErrValueRequired,
+			)
+		}
+		c.SaramaConsumerGroupHandler = handler
+
+		return nil
+	}
+}
+
+// New instantiates a new kafka consumer group instance with defaults, applies options and connects with retries.
+func New(options ...Option) (*Consumer, error) {
+	consumer := new(Consumer)
+
+	var kafkaBrokers kafkacp.KafkaBrokers
+	kafkaBrokers.AddFromString(kafkacp.DefaultKafkaBrokers)
+
+	// Defaults; any of these can be overridden by the options below.
+	consumer.KafkaBrokers = kafkaBrokers
+	consumer.DialTimeout = DefaultDialTimeout
+	consumer.ReadTimeout = DefaultReadTimeout
+	consumer.WriteTimeout = DefaultWriteTimeout
+	consumer.Backoff = DefaultBackoff
+	consumer.MaxRetries = DefaultMaxRetries
+	consumer.NumberOfWorkers = runtime.NumCPU()
+	consumer.MessageBufferSize = consumer.NumberOfWorkers * 10
+	consumer.KafkaVersion = sarama.V3_9_0_0
+	consumer.SaramaConsumerGroupFactoryFunc = sarama.NewConsumerGroup
+	consumer.MessageQueue = make(chan *sarama.ConsumerMessage, consumer.MessageBufferSize)
+
+	for _, option := range options {
+		if err := option(consumer); err != nil {
+			return nil, err
+		}
+	}
+
+	if err := consumer.checkRequired(); err != nil {
+		return nil, err
+	}
+
+	if consumer.SaramaConsumerGroupHandler == nil {
+		consumer.SaramaConsumerGroupHandler = consumer
+	}
+
+	config := sarama.NewConfig()
+	config.Net.DialTimeout = consumer.DialTimeout
+	config.Net.ReadTimeout = consumer.ReadTimeout
+	config.Net.WriteTimeout = consumer.WriteTimeout
+	// Honor WithKafkaVersion; this was hard-coded to sarama.V3_9_0_0,
+	// silently discarding the validated option value.
+	config.Version = consumer.KafkaVersion
+	config.Consumer.Return.Errors = true
+
+	var saramaConsumerGroup sarama.ConsumerGroup
+	var saramaConsumerGroupErr error
+	backoff := consumer.Backoff
+
+	// Connect with exponential backoff; the log shows 1-based attempt numbers.
+	for i := range consumer.MaxRetries {
+		saramaConsumerGroup, saramaConsumerGroupErr = consumer.SaramaConsumerGroupFactoryFunc(
+			consumer.KafkaBrokers.ToStringSlice(),
+			consumer.KafkaGroupName,
+			config,
+		)
+
+		if saramaConsumerGroupErr == nil {
+			break
+		}
+
+		consumer.Logger.Error(
+			"can not connect to",
+			"brokers", consumer.KafkaBrokers,
+			"error", saramaConsumerGroupErr,
+			"retry", fmt.Sprintf("%d/%d", i+1, consumer.MaxRetries),
+			"backoff", backoff.String(),
+		)
+		time.Sleep(backoff)
+		backoff *= 2
+	}
+	if saramaConsumerGroupErr != nil {
+		return nil, fmt.Errorf(
+			"[kafkaconsumergroup.New][SaramaConsumerGroupFactoryFunc] error: [%w]",
+			saramaConsumerGroupErr,
+		)
+	}
+
+	consumer.Logger.Info("successfully connected to", "broker", consumer.KafkaBrokers)
+
+	consumer.SaramaConsumerGroup = saramaConsumerGroup
+
+	return consumer, nil
+}
diff --git a/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup_test.go b/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup_test.go
new file mode 100644
index 0000000..d7a8662
--- /dev/null
+++ b/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup_test.go
@@ -0,0 +1,446 @@
+package kafkaconsumergroup_test
+
+import (
+	"context"
+	"errors"
+	"os"
+	"sync"
+	"syscall"
+	"testing"
+	"time"
+
+	"github.com/IBM/sarama"
+	"github.com/devchain-network/cauldron/internal/cerrors"
+	"github.com/devchain-network/cauldron/internal/kafkacp"
+	"github.com/devchain-network/cauldron/internal/kafkacp/kafkaconsumergroup"
+	"github.com/devchain-network/cauldron/internal/slogger/mockslogger"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+func mockProcessMessageFunc() kafkaconsumergroup.ProcessMessageFunc {
+	return func(ctx context.Context, msg *sarama.ConsumerMessage) error {
+		return nil
+	}
+}
+
+// mockConsumerGroup ----------------------------------------------------------
+type mockConsumerGroup struct {
+	mock.Mock
+}
+
+func (m *mockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
+	args := m.Called(ctx, topics, handler)
+	return args.Error(0)
+}
+
+func (m *mockConsumerGroup) Errors() <-chan error {
+	args := m.Called()
+	return args.Get(0).(<-chan error)
+}
+
+func (m *mockConsumerGroup) Close() error {
+	args := m.Called()
+	return args.Error(0)
+}
+
+func (m *mockConsumerGroup) Pause(partitions map[string][]int32) {
+	m.Called(partitions)
+}
+
+func (m *mockConsumerGroup) Resume(partitions map[string][]int32) {
+	m.Called(partitions)
+}
+
+func (m *mockConsumerGroup) PauseAll() {
+	m.Called()
+}
+
+func (m *mockConsumerGroup) ResumeAll() {
+	m.Called()
+}
+
+// mockConsumerGroupFactory provides a testify-mocked SaramaConsumerGroupFactoryFunc.
+type mockConsumerGroupFactory struct {
+	mock.Mock
+}
+
+func (m *mockConsumerGroupFactory) CreateConsumerGroup(
+	brokers []string,
+	groupName string,
+	config *sarama.Config,
+) (sarama.ConsumerGroup, error) {
+	args := m.Called(brokers, groupName, config)
+	return args.Get(0).(sarama.ConsumerGroup), args.Error(1)
+}
+
+// The tests below exercise New's option validation one missing/invalid option at a time.
+func TestNew_MissingRequiredFields(t *testing.T) {
+	consumer, err := kafkaconsumergroup.New()
+
+	assert.ErrorIs(t, err, cerrors.ErrValueRequired)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_NilLogger(t *testing.T) {
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(nil),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrValueRequired)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_NoProcessMessageFunc(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrValueRequired)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_NilProcessMessageFunc(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(nil),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrValueRequired)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_NoGroupName(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrValueRequired)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_EmptyGroupName(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName(""),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrValueRequired)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_NoTopic(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrInvalid)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_InvalidTopic(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic("invalid"),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrInvalid)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_InvalidBrokers(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers("invalid"),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrInvalid)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_InvalidDialTimeout(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
+		kafkaconsumergroup.WithDialTimeout(-1*time.Second),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrInvalid)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_InvalidReadTimeout(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
+		kafkaconsumergroup.WithReadTimeout(-1*time.Second),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrInvalid)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_InvalidWriteTimeout(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
+		kafkaconsumergroup.WithWriteTimeout(-1*time.Second),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrInvalid)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_ZeroBackoff(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
+		kafkaconsumergroup.WithBackoff(0),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrValueRequired)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_InvalidBackoff(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
+		kafkaconsumergroup.WithBackoff(2*time.Minute),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrInvalid)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_InvalidMaxRetries(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
+		kafkaconsumergroup.WithMaxRetries(256),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrInvalid)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_InvalidKafkaVersion(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
+		kafkaconsumergroup.WithKafkaVersion("1111"),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrInvalid)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_NilSaramaConsumerGroupHandler(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
+		kafkaconsumergroup.WithSaramaConsumerGroupHandler(nil),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrValueRequired)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_NilSaramaConsumerGroupFactoryFunc(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
+		kafkaconsumergroup.WithKafkaVersion("3.9.0"),
+		kafkaconsumergroup.WithDialTimeout(5*time.Second),
+		kafkaconsumergroup.WithReadTimeout(5*time.Second),
+		kafkaconsumergroup.WithWriteTimeout(5*time.Second),
+		kafkaconsumergroup.WithBackoff(1*time.Second),
+		kafkaconsumergroup.WithMaxRetries(2),
+		kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(nil),
+	)
+
+	assert.ErrorIs(t, err, cerrors.ErrValueRequired)
+	assert.Nil(t, consumer)
+}
+
+func TestNew_SaramaConsumerGroupFactoryFunc_Error(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumerGroup := &mockConsumerGroup{}
+	consumerGroupFactory := &mockConsumerGroupFactory{}
+	consumerGroupFactory.On(
+		"CreateConsumerGroup",
+		mock.Anything,
+		mock.Anything,
+		mock.Anything,
+	).Return(consumerGroup, errors.New("error")).Once()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithBackoff(100*time.Millisecond),
+		kafkaconsumergroup.WithMaxRetries(1),
+		kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup),
+	)
+
+	assert.Nil(t, consumer)
+	assert.Error(t, err)
+	consumerGroupFactory.AssertNumberOfCalls(t, "CreateConsumerGroup", 1)
+	consumerGroupFactory.AssertExpectations(t)
+}
+
+func TestNew_SaramaConsumerGroupFactoryFunc_Success(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumerGroup := &mockConsumerGroup{}
+	consumerGroupFactory := &mockConsumerGroupFactory{}
+	consumerGroupFactory.On("CreateConsumerGroup",
+		mock.Anything,
+		mock.Anything,
+		mock.Anything,
+	).Return(consumerGroup, nil).Once()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithBackoff(100*time.Millisecond),
+		kafkaconsumergroup.WithMaxRetries(1),
+		kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup),
+	)
+
+	assert.NotNil(t, consumer)
+	assert.NoError(t, err)
+	consumerGroupFactory.AssertNumberOfCalls(t, "CreateConsumerGroup", 1)
+	consumerGroupFactory.AssertExpectations(t)
+}
+
+// TestNew_Consume_Success drives StartConsume end-to-end against mocks and stops it via SIGINT.
+func TestNew_Consume_Success(t *testing.T) {
+	logger := mockslogger.New()
+
+	consumerGroup := &mockConsumerGroup{}
+	consumerGroup.On("Errors").Return((<-chan error)(make(chan error)))
+
+	consumerGroupFactory := &mockConsumerGroupFactory{}
+	consumerGroupFactory.On("CreateConsumerGroup",
+		mock.Anything,
+		mock.Anything,
+		mock.Anything,
+	).Return(consumerGroup, nil)
+
+	consumerGroup.On("Consume", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	consumerGroup.On("Close").Return(nil).Once()
+
+	consumer, err := kafkaconsumergroup.New(
+		kafkaconsumergroup.WithLogger(logger),
+		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
+		kafkaconsumergroup.WithKafkaGroupName("github-group"),
+		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
+		kafkaconsumergroup.WithBackoff(100*time.Millisecond),
+		kafkaconsumergroup.WithMaxRetries(1),
+		kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup),
+	)
+
+	assert.NotNil(t, consumer)
+	assert.NoError(t, err)
+
+	consumerGroupFactory.AssertNumberOfCalls(t, "CreateConsumerGroup", 1)
+	consumerGroupFactory.AssertExpectations(t)
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := consumer.StartConsume()
+		assert.NoError(t, err)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		consumer.MessageQueue <- &sarama.ConsumerMessage{
+			Topic:     kafkacp.KafkaTopicIdentifierGitHub.String(),
+			Partition: 0,
+			Offset:    1,
+			Key:       []byte("key"),
+			Value:     []byte("value"),
+		}
+	}()
+
+	// NOTE(review): shutdown relies on sleep + SIGINT to the test's own process;
+	// this is timing-based and process-global — confirm it is acceptable for CI.
+	time.Sleep(100 * time.Millisecond)
+	process, _ := os.FindProcess(syscall.Getpid())
+	_ = process.Signal(os.Interrupt)
+	wg.Wait()
+}
diff --git a/internal/kafkacp/kafkacp.go b/internal/kafkacp/kafkacp.go
index 5c4ec3f..bf4b8af 100644
--- a/internal/kafkacp/kafkacp.go
+++ b/internal/kafkacp/kafkacp.go
@@ -16,10 +16,12 @@ const (
 	KafkaTopicIdentifierBitBucket KafkaTopicIdentifier = "bitbucket"
 )
 
-var validKafkaTopicIdentifiers = []KafkaTopicIdentifier{
-	KafkaTopicIdentifierGitHub,
-	KafkaTopicIdentifierGitLab,
-	KafkaTopicIdentifierBitBucket,
+func validKafkaTopicIdentifiers() []KafkaTopicIdentifier {
+	return []KafkaTopicIdentifier{
+		KafkaTopicIdentifierGitHub,
+		KafkaTopicIdentifierGitLab,
+		KafkaTopicIdentifierBitBucket,
+	}
 }
 
 // KafkaTopicIdentifier represents custom type for kafka topic names.
@@ -35,7 +37,7 @@ func (s KafkaTopicIdentifier) Valid() bool {
 		return false
 	}
 
-	return slices.Contains(validKafkaTopicIdentifiers, s)
+	return slices.Contains(validKafkaTopicIdentifiers(), s)
 }
 
 // TCPAddr represents tcp address as string.
diff --git a/internal/kafkacp/kafkaproducer/kafkaproducer.go b/internal/kafkacp/kafkaproducer/kafkaproducer.go index 754fae6..6df449b 100644 --- a/internal/kafkacp/kafkaproducer/kafkaproducer.go +++ b/internal/kafkacp/kafkaproducer/kafkaproducer.go @@ -37,7 +37,10 @@ type SaramaProducerFactoryFunc func([]string, *sarama.Config) (sarama.AsyncProdu func (p Producer) checkRequired() error { if p.Logger == nil { - return fmt.Errorf("kafka producer check required, Logger error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[kafkaproducer.checkRequired] Logger error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } return nil @@ -50,7 +53,10 @@ type Option func(*Producer) error func WithLogger(l *slog.Logger) Option { return func(p *Producer) error { if l == nil { - return fmt.Errorf("kafka producer WithLogger error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[kafkaproducer.WithLogger] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } p.Logger = l @@ -59,13 +65,19 @@ func WithLogger(l *slog.Logger) Option { } // WithKafkaBrokers sets kafka brokers list. 
-func WithKafkaBrokers(brokers kafkacp.KafkaBrokers) Option { +func WithKafkaBrokers(brokers string) Option { return func(p *Producer) error { - if !brokers.Valid() { - return fmt.Errorf("kafka producer WithKafkaBrokers error: [%w]", cerrors.ErrInvalid) + var kafkaBrokers kafkacp.KafkaBrokers + kafkaBrokers.AddFromString(brokers) + + if !kafkaBrokers.Valid() { + return fmt.Errorf( + "[kafkaproducer.WithKafkaBrokers] error: [%w, '%s' received]", + cerrors.ErrInvalid, brokers, + ) } - p.KafkaBrokers = brokers + p.KafkaBrokers = kafkaBrokers return nil } @@ -75,7 +87,10 @@ func WithKafkaBrokers(brokers kafkacp.KafkaBrokers) Option { func WithMaxRetries(i int) Option { return func(p *Producer) error { if i > math.MaxUint8 || i < 0 { - return fmt.Errorf("kafka producer WithMaxRetries error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaproducer.WithMaxRetries] error: [%w, '%d' received, must < %d or > 0]", + cerrors.ErrInvalid, i, math.MaxUint8, + ) } p.MaxRetries = uint8(i) @@ -87,8 +102,19 @@ func WithMaxRetries(i int) Option { func WithBackoff(d time.Duration) Option { return func(p *Producer) error { if d == 0 { - return fmt.Errorf("kafka producer WithBackoff error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[kafkaproducer.WithBackoff] error: [%w, '%s' received, 0 is not allowed]", + cerrors.ErrValueRequired, d, + ) + } + + if d < 0 || d > time.Minute { + return fmt.Errorf( + "[kafkaproducer.WithBackoff] error: [%w, '%s' received, must > 0 or < minute]", + cerrors.ErrInvalid, d, + ) } + p.Backoff = d return nil @@ -99,7 +125,10 @@ func WithBackoff(d time.Duration) Option { func WithDialTimeout(d time.Duration) Option { return func(p *Producer) error { if d < 0 { - return fmt.Errorf("kafka producer WithDialTimeout error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaproducer.WithDialTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) } p.DialTimeout = d @@ -111,7 +140,10 @@ func WithDialTimeout(d 
time.Duration) Option { func WithReadTimeout(d time.Duration) Option { return func(p *Producer) error { if d < 0 { - return fmt.Errorf("kafka producer WithReadTimeout error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaproducer.WithReadTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) } p.ReadTimeout = d @@ -123,7 +155,10 @@ func WithReadTimeout(d time.Duration) Option { func WithWriteTimeout(d time.Duration) Option { return func(p *Producer) error { if d < 0 { - return fmt.Errorf("kafka producer WithWriteTimeout error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[kafkaproducer.WithWriteTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) } p.WriteTimeout = d @@ -132,12 +167,15 @@ func WithWriteTimeout(d time.Duration) Option { } // WithSaramaProducerFactoryFunc sets a custom factory function for creating Sarama producers. -func WithSaramaProducerFactoryFunc(factory SaramaProducerFactoryFunc) Option { +func WithSaramaProducerFactoryFunc(fn SaramaProducerFactoryFunc) Option { return func(p *Producer) error { - if factory == nil { - return fmt.Errorf("kafka producer WithSaramaProducerFactoryFunc error: [%w]", cerrors.ErrValueRequired) + if fn == nil { + return fmt.Errorf( + "[kafkaproducer.WithSaramaProducerFactoryFunc] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } - p.SaramaProducerFactoryFunc = factory + p.SaramaProducerFactoryFunc = fn return nil } @@ -160,7 +198,7 @@ func New(options ...Option) (sarama.AsyncProducer, error) { for _, option := range options { if err := option(producer); err != nil { - return nil, fmt.Errorf("kafka producer option error: [%w]", err) + return nil, err } } @@ -172,6 +210,7 @@ func New(options ...Option) (sarama.AsyncProducer, error) { config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true config.Producer.Return.Errors = true + config.Producer.Compression = sarama.CompressionSnappy config.Net.DialTimeout = 
producer.DialTimeout config.Net.ReadTimeout = producer.ReadTimeout config.Net.WriteTimeout = producer.WriteTimeout @@ -199,7 +238,10 @@ func New(options ...Option) (sarama.AsyncProducer, error) { backoff *= 2 } if kafkaProducerErr != nil { - return nil, fmt.Errorf("kafka producer sarama.NewAsyncProducer error: [%w]", kafkaProducerErr) + return nil, fmt.Errorf( + "[kafkaproducer.New][SaramaProducerFactoryFunc] error: [%w]", + kafkaProducerErr, + ) } return kafkaProducer, nil diff --git a/internal/kafkacp/kafkaproducer/kafkaproducer_test.go b/internal/kafkacp/kafkaproducer/kafkaproducer_test.go index dcb1918..a1e0b22 100644 --- a/internal/kafkacp/kafkaproducer/kafkaproducer_test.go +++ b/internal/kafkacp/kafkaproducer/kafkaproducer_test.go @@ -1,8 +1,6 @@ package kafkaproducer_test import ( - "context" - "log/slog" "testing" "time" @@ -11,28 +9,11 @@ import ( "github.com/devchain-network/cauldron/internal/cerrors" "github.com/devchain-network/cauldron/internal/kafkacp" "github.com/devchain-network/cauldron/internal/kafkacp/kafkaproducer" + "github.com/devchain-network/cauldron/internal/slogger/mockslogger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) -type mockLogger struct{} - -func (h *mockLogger) Enabled(_ context.Context, _ slog.Level) bool { - return true -} - -func (h *mockLogger) Handle(_ context.Context, record slog.Record) error { - return nil -} - -func (h *mockLogger) WithAttrs(attrs []slog.Attr) slog.Handler { - return h -} - -func (h *mockLogger) WithGroup(name string) slog.Handler { - return h -} - type mockProducerFactory struct { mock.Mock } @@ -48,21 +29,31 @@ func TestNew_MissingRequiredFields(t *testing.T) { assert.Nil(t, producer) } -func TestNew_InvalidKafkaBrokers(t *testing.T) { +func TestNew_NilLogger(t *testing.T) { var kafkaBrokers kafkacp.KafkaBrokers - kafkaBrokers.AddFromString("invalid") + kafkaBrokers.AddFromString("127.0.0.1:9094") + + producer, err := kafkaproducer.New( + kafkaproducer.WithLogger(nil), + ) 
+ assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, producer) +} + +func TestNew_InvalidKafkaBrokers(t *testing.T) { + logger := mockslogger.New() - logger := slog.New(new(mockLogger)) producer, err := kafkaproducer.New( kafkaproducer.WithLogger(logger), - kafkaproducer.WithKafkaBrokers(kafkaBrokers), + kafkaproducer.WithKafkaBrokers("invalid"), ) assert.ErrorIs(t, err, cerrors.ErrInvalid) assert.Nil(t, producer) } func TestNew_InvalidMaxRetries(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() + producer, err := kafkaproducer.New( kafkaproducer.WithLogger(logger), kafkaproducer.WithMaxRetries(300), @@ -72,7 +63,8 @@ func TestNew_InvalidMaxRetries(t *testing.T) { } func TestNew_InvalidBackoff(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() + producer, err := kafkaproducer.New( kafkaproducer.WithLogger(logger), kafkaproducer.WithBackoff(0), @@ -82,7 +74,8 @@ func TestNew_InvalidBackoff(t *testing.T) { } func TestNew_InvalidDialTimeout(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() + producer, err := kafkaproducer.New( kafkaproducer.WithLogger(logger), kafkaproducer.WithDialTimeout(-1*time.Second), @@ -92,7 +85,8 @@ func TestNew_InvalidDialTimeout(t *testing.T) { } func TestNew_InvalidReadTimeout(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() + producer, err := kafkaproducer.New( kafkaproducer.WithLogger(logger), kafkaproducer.WithReadTimeout(-1*time.Second), @@ -102,7 +96,8 @@ func TestNew_InvalidReadTimeout(t *testing.T) { } func TestNew_InvalidWriteTimeout(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() + producer, err := kafkaproducer.New( kafkaproducer.WithLogger(logger), kafkaproducer.WithWriteTimeout(-1*time.Second), @@ -112,7 +107,8 @@ func TestNew_InvalidWriteTimeout(t *testing.T) { } func TestNew_WithNilProducerFactoryFunc(t *testing.T) { - logger := 
slog.New(new(mockLogger)) + logger := mockslogger.New() + producer, err := kafkaproducer.New( kafkaproducer.WithLogger(logger), kafkaproducer.WithSaramaProducerFactoryFunc(nil), @@ -122,7 +118,7 @@ func TestNew_WithNilProducerFactoryFunc(t *testing.T) { } func TestNew_WithSaramaProducerFactoryFunc_Error(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() mockConfig := mocks.NewTestConfig() mockProducer := mocks.NewAsyncProducer(t, mockConfig) @@ -145,41 +141,8 @@ func TestNew_WithSaramaProducerFactoryFunc_Error(t *testing.T) { mockFactory.AssertExpectations(t) } -func TestNew_NoLogger(t *testing.T) { - var kafkaBrokers kafkacp.KafkaBrokers - kafkaBrokers.AddFromString("127.0.0.1:9094") - - producer, err := kafkaproducer.New( - kafkaproducer.WithKafkaBrokers(kafkaBrokers), - kafkaproducer.WithMaxRetries(2), - kafkaproducer.WithBackoff(time.Second), - kafkaproducer.WithDialTimeout(5*time.Second), - kafkaproducer.WithReadTimeout(5*time.Second), - kafkaproducer.WithWriteTimeout(5*time.Second), - ) - assert.ErrorIs(t, err, cerrors.ErrValueRequired) - assert.Nil(t, producer) -} - -func TestNew_NilLogger(t *testing.T) { - var kafkaBrokers kafkacp.KafkaBrokers - kafkaBrokers.AddFromString("127.0.0.1:9094") - - producer, err := kafkaproducer.New( - kafkaproducer.WithLogger(nil), - kafkaproducer.WithKafkaBrokers(kafkaBrokers), - kafkaproducer.WithMaxRetries(2), - kafkaproducer.WithBackoff(time.Second), - kafkaproducer.WithDialTimeout(5*time.Second), - kafkaproducer.WithReadTimeout(5*time.Second), - kafkaproducer.WithWriteTimeout(5*time.Second), - ) - assert.ErrorIs(t, err, cerrors.ErrValueRequired) - assert.Nil(t, producer) -} - func TestNew_Success(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() mockConfig := mocks.NewTestConfig() mockProducer := mocks.NewAsyncProducer(t, mockConfig) @@ -192,9 +155,13 @@ func TestNew_Success(t *testing.T) { producer, err := kafkaproducer.New( 
kafkaproducer.WithLogger(logger), + kafkaproducer.WithKafkaBrokers("127.0.0.1:9094"), kafkaproducer.WithSaramaProducerFactoryFunc(mockFactory.NewAsyncProducer), kafkaproducer.WithMaxRetries(3), kafkaproducer.WithBackoff(100*time.Millisecond), + kafkaproducer.WithDialTimeout(5*time.Second), + kafkaproducer.WithReadTimeout(5*time.Second), + kafkaproducer.WithWriteTimeout(5*time.Second), ) assert.NoError(t, err) diff --git a/internal/slogger/mockslogger/mockslogger.go b/internal/slogger/mockslogger/mockslogger.go new file mode 100644 index 0000000..0a3a951 --- /dev/null +++ b/internal/slogger/mockslogger/mockslogger.go @@ -0,0 +1,31 @@ +//nolint:all +package mockslogger + +import ( + "context" + "log/slog" +) + +var _ slog.Handler = (*MockLogger)(nil) // compile time proof + +type MockLogger struct{} + +func (h *MockLogger) Enabled(_ context.Context, _ slog.Level) bool { + return true +} + +func (h *MockLogger) Handle(_ context.Context, _ slog.Record) error { + return nil +} + +func (h *MockLogger) WithAttrs(_ []slog.Attr) slog.Handler { + return h +} + +func (h *MockLogger) WithGroup(_ string) slog.Handler { + return h +} + +func New() *slog.Logger { + return slog.New(new(MockLogger)) +} diff --git a/internal/slogger/slogger.go b/internal/slogger/slogger.go index 8055215..7701ea6 100644 --- a/internal/slogger/slogger.go +++ b/internal/slogger/slogger.go @@ -32,7 +32,10 @@ type Option func(*JSONLogger) error func WithLogLevel(l slog.Leveler) Option { return func(jl *JSONLogger) error { if l == nil { - return fmt.Errorf("slogger WithLogLevel error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[slogger.WithLogLevel] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } jl.Level = l @@ -40,27 +43,35 @@ func WithLogLevel(l slog.Leveler) Option { } } +func validLogLevels() map[string]slog.Level { + return map[string]slog.Level{ + "DEBUG": LevelDebug, + "INFO": LevelInfo, + "WARN": LevelWarn, + "ERROR": LevelError, + } +} + // WithLogLevelName sets log 
level from level name, such as INFO. -func WithLogLevelName(n string) Option { +func WithLogLevelName(s string) Option { return func(jl *JSONLogger) error { - if n == "" { - return fmt.Errorf("slogger WithLogLevelName error: [%w]", cerrors.ErrValueRequired) - } - - logLevelMap := map[string]slog.Level{ - "DEBUG": LevelDebug, - "INFO": LevelInfo, - "WARN": LevelWarn, - "ERROR": LevelError, + if s == "" { + return fmt.Errorf( + "[slogger.WithLogLevelName] error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) } - if level, exists := logLevelMap[n]; exists { + if level, exists := validLogLevels()[s]; exists { jl.Level = level return nil } - return fmt.Errorf("slogger WithLogLevelName error: '%s' [%w]", n, cerrors.ErrInvalid) + return fmt.Errorf( + "[slogger.WithLogLevelName] error: [%w, '%s' received]", + cerrors.ErrInvalid, s, + ) } } @@ -68,7 +79,10 @@ func WithLogLevelName(n string) Option { func WithWriter(w io.Writer) Option { return func(jl *JSONLogger) error { if w == nil { - return fmt.Errorf("slogger WithWriter error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[slogger.WithWriter] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } jl.Writer = w @@ -82,7 +96,7 @@ func New(options ...Option) (*slog.Logger, error) { for _, option := range options { if err := option(jlogger); err != nil { - return nil, fmt.Errorf("slogger option error: [%w]", err) + return nil, err } } diff --git a/internal/storage/githubstorage/githubstorage.go b/internal/storage/githubstorage/githubstorage.go index e16a741..9e49e23 100644 --- a/internal/storage/githubstorage/githubstorage.go +++ b/internal/storage/githubstorage/githubstorage.go @@ -63,9 +63,13 @@ func (GitHubStorage) prepareGitHubPayload(message *sarama.ConsumerMessage) (*Git githubStorage.KafkaPartition = message.Partition githubStorage.KafkaOffset = message.Offset - deliveryID, err := uuid.Parse(string(message.Key)) + messageKey := string(message.Key) + deliveryID, err := 
uuid.Parse(messageKey) if err != nil { - return nil, fmt.Errorf("githubstorage prepareGitHubPayload deliveryID error: [%w]", err) + return nil, fmt.Errorf( + "[githubstorage.prepareGitHubPayload] deliveryID error: ['%s' received, %w]", + messageKey, err, + ) } githubStorage.DeliveryID = deliveryID @@ -90,13 +94,19 @@ func (GitHubStorage) prepareGitHubPayload(message *sarama.ConsumerMessage) (*Git case "target-id": targetID, targetIDErr = strconv.ParseUint(value, 10, 64) if targetIDErr != nil { - return nil, fmt.Errorf("githubstorage prepareGitHubPayload targetID error: [%w]", targetIDErr) + return nil, fmt.Errorf( + "[githubstorage.prepareGitHubPayload] targetID error: ['%s' received, %w]", + value, targetIDErr, + ) } githubStorage.TargetID = targetID case "hook-id": hookID, hookIDErr = strconv.ParseUint(value, 10, 64) if hookIDErr != nil { - return nil, fmt.Errorf("githubstorage prepareGitHubPayload hookID error: [%w]", hookIDErr) + return nil, fmt.Errorf( + "[githubstorage.prepareGitHubPayload] hookID error: ['%s' received, %w]", + value, hookIDErr, + ) } githubStorage.HookID = hookID case "sender-login": @@ -104,7 +114,10 @@ func (GitHubStorage) prepareGitHubPayload(message *sarama.ConsumerMessage) (*Git case "sender-id": userID, userIDErr = strconv.ParseInt(value, 10, 64) if userIDErr != nil { - return nil, fmt.Errorf("githubstorage prepareGitHubPayload userID error: [%w]", userIDErr) + return nil, fmt.Errorf( + "[githubstorage.prepareGitHubPayload] userID error: ['%s' received, %w]", + value, userIDErr, + ) } githubStorage.UserID = userID } @@ -138,7 +151,7 @@ func (s GitHubStorage) Ping(ctx context.Context, maxRetries uint8, backoff time. } if pingErr != nil { - return fmt.Errorf("githubstorage Ping error: [%w]", pingErr) + return fmt.Errorf("[githubstorage.Ping] error: [%w]", pingErr) } return nil @@ -148,7 +161,7 @@ func (s GitHubStorage) Ping(ctx context.Context, maxRetries uint8, backoff time. 
func (s GitHubStorage) MessageStore(ctx context.Context, message *sarama.ConsumerMessage) error { payload, err := s.prepareGitHubPayload(message) if err != nil { - return fmt.Errorf("githubstorage Store payload error: [%w]", err) + return fmt.Errorf("[githubstorage.MessageStore] payload error: [%w]", err) } _, err = s.Pool.Exec( @@ -166,7 +179,7 @@ func (s GitHubStorage) MessageStore(ctx context.Context, message *sarama.Consume payload.Payload, ) if err != nil { - return fmt.Errorf("githubstorage Store Pool.Exec error: [%w]", err) + return fmt.Errorf("[githubstorage.MessageStore][Pool.Exec] error: [%w]", err) } return nil @@ -174,11 +187,17 @@ func (s GitHubStorage) MessageStore(ctx context.Context, message *sarama.Consume func (s GitHubStorage) checkRequired() error { if s.Logger == nil { - return fmt.Errorf("githubstorage check required, Logger error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[githubstorage.checkRequired] Logger error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } if s.DatabaseDSN == "" { - return fmt.Errorf("githubstorage check required, DatabaseDSN error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[githubstorage.checkRequired] DatabaseDSN error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) } return nil @@ -191,7 +210,10 @@ type Option func(*GitHubStorage) error func WithLogger(l *slog.Logger) Option { return func(s *GitHubStorage) error { if l == nil { - return fmt.Errorf("githubstorage WithLogger error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[githubstorage.WithLogger] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } s.Logger = l @@ -203,7 +225,10 @@ func WithLogger(l *slog.Logger) Option { func WithDatabaseDSN(dsn string) Option { return func(s *GitHubStorage) error { if dsn == "" { - return fmt.Errorf("githubstorage WithDatabaseDSN error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[githubstorage.WithDatabaseDSN] error: [%w, empty string 
received]", + cerrors.ErrValueRequired, + ) } s.DatabaseDSN = dsn @@ -217,7 +242,7 @@ func New(ctx context.Context, options ...Option) (*GitHubStorage, error) { for _, option := range options { if err := option(githubStorage); err != nil { - return nil, fmt.Errorf("githubstorage option error: [%w]", err) + return nil, err } } @@ -227,12 +252,12 @@ func New(ctx context.Context, options ...Option) (*GitHubStorage, error) { config, err := pgxpool.ParseConfig(githubStorage.DatabaseDSN) if err != nil { - return nil, fmt.Errorf("githubstorage pgxpool.ParseConfig error: [%w]", err) + return nil, fmt.Errorf("[githubstorage.New][pgxpool.ParseConfig] error: [%w]", err) } pool, err := pgxpool.NewWithConfig(ctx, config) if err != nil { - return nil, fmt.Errorf("githubstorage pgxpool.NewWithConfig error: [%w]", err) + return nil, fmt.Errorf("[githubstorage.New][pgxpool.NewWithConfig] error: [%w]", err) } githubStorage.Pool = pool diff --git a/internal/storage/githubstorage/githubstorage_test.go b/internal/storage/githubstorage/githubstorage_test.go index 2e3f449..bd4d5af 100644 --- a/internal/storage/githubstorage/githubstorage_test.go +++ b/internal/storage/githubstorage/githubstorage_test.go @@ -3,12 +3,12 @@ package githubstorage_test import ( "context" "errors" - "log/slog" "testing" "time" "github.com/IBM/sarama" "github.com/devchain-network/cauldron/internal/cerrors" + "github.com/devchain-network/cauldron/internal/slogger/mockslogger" "github.com/devchain-network/cauldron/internal/storage" "github.com/devchain-network/cauldron/internal/storage/githubstorage" "github.com/google/uuid" @@ -18,24 +18,6 @@ import ( "github.com/stretchr/testify/mock" ) -type mockLogger struct{} - -func (h *mockLogger) Enabled(_ context.Context, _ slog.Level) bool { - return true -} - -func (h *mockLogger) Handle(_ context.Context, record slog.Record) error { - return nil -} - -func (h *mockLogger) WithAttrs(attrs []slog.Attr) slog.Handler { - return h -} - -func (h *mockLogger) WithGroup(name 
string) slog.Handler { - return h -} - type MockPGPooler struct { mock.Mock } @@ -95,7 +77,7 @@ func TestNew_NilLogger(t *testing.T) { } func TestNew_NoDSN(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -110,7 +92,7 @@ func TestNew_NoDSN(t *testing.T) { } func TestNew_EmptyDSN(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -126,7 +108,7 @@ func TestNew_EmptyDSN(t *testing.T) { } func TestNew_InvalidDSN(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -143,7 +125,7 @@ func TestNew_InvalidDSN(t *testing.T) { } func TestNew_Success(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -161,7 +143,7 @@ func TestNew_Success(t *testing.T) { } func TestPing_Fail(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -188,7 +170,7 @@ func TestPing_Fail(t *testing.T) { } func TestPing_Success(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -214,7 +196,7 @@ func TestPing_Success(t *testing.T) { } func TestStore_Fail_EmptyMessage(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -240,7 +222,7 @@ func 
TestStore_Fail_EmptyMessage(t *testing.T) { } func TestStore_Fail_Message_InvalidTargetID(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -276,7 +258,7 @@ func TestStore_Fail_Message_InvalidTargetID(t *testing.T) { } func TestStore_Fail_Message_InvalidHookID(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -313,7 +295,7 @@ func TestStore_Fail_Message_InvalidHookID(t *testing.T) { } func TestStore_Fail_Message_InvalidSenderID(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -353,7 +335,7 @@ func TestStore_Fail_Message_InvalidSenderID(t *testing.T) { } func TestStore_Insert_Error(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() @@ -399,7 +381,7 @@ func TestStore_Insert_Error(t *testing.T) { } func TestStore_Success(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) defer cancel() diff --git a/internal/storage/storage.go b/internal/storage/storage.go index b596938..97520f9 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -19,10 +19,12 @@ const ( GitProviderBitbucket GitProvider = "bitbucket" ) -var validGitProviders = []GitProvider{ - GitProviderGitHub, - GitProviderGitLab, - GitProviderBitbucket, +func validGitProviders() []GitProvider { + return []GitProvider{ + GitProviderGitHub, + GitProviderGitLab, + GitProviderBitbucket, + } } // PGPooler defines pgxpool behaviours. 
@@ -57,7 +59,7 @@ func (g GitProvider) String() string { // Valid checks if the GitProvider is valid. func (g GitProvider) Valid() bool { - for _, provider := range validGitProviders { + for _, provider := range validGitProviders() { if g == provider { return true } diff --git a/internal/transport/http/githubwebhookhandler/githubwebhookhandler.go b/internal/transport/http/githubwebhookhandler/githubwebhookhandler.go index 2f065fd..1eae2a6 100644 --- a/internal/transport/http/githubwebhookhandler/githubwebhookhandler.go +++ b/internal/transport/http/githubwebhookhandler/githubwebhookhandler.go @@ -151,16 +151,28 @@ func (h Handler) Handle(ctx *fasthttp.RequestCtx) { func (h Handler) checkRequired() error { if h.Logger == nil { - return fmt.Errorf("github webhook handler check required, Logger error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[githubwebhookhandler.checkRequired] Logger error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } if !h.Topic.Valid() { - return fmt.Errorf("github webhook handler check required, Topic error: [%w]", cerrors.ErrInvalid) + return fmt.Errorf( + "[githubwebhookhandler.checkRequired] Topic error: [%w, '%s' received]", + cerrors.ErrInvalid, h.Topic, + ) } if h.Secret == "" { - return fmt.Errorf("github webhook handler check required, Secret error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[githubwebhookhandler.checkRequired] Secret error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) } if h.MessageQueue == nil { - return fmt.Errorf("github webhook handler check required, MessageQueue error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[githubwebhookhandler.checkRequired] MessageQueue error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } return nil @@ -173,7 +185,10 @@ type Option func(*Handler) error func WithLogger(l *slog.Logger) Option { return func(h *Handler) error { if l == nil { - return fmt.Errorf("github webhook handler WithLogger error: [%w]", 
cerrors.ErrValueRequired) + return fmt.Errorf( + "[githubwebhookhandler.WithLogger] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) } h.Logger = l @@ -182,12 +197,16 @@ func WithLogger(l *slog.Logger) Option { } // WithTopic sets topic name to consume. -func WithTopic(s kafkacp.KafkaTopicIdentifier) Option { +func WithTopic(s string) Option { return func(h *Handler) error { - if !s.Valid() { - return fmt.Errorf("github webhook handler WithTopic h.Topic error: [%w]", cerrors.ErrInvalid) + topic := kafkacp.KafkaTopicIdentifier(s) + if !topic.Valid() { + return fmt.Errorf( + "[githubwebhookhandler.WithTopic] error: [%w, '%s' received]", + cerrors.ErrInvalid, s, + ) } - h.Topic = s + h.Topic = topic return nil } @@ -197,7 +216,10 @@ func WithTopic(s kafkacp.KafkaTopicIdentifier) Option { func WithWebhookSecret(s string) Option { return func(h *Handler) error { if s == "" { - return fmt.Errorf("github webhook handler WithWebhookSecret h.Secret error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[githubwebhookhandler.WithWebhookSecret] error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) } h.Secret = s @@ -211,7 +233,7 @@ func WithProducerGitHubMessageQueue(mq chan *sarama.ProducerMessage) Option { return func(h *Handler) error { if mq == nil { return fmt.Errorf( - "github webhook handler WithProducerGitHubMessageQueue error: [%w]", + "[githubwebhookhandler.WithProducerGitHubMessageQueue] error: [%w, 'nil' received]", cerrors.ErrValueRequired, ) } @@ -227,7 +249,7 @@ func New(options ...Option) (*Handler, error) { for _, option := range options { if err := option(handler); err != nil { - return nil, fmt.Errorf("github webhook handler option error: [%w]", err) + return nil, err } } diff --git a/internal/transport/http/githubwebhookhandler/githubwebhookhandler_test.go b/internal/transport/http/githubwebhookhandler/githubwebhookhandler_test.go index 36de928..608fa64 100644 --- 
a/internal/transport/http/githubwebhookhandler/githubwebhookhandler_test.go +++ b/internal/transport/http/githubwebhookhandler/githubwebhookhandler_test.go @@ -1,41 +1,22 @@ package githubwebhookhandler_test import ( - "context" "crypto/hmac" "crypto/sha256" "encoding/hex" - "log/slog" "testing" "time" "github.com/IBM/sarama" "github.com/devchain-network/cauldron/internal/cerrors" "github.com/devchain-network/cauldron/internal/kafkacp" + "github.com/devchain-network/cauldron/internal/slogger/mockslogger" "github.com/devchain-network/cauldron/internal/transport/http/githubwebhookhandler" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/valyala/fasthttp" ) -type mockLogger struct{} - -func (h *mockLogger) Enabled(_ context.Context, _ slog.Level) bool { - return true -} - -func (h *mockLogger) Handle(_ context.Context, record slog.Record) error { - return nil -} - -func (h *mockLogger) WithAttrs(attrs []slog.Attr) slog.Handler { - return h -} - -func (h *mockLogger) WithGroup(name string) slog.Handler { - return h -} - func TestNew_NoLogger(t *testing.T) { handler, err := githubwebhookhandler.New() @@ -53,7 +34,7 @@ func TestNew_NilLogger(t *testing.T) { } func TestNew_NoTopic(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), @@ -64,11 +45,11 @@ func TestNew_NoTopic(t *testing.T) { } func TestNew_InvalidTopic(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("foo")), + githubwebhookhandler.WithTopic("foo"), ) assert.ErrorIs(t, err, cerrors.ErrInvalid) @@ -76,11 +57,11 @@ func TestNew_InvalidTopic(t *testing.T) { } func TestNew_NoSecret(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() handler, err := 
githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), ) assert.ErrorIs(t, err, cerrors.ErrValueRequired) @@ -88,11 +69,11 @@ func TestNew_NoSecret(t *testing.T) { } func TestNew_EmptySecret(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret(""), ) @@ -101,11 +82,11 @@ func TestNew_EmptySecret(t *testing.T) { } func TestNew_NoMessageQueue(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), ) @@ -114,11 +95,11 @@ func TestNew_NoMessageQueue(t *testing.T) { } func TestNew_NilMessageQueue(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(nil), ) @@ -128,12 +109,12 @@ func TestNew_NilMessageQueue(t *testing.T) { } func TestNew_Success(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( 
githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -143,12 +124,12 @@ func TestNew_Success(t *testing.T) { } func TestHandle_NoBody(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -163,12 +144,12 @@ func TestHandle_NoBody(t *testing.T) { } func TestHandle_NoHMAC(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -184,12 +165,12 @@ func TestHandle_NoHMAC(t *testing.T) { } func TestHandle_InvalidHMAC(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), 
githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -213,12 +194,12 @@ func newMockRequestCtx() *fasthttp.RequestCtx { } func TestHandle_NoXGithubEvent(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -253,12 +234,12 @@ func TestHandle_NoXGithubEvent(t *testing.T) { } func TestHandle_NoXGithubDeliveryID(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -294,12 +275,12 @@ func TestHandle_NoXGithubDeliveryID(t *testing.T) { } func TestHandle_NoXGithubHookID(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -336,12 +317,12 @@ func TestHandle_NoXGithubHookID(t *testing.T) { } func TestHandle_NoXGithubInstallationTargetID(t 
*testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -379,12 +360,12 @@ func TestHandle_NoXGithubInstallationTargetID(t *testing.T) { } func TestHandle_NoXGithubInstallationTargetType(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -423,12 +404,12 @@ func TestHandle_NoXGithubInstallationTargetType(t *testing.T) { } func TestHandle_NoSenderLogin(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -468,12 +449,12 @@ func TestHandle_NoSenderLogin(t *testing.T) { } func TestHandle_NoSenderID(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := 
githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -513,12 +494,12 @@ func TestHandle_NoSenderID(t *testing.T) { } func TestMessageQueue_Scenarios(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 1) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) @@ -554,12 +535,12 @@ func TestMessageQueue_Scenarios(t *testing.T) { } func TestHandle_Success(t *testing.T) { - logger := slog.New(new(mockLogger)) + logger := mockslogger.New() messageQueue := make(chan *sarama.ProducerMessage, 10) handler, err := githubwebhookhandler.New( githubwebhookhandler.WithLogger(logger), - githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifier("github")), + githubwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), githubwebhookhandler.WithWebhookSecret("my-secret"), githubwebhookhandler.WithProducerGitHubMessageQueue(messageQueue), ) diff --git a/internal/transport/http/healthcheckhandler/healthcheckhandler.go b/internal/transport/http/healthcheckhandler/healthcheckhandler.go index bf5b213..ae16ffa 100644 --- a/internal/transport/http/healthcheckhandler/healthcheckhandler.go +++ b/internal/transport/http/healthcheckhandler/healthcheckhandler.go @@ -28,7 +28,10 @@ func (h Handler) Handle(ctx *fasthttp.RequestCtx) { func (h Handler) checkRequired() error { if h.Version == "" { - return 
fmt.Errorf("health check handler check required, Version error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[healthcheckhandler.checkRequired] Version error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) } return nil @@ -41,7 +44,10 @@ type Option func(*Handler) error func WithVersion(s string) Option { return func(h *Handler) error { if s == "" { - return fmt.Errorf("health checkhandler WithVersion error: [%w]", cerrors.ErrValueRequired) + return fmt.Errorf( + "[healthcheckhandler.WithVersion] error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) } h.Version = s @@ -55,7 +61,7 @@ func New(options ...Option) (*Handler, error) { for _, option := range options { if err := option(handler); err != nil { - return nil, fmt.Errorf("health check handler option error: [%w]", err) + return nil, err } }