Skip to content

Commit

Permalink
try fix goroutines leak
Browse files Browse the repository at this point in the history
  • Loading branch information
WiRight committed Jun 14, 2023
1 parent 3f6ae03 commit 56e0b50
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 30 deletions.
36 changes: 24 additions & 12 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Consumer struct {
chanManager *channelmanager.ChannelManager
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}
notifyClosedChan <-chan error
options ConsumerOptions

isClosedMux *sync.RWMutex
Expand Down Expand Up @@ -67,12 +68,13 @@ func NewConsumer(
if err != nil {
return nil, err
}
reconnectErrCh, closeCh, _ := chanManager.NotifyReconnect()
reconnectErrCh, closeCh, notifyClosedChan := chanManager.NotifyReconnect()

consumer := &Consumer{
chanManager: chanManager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
notifyClosedChan: notifyClosedChan,
options: *options,
isClosedMux: &sync.RWMutex{},
isClosed: false,
Expand All @@ -82,20 +84,24 @@ func NewConsumer(
handler,
*options,
)

if err != nil {
return nil, err
}

go func() {
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)

err = consumer.startGoroutines(
handler,
*options,
)

if err != nil {
consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err)
consumer.options.Logger.Fatalf("consumer closing, unable to recover")

return
}
}
Expand All @@ -111,18 +117,18 @@ func NewConsumer(
func (consumer *Consumer) Close() {
consumer.isClosedMux.Lock()
defer consumer.isClosedMux.Unlock()

consumer.isClosed = true

// close the channel so that rabbitmq server knows that the
// consumer has been stopped.
err := consumer.chanManager.Close()
if err != nil {
if err := consumer.chanManager.Close(); err != nil {
consumer.options.Logger.Warnf("error while closing the channel: %v", err)
}

consumer.options.Logger.Infof("closing consumer...")
go func() {
consumer.closeConnectionToManagerCh <- struct{}{}
}()

close(consumer.closeConnectionToManagerCh)
}

// startGoroutines declares the queue if it doesn't exist,
Expand All @@ -137,19 +143,20 @@ func (consumer *Consumer) startGoroutines(
0,
options.QOSGlobal,
)

if err != nil {
return fmt.Errorf("declare qos failed: %w", err)
}
err = declareExchange(consumer.chanManager, options.ExchangeOptions)
if err != nil {

if err = declareExchange(consumer.chanManager, options.ExchangeOptions); err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
}
err = declareQueue(consumer.chanManager, options.QueueOptions)
if err != nil {

if err = declareQueue(consumer.chanManager, options.QueueOptions); err != nil {
return fmt.Errorf("declare queue failed: %w", err)
}
err = declareBindings(consumer.chanManager, options)
if err != nil {

if err = declareBindings(consumer.chanManager, options); err != nil {
return fmt.Errorf("declare bindings failed: %w", err)
}

Expand All @@ -162,20 +169,24 @@ func (consumer *Consumer) startGoroutines(
options.RabbitConsumerOptions.NoWait,
tableToAMQPTable(options.RabbitConsumerOptions.Args),
)

if err != nil {
return err
}

for i := 0; i < options.Concurrency; i++ {
go handlerGoroutine(consumer, msgs, options, handler)
}

consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency)

return nil
}

func (consumer *Consumer) getIsClosed() bool {
consumer.isClosedMux.RLock()
defer consumer.isClosedMux.RUnlock()

return consumer.isClosed
}

Expand Down Expand Up @@ -208,5 +219,6 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti
}
}
}

consumer.options.Logger.Infof("rabbit consumer goroutine closed")
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/DizoftTeam/go-rabbitmq

go 1.20

