diff --git a/publish.go b/publish.go index 946db03..11c0480 100644 --- a/publish.go +++ b/publish.go @@ -99,8 +99,7 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { type Publisher struct { chManager *channelManager - notifyReturnChan chan Return - shouldNotifyReturn bool + notifyReturnChan chan Return disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -154,17 +153,17 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, - notifyReturnChan: make(chan Return), - shouldNotifyReturn: false, + notifyReturnChan: nil, } - go func() { - go publisher.startNotifyFlowHandler() + go publisher.startNotifyFlowHandler() + // restart notifiers when cancel/close is triggered + go func() { for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) go publisher.startNotifyFlowHandler() - if publisher.shouldNotifyReturn { + if publisher.notifyReturnChan != nil { go publisher.startNotifyReturnHandler() } } @@ -176,7 +175,8 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher // NotifyReturn registers a listener for basic.return methods. // These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. func (publisher *Publisher) NotifyReturn() <-chan Return { - publisher.shouldNotifyReturn = true + publisher.notifyReturnChan = make(chan Return) + go publisher.startNotifyReturnHandler() return publisher.notifyReturnChan }