diff --git a/neli.go b/neli.go index 6ccae61..b72744b 100644 --- a/neli.go +++ b/neli.go @@ -46,6 +46,7 @@ type neli struct { barrier Barrier state concurrent.AtomicReference stateMutex sync.Mutex + deliveryHandlerDone chan int } // State of the Neli instance. @@ -83,6 +84,7 @@ func New(config Config, barrier ...Barrier) (Neli, error) { barrier: barrierArg, pollDeadline: concurrent.NewDeadline(*config.MinPollInterval), state: concurrent.NewAtomicReference(Live), + deliveryHandlerDone: make(chan int), } consumerConfigs := copyKafkaConfig(n.config.KafkaConfig) @@ -134,6 +136,8 @@ func New(config Config, barrier ...Barrier) (Neli, error) { } n.producer = p go func() { + defer close(n.deliveryHandlerDone) + for event := range p.Events() { switch e := event.(type) { case *kafka.Message: @@ -360,7 +364,17 @@ func (n *neli) Close() error { defer n.state.Set(Closed) n.state.Set(Closing) - n.producer.Close() + defer func() { + <-n.deliveryHandlerDone + }() + defer func() { + go func() { + // A bug in confluent-kafka-go (#463) occasionally causes an indefinite syscall hang in Close(), after it closes + // the Events channel. So we delegate this to a separate goroutine — better an orphaned goroutine than a + // frozen harvester. (The rest of the battery will still unwind normally.) + n.producer.Close() + }() + }() return n.consumer.Close() }