Summary
The Producer struct requires Start() or Run() to be called to drain the internal events channel (delivery reports). However, AsyncPublish appears to work without Start() being called: it silently pushes messages via ProduceChannel(), with no compile-time or runtime indication that the event loop is missing.
Problem
When Start() is not called:
- The events channel (producer.go#L17) fills up with unhandled delivery reports
- The confluent-kafka-go internal poller blocks because it cannot push to the full events channel
- ProduceChannel() backs up because the poller is blocked
- Any goroutine calling AsyncPublish blocks forever on p.kafka.ProduceChannel() <- km
- This results in an unbounded goroutine and memory leak
Reproduction
Call NewProducer() followed by AsyncPublish() repeatedly without ever calling Start() or Run(). Observe goroutine count growing without bound.
Suggested Improvements
- Document the requirement: The current doc on Start() only says "starts kafka event handling" without explaining it is mandatory for AsyncPublish to function correctly. Add explicit documentation that Start()/Run() must be called when using AsyncPublish.
- Return an error: Consider returning an error from AsyncPublish if Start() has not been called yet.
- Auto-start: Alternatively, auto-start the event loop lazily on the first AsyncPublish call via sync.Once.