From 355fdf613d137db1484729df80d78d9eb9d92b1b Mon Sep 17 00:00:00 2001 From: wagslane Date: Tue, 20 Jul 2021 12:33:01 -0600 Subject: [PATCH 1/2] notify return safety --- examples/logger/main.go | 3 ++- examples/publisher/main.go | 3 ++- publish.go | 47 +++++++++++++++++++++++--------------- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/examples/logger/main.go b/examples/logger/main.go index 1844112..32404ea 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -18,7 +18,7 @@ func (c *customLogger) Printf(fmt string, args ...interface{}) { func main() { mylogger := &customLogger{} - publisher, returns, err := rabbitmq.NewPublisher( + publisher, err := rabbitmq.NewPublisher( "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithPublisherOptionsLogger(mylogger), ) @@ -37,6 +37,7 @@ func main() { log.Fatal(err) } + returns := publisher.NotifyReturn() go func() { for r := range returns { log.Printf("message returned from server: %s", string(r.Body)) diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 7ac33b0..a92ae20 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -8,7 +8,7 @@ import ( ) func main() { - publisher, returns, err := rabbitmq.NewPublisher( + publisher, err := rabbitmq.NewPublisher( "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithPublisherOptionsLogging, ) @@ -27,6 +27,7 @@ func main() { log.Fatal(err) } + returns := publisher.NotifyReturn() go func() { for r := range returns { log.Printf("message returned from server: %s", string(r.Body)) diff --git a/publish.go b/publish.go index ee7cce3..f53a623 100644 --- a/publish.go +++ b/publish.go @@ -99,7 +99,8 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { type Publisher struct { chManager *channelManager - notifyReturnChan chan Return + notifyReturnChan chan Return + shouldNotifyReturn bool disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -134,7 +135,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { // on the channel of Returns that you should setup a listener on. // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown -func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) { +func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, error) { options := &PublisherOptions{} for _, optionFunc := range optionFuncs { optionFunc(options) @@ -145,26 +146,38 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher chManager, err := newChannelManager(url, config, options.Logger) if err != nil { - return Publisher{}, nil, err + return Publisher{}, err } publisher := Publisher{ chManager: chManager, - notifyReturnChan: make(chan Return, 1), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, + notifyReturnChan: make(chan Return), + shouldNotifyReturn: false, } go func() { - publisher.startNotifyHandlers() + go publisher.startNotifyFlowHandler() + for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) - publisher.startNotifyHandlers() + go publisher.startNotifyFlowHandler() + if publisher.shouldNotifyReturn { + go publisher.startNotifyReturnHandler() + } } }() - return publisher, publisher.notifyReturnChan, nil + return publisher, nil +} + +// NotifyReturn registers a listener for basic.return methods. +// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. +func (publisher *Publisher) NotifyReturn() <-chan Return { + publisher.shouldNotifyReturn = true + return publisher.notifyReturnChan } // Publish publishes the provided data to the given routing keys over the connection @@ -217,19 +230,8 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } -func (publisher *Publisher) startNotifyHandlers() { - returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) - go func() { - for ret := range returnAMQPChan { - publisher.notifyReturnChan <- Return{ret} - } - }() - +func (publisher *Publisher) startNotifyFlowHandler() { notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) - go publisher.startNotifyFlowHandler(notifyFlowChan) -} - -func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { publisher.disablePublishDueToFlowMux.Lock() publisher.disablePublishDueToFlow = false publisher.disablePublishDueToFlowMux.Unlock() @@ -248,3 +250,10 @@ func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { publisher.disablePublishDueToFlowMux.Unlock() } } + +func (publisher *Publisher) startNotifyReturnHandler() { + returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) + for ret := range returnAMQPCh { + publisher.notifyReturnChan <- Return{ret} + } +} From b89627cfad1776b8231cc79490d7525f195de168 Mon Sep 17 00:00:00 2001 From: wagslane Date: Mon, 6 Sep 2021 13:36:17 -0600 Subject: [PATCH 2/2] victors requests - one flag for notify return --- publish.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/publish.go b/publish.go index 946db03..11c0480 100644 --- a/publish.go +++ b/publish.go @@ -99,8 +99,7 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { type Publisher struct { chManager *channelManager - notifyReturnChan chan Return - shouldNotifyReturn bool + notifyReturnChan chan Return disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -154,17 +153,17 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, - notifyReturnChan: make(chan Return), - shouldNotifyReturn: false, + notifyReturnChan: nil, } - go func() { - go publisher.startNotifyFlowHandler() + go publisher.startNotifyFlowHandler() + // restart notifiers when cancel/close is triggered + go func() { for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) go publisher.startNotifyFlowHandler() - if publisher.shouldNotifyReturn { + if publisher.notifyReturnChan != nil { go publisher.startNotifyReturnHandler() } } @@ -176,7 +175,8 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher // NotifyReturn registers a listener for basic.return methods. // These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. func (publisher *Publisher) NotifyReturn() <-chan Return { - publisher.shouldNotifyReturn = true + publisher.notifyReturnChan = make(chan Return) + go publisher.startNotifyReturnHandler() return publisher.notifyReturnChan }