Skip to content

Commit 337e3ed

Browse files
committed
Connection/Channel.NotifyClose: allow only buffered channel=1
see knative-extensions/eventing-rabbitmq#1369
1 parent 4172682 commit 337e3ed

File tree

6 files changed

+97
-37
lines changed

6 files changed

+97
-37
lines changed

_examples/consumer/consumer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (
9696
}
9797

9898
go func() {
99-
Log.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error)))
99+
Log.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error, 1)))
100100
}()
101101

102102
Log.Printf("got Connection, getting Channel")

channel.go

+6
Original file line numberDiff line numberDiff line change
@@ -504,8 +504,14 @@ graceful close, no error will be sent.
504504
505505
In case of a non graceful close the error will be notified synchronously by the library
506506
so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks
507+
508+
The chan provided must be a buffered channel of size 1.
507509
*/
508510
func (ch *Channel) NotifyClose(c chan *Error) chan *Error {
511+
if cap(c) != 1 {
512+
panic("channel.NotifyClose expectes cap=1 buffered channel")
513+
}
514+
509515
ch.notifyM.Lock()
510516
defer ch.notifyM.Unlock()
511517

client_test.go

+79-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package amqp091
77

88
import (
9+
"fmt"
910
"bytes"
1011
"context"
1112
"io"
@@ -632,7 +633,41 @@ func TestNotifyClosesReusedPublisherConfirmChan(t *testing.T) {
632633
}
633634
}
634635

635-
func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
636+
func TestConnectionNotifyCloseAcceptsOnlyBufferedChannels(t *testing.T) {
637+
rwc, srv := newSession(t)
638+
t.Cleanup(func() { rwc.Close() })
639+
640+
go func() {
641+
srv.connectionOpen()
642+
643+
srv.recv(0, &connectionClose{})
644+
srv.send(0, &connectionCloseOk{})
645+
}()
646+
647+
c, err := Open(rwc, defaultConfig())
648+
if err != nil {
649+
t.Fatalf("could not create connection: %v (%s)", c, err)
650+
}
651+
652+
653+
if err := c.Close(); err != nil {
654+
t.Fatalf("could not close connection: %v (%s)", c, err)
655+
}
656+
657+
defer func() {
658+
_ = recover()
659+
}()
660+
661+
select {
662+
case <-c.NotifyClose(make(chan *Error)):
663+
case <-time.After(time.Millisecond):
664+
t.Errorf("expected to close NotifyClose chan after Connection.Close")
665+
}
666+
667+
t.Errorf("connection.NotifyClose shouldn't accept unbuffered channels")
668+
}
669+
670+
func TestChannelNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
636671
rwc, srv := newSession(t)
637672

638673
go func() {
@@ -657,8 +692,10 @@ func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
657692
t.Fatalf("could not close connection: %v (%s)", c, err)
658693
}
659694

695+
defer func() { _ = recover() }()
696+
660697
select {
661-
case <-c.NotifyClose(make(chan *Error)):
698+
case <-c.NotifyClose(make(chan *Error, 1)):
662699
case <-time.After(time.Millisecond):
663700
t.Errorf("expected to close NotifyClose chan after Connection.Close")
664701
}
@@ -669,6 +706,46 @@ func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
669706
t.Errorf("expected to close Connection.NotifyClose chan after Connection.Close")
670707
}
671708

