diff --git a/examples/logger/main.go b/examples/logger/main.go index 9ebcfa1..62f56ee 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -18,7 +18,7 @@ func (c *customLogger) Printf(fmt string, args ...interface{}) { func main() { mylogger := &customLogger{} - publisher, returns, err := rabbitmq.NewPublisher( + publisher, err := rabbitmq.NewPublisher( "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithPublisherOptionsLogger(mylogger), ) @@ -37,6 +37,7 @@ func main() { log.Fatal(err) } + returns := publisher.NotifyReturn() go func() { for r := range returns { log.Printf("message returned from server: %s", string(r.Body)) diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 8339b6d..b5b0947 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -8,7 +8,7 @@ import ( ) func main() { - publisher, returns, err := rabbitmq.NewPublisher( + publisher, err := rabbitmq.NewPublisher( "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithPublisherOptionsLogging, ) @@ -27,6 +27,7 @@ func main() { log.Fatal(err) } + returns := publisher.NotifyReturn() go func() { for r := range returns { log.Printf("message returned from server: %s", string(r.Body)) diff --git a/publish.go b/publish.go index faf85e2..11c0480 100644 --- a/publish.go +++ b/publish.go @@ -134,7 +134,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { // on the channel of Returns that you should setup a listener on. // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown -func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) { +func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, error) { options := &PublisherOptions{} for _, optionFunc := range optionFuncs { optionFunc(options) @@ -145,26 +145,39 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher chManager, err := newChannelManager(url, config, options.Logger) if err != nil { - return Publisher{}, nil, err + return Publisher{}, err } publisher := Publisher{ chManager: chManager, - notifyReturnChan: make(chan Return, 1), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, + notifyReturnChan: nil, } + go publisher.startNotifyFlowHandler() + + // restart notifiers when cancel/close is triggered go func() { - publisher.startNotifyHandlers() for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) - publisher.startNotifyHandlers() + go publisher.startNotifyFlowHandler() + if publisher.notifyReturnChan != nil { + go publisher.startNotifyReturnHandler() + } } }() - return publisher, publisher.notifyReturnChan, nil + return publisher, nil +} + +// NotifyReturn registers a listener for basic.return methods. +// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. +func (publisher *Publisher) NotifyReturn() <-chan Return { + publisher.notifyReturnChan = make(chan Return) + go publisher.startNotifyReturnHandler() + return publisher.notifyReturnChan } // Publish publishes the provided data to the given routing keys over the connection @@ -217,19 +230,8 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } -func (publisher *Publisher) startNotifyHandlers() { - returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) - go func() { - for ret := range returnAMQPChan { - publisher.notifyReturnChan <- Return{ret} - } - }() - +func (publisher *Publisher) startNotifyFlowHandler() { notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) - go publisher.startNotifyFlowHandler(notifyFlowChan) -} - -func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { publisher.disablePublishDueToFlowMux.Lock() publisher.disablePublishDueToFlow = false publisher.disablePublishDueToFlowMux.Unlock() @@ -248,3 +250,10 @@ func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { publisher.disablePublishDueToFlowMux.Unlock() } } + +func (publisher *Publisher) startNotifyReturnHandler() { + returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) + for ret := range returnAMQPCh { + publisher.notifyReturnChan <- Return{ret} + } +}