require github.com/rabbitmq/amqp091-go v1.8.0
require (
github.com/rabbitmq/amqp091-go v1.8.0
github.com/wagslane/go-rabbitmq v0.12.3
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/wagslane/go-rabbitmq v0.12.3 h1:nHoW6SgwaGNTjNyHGhcZwdJGru2228RZTwucxqmgA9M=
github.com/wagslane/go-rabbitmq v0.12.3/go.mod h1:1sUJ53rrW2AIA7LEp8ymmmebHqqq8ksH/gXIfUP0I0s=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
20 changes: 14 additions & 6 deletions internal/channelmanager/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() {
chanManager.reconnectLoop()
chanManager.logger.Warnf("successfully reconnected to amqp server")
chanManager.dispatcher.Dispatch(err)
chanManager.dispatcher.DispatchLooseConnection(err)
}

if err == nil {
Expand All @@ -83,33 +84,41 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() {
chanManager.reconnectLoop()
chanManager.logger.Warnf("successfully reconnected to amqp server after cancel")
chanManager.dispatcher.Dispatch(errors.New(err))
chanManager.dispatcher.DispatchLooseConnection(errors.New(err))
}
}

// GetReconnectionCount -
func (chanManager *ChannelManager) GetReconnectionCount() uint {
chanManager.reconnectionCountMux.Lock()
defer chanManager.reconnectionCountMux.Unlock()

return chanManager.reconnectionCount
}

func (chanManager *ChannelManager) incrementReconnectionCount() {
chanManager.reconnectionCountMux.Lock()
defer chanManager.reconnectionCountMux.Unlock()

chanManager.reconnectionCount++
}

// reconnectLoop continuously attempts to reconnect
func (chanManager *ChannelManager) reconnectLoop() {
for {
chanManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chanManager.reconnectInterval)

time.Sleep(chanManager.reconnectInterval)

err := chanManager.reconnect()

if err != nil {
chanManager.logger.Errorf("error reconnecting to amqp server: %v", err)
} else {
chanManager.incrementReconnectionCount()

go chanManager.startNotifyCancelOrClosed()

return
}
}
Expand All @@ -119,7 +128,9 @@ func (chanManager *ChannelManager) reconnectLoop() {
func (chanManager *ChannelManager) reconnect() error {
chanManager.channelMux.Lock()
defer chanManager.channelMux.Unlock()

newChannel, err := getNewChannel(chanManager.connManager)

if err != nil {
return err
}
Expand All @@ -129,21 +140,18 @@ func (chanManager *ChannelManager) reconnect() error {
}

chanManager.channel = newChannel

return nil
}

// Close safely closes the current channel and connection
func (chanManager *ChannelManager) Close() error {
chanManager.logger.Infof("closing channel manager...")

chanManager.channelMux.Lock()
defer chanManager.channelMux.Unlock()

err := chanManager.channel.Close()
if err != nil {
return err
}

return nil
return chanManager.channel.Close()
}

// NotifyReconnect adds a new subscriber that will receive error messages whenever
Expand Down
23 changes: 14 additions & 9 deletions internal/connectionmanager/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ type ConnectionManager struct {

// NewConnectionManager creates a new connection manager
func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) {
conn, err := amqp.DialConfig(url, amqp.Config(conf))
conn, err := amqp.DialConfig(url, conf)
if err != nil {
return nil, err
}

connManager := ConnectionManager{
logger: log,
url: url,
Expand All @@ -39,21 +40,20 @@ func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, recon
reconnectionCountMux: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
}

go connManager.startNotifyClose()

return &connManager, nil
}

// Close safely closes the current channel and connection
func (connManager *ConnectionManager) Close() error {
connManager.logger.Infof("closing connection manager...")

connManager.connectionMux.Lock()
defer connManager.connectionMux.Unlock()

err := connManager.connection.Close()
if err != nil {
return err
}
return nil
return connManager.connection.Close()
}

// NotifyReconnect adds a new subscriber that will receive error messages whenever
Expand Down Expand Up @@ -84,11 +84,11 @@ func (connManager *ConnectionManager) startNotifyClose() {

if err != nil {
connManager.logger.Errorf("attempting to reconnect to amqp server after connection close with error: %v", err)
connManager.dispatcher.DispathLooseConnection(err)
connManager.dispatcher.DispatchLooseConnection(err)
connManager.reconnectLoop()
connManager.logger.Warnf("successfully reconnected to amqp server")
connManager.dispatcher.Dispatch(err)
connManager.dispatcher.DispathLooseConnection(nil)
connManager.dispatcher.DispatchLooseConnection(nil)
}

if err == nil {
Expand All @@ -100,20 +100,24 @@ func (connManager *ConnectionManager) startNotifyClose() {
func (connManager *ConnectionManager) GetReconnectionCount() uint {
connManager.reconnectionCountMux.Lock()
defer connManager.reconnectionCountMux.Unlock()

return connManager.reconnectionCount
}

func (connManager *ConnectionManager) incrementReconnectionCount() {
connManager.reconnectionCountMux.Lock()
defer connManager.reconnectionCountMux.Unlock()

connManager.reconnectionCount++
}

// reconnectLoop continuously attempts to reconnect
func (connManager *ConnectionManager) reconnectLoop() {
for {
connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.ReconnectInterval)

time.Sleep(connManager.ReconnectInterval)

err := connManager.reconnect()

if err != nil {
Expand All @@ -132,7 +136,8 @@ func (connManager *ConnectionManager) reconnectLoop() {
func (connManager *ConnectionManager) reconnect() error {
connManager.connectionMux.Lock()
defer connManager.connectionMux.Unlock()
newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig))

newConn, err := amqp.DialConfig(connManager.url, connManager.amqpConfig)

if err != nil {
return err
Expand Down
8 changes: 6 additions & 2 deletions internal/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (d *Dispatcher) Dispatch(err error) error {
return nil
}

// DispathLooseConnection dispatching that connection to RabbitMQ is loosed
func (d *Dispatcher) DispathLooseConnection(err error) error {
// DispatchLooseConnection dispatching that connection to RabbitMQ is loosed
func (d *Dispatcher) DispatchLooseConnection(err error) error {
d.subscribersMux.Lock()
defer d.subscribersMux.Unlock()

Expand Down Expand Up @@ -79,12 +79,16 @@ func (d *Dispatcher) AddSubscriber() (<-chan error, chan<- struct{}, <-chan erro

go func(id int) {
<-closeCh

d.subscribersMux.Lock()
defer d.subscribersMux.Unlock()

sub, ok := d.subscribers[id]

if !ok {
return
}

close(sub.notifyCancelOrCloseChan)
delete(d.subscribers, id)
}(id)
Expand Down

0 comments on commit 56e0b50

Please sign in to comment.