709+
t.Errorf("connection.NotifyClose shouldn't accept unbuffered channels")
710+
}
711+
712+
func TestNotifyClosesAllChansAfterConnectionClose(t *testing.T) {
713+
rwc, srv := newSession(t)
714+
715+
go func() {
716+
srv.connectionOpen()
717+
srv.channelOpen(1)
718+
719+
srv.recv(0, &connectionClose{})
720+
srv.send(0, &connectionCloseOk{})
721+
}()
722+
723+
c, err := Open(rwc, defaultConfig())
724+
if err != nil {
725+
t.Fatalf("could not create connection: %v (%s)", c, err)
726+
}
727+
728+
ch, err := c.Channel()
729+
if err != nil {
730+
t.Fatalf("could not open channel: %v (%s)", ch, err)
731+
}
732+
733+
if err := c.Close(); err != nil {
734+
t.Fatalf("could not close connection: %v (%s)", c, err)
735+
}
736+
737+
select {
738+
case <-c.NotifyClose(make(chan *Error, 1)):
739+
case <-time.After(time.Millisecond):
740+
t.Errorf("expected to close NotifyClose chan after Connection.Close")
741+
}
742+
743+
select {
744+
case <-ch.NotifyClose(make(chan *Error, 1)):
745+
case <-time.After(time.Millisecond):
746+
t.Errorf("expected to close Connection.NotifyClose chan after Connection.Close")
747+
}
748+
672749
select {
673750
case <-ch.NotifyFlow(make(chan bool)):
674751
case <-time.After(time.Millisecond):

connection.go

+6
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,14 @@ so that it will be necessary to consume the Channel from the caller in order to
368368
369369
To reconnect after a transport or protocol error, register a listener here and
370370
re-run your setup process.
371+
372+
The chan provided must be a buffered channel of size 1.
371373
*/
372374
func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
375+
if cap(receiver) != 1 {
376+
panic("channel.NotifyClose expectes cap=1 buffered channel")
377+
}
378+
373379
c.m.Lock()
374380
defer c.m.Unlock()
375381

doc.go

+2-31
Original file line numberDiff line numberDiff line change
@@ -110,40 +110,11 @@ In order to be notified when a connection or channel gets closed, both
110110
structures offer the possibility to register channels using
111111
[Channel.NotifyClose] and [Connection.NotifyClose] functions:
112112
113-
notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error))
113+
notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1))
114114
115115
No errors will be sent in case of a graceful connection close. In case of a
116116
non-graceful closure due to e.g. network issue, or forced connection closure
117-
from the Management UI, the error will be notified synchronously by the library.
118-
119-
The error is sent synchronously to the channel, so that the flow will wait until
120-
the receiver consumes from the channel. To avoid deadlocks in the library, it is
121-
necessary to consume from the channels. This could be done inside a
122-
different goroutine with a select listening on the two channels inside a for
123-
loop like:
124-
125-
go func() {
126-
for notifyConnClose != nil || notifyChanClose != nil {
127-
select {
128-
case err, ok := <-notifyConnClose:
129-
if !ok {
130-
notifyConnClose = nil
131-
} else {
132-
fmt.Printf("connection closed, error %s", err)
133-
}
134-
case err, ok := <-notifyChanClose:
135-
if !ok {
136-
notifyChanClose = nil
137-
} else {
138-
fmt.Printf("channel closed, error %s", err)
139-
}
140-
}
141-
}
142-
}()
143-
144-
Another approach is to use buffered channels:
145-
146-
notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1))
117+
from the Management UI, the error will be notified by the library.
147118
148119
The library sends to notification channels just once. After sending a notification
149120
to all channels, the library closes all registered notification channels. After

integration_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1644,7 +1644,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {
16441644
t.Cleanup(func() { conn.Close() })
16451645

16461646
go func() {
1647-
for err := range conn.NotifyClose(make(chan *Error)) {
1647+
for err := range conn.NotifyClose(make(chan *Error, 1)) {
16481648
t.Log(err.Error())
16491649
}
16501650
}()
@@ -1655,7 +1655,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {
16551655
}
16561656

16571657
go func() {
1658-
for err := range c1.NotifyClose(make(chan *Error)) {
1658+
for err := range c1.NotifyClose(make(chan *Error, 1)) {
16591659
t.Log("Channel1 Close: " + err.Error())
16601660
}
16611661
}()
@@ -1666,7 +1666,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {
16661666
}
16671667

16681668
go func() {
1669-
for err := range c2.NotifyClose(make(chan *Error)) {
1669+
for err := range c2.NotifyClose(make(chan *Error, 1)) {
16701670
t.Log("Channel2 Close: " + err.Error())
16711671
}
16721672
}()

0 commit comments

Comments
 (0)