Skip to content

Commit

Permalink
Work around confluent-kafka-go bug #463
Browse files (browse the repository at this point in the history)
  • Loading branch information
ekoutanov committed May 2, 2020
1 parent 2bfa34b commit b7232d1
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion neli.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type neli struct {
barrier Barrier
state concurrent.AtomicReference
stateMutex sync.Mutex
deliveryHandlerDone chan int
}

// State of the Neli instance.
Expand Down Expand Up @@ -83,6 +84,7 @@ func New(config Config, barrier ...Barrier) (Neli, error) {
barrier: barrierArg,
pollDeadline: concurrent.NewDeadline(*config.MinPollInterval),
state: concurrent.NewAtomicReference(Live),
deliveryHandlerDone: make(chan int),
}

consumerConfigs := copyKafkaConfig(n.config.KafkaConfig)
Expand Down Expand Up @@ -134,6 +136,8 @@ func New(config Config, barrier ...Barrier) (Neli, error) {
}
n.producer = p
go func() {
defer close(n.deliveryHandlerDone)

for event := range p.Events() {
switch e := event.(type) {
case *kafka.Message:
Expand Down Expand Up @@ -360,7 +364,17 @@ func (n *neli) Close() error {
defer n.state.Set(Closed)

n.state.Set(Closing)
n.producer.Close()
defer func() {
<-n.deliveryHandlerDone
}()
defer func() {
go func() {
// A bug in confluent-kafka-go (#463) occasionally causes an indefinite syscall hang in Close(), after it closes
// the Events channel. So we delegate this to a separate goroutine — better an orphaned goroutine than a
// frozen harvester. (The rest of the battery will still unwind normally.)
n.producer.Close()
}()
}()
return n.consumer.Close()
}

Expand Down

0 comments on commit b7232d1

Please sign in to comment.