From 4b29e598809bd454d7443d33f0635755aef0b560 Mon Sep 17 00:00:00 2001 From: shaazmik Date: Sun, 1 Dec 2024 13:12:01 +0300 Subject: [PATCH 1/2] fix leaks in SendWithCallbacks function --- driver/generic/sendwithcallbacks.go | 51 +++++++++++++++-------------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/driver/generic/sendwithcallbacks.go b/driver/generic/sendwithcallbacks.go index 9befa59..4fe7b1a 100644 --- a/driver/generic/sendwithcallbacks.go +++ b/driver/generic/sendwithcallbacks.go @@ -2,6 +2,7 @@ package generic import ( "bytes" + "context" "fmt" "regexp" "time" @@ -164,50 +165,52 @@ func (d *Driver) handleCallbacks( b, fb []byte, timeout time.Duration, ) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + c := make(chan *callbackResult) go func() { defer close(c) - for { - rb, err := d.Channel.Read() - if err != nil { - c <- &callbackResult{ - err: err, - } - + select { + case <-ctx.Done(): return - } - - b = append(b, rb...) - fb = append(fb, rb...) - - for i, cb := range callbacks { - if cb.check(b) { + default: + rb, err := d.Channel.Read() + if err != nil { c <- &callbackResult{ - i: i, - callbacks: callbacks, - b: b, - fb: fb, - err: nil, + err: err, } - return } + + b = append(b, rb...) + fb = append(fb, rb...) + + for i, cb := range callbacks { + if cb.check(b) { + c <- &callbackResult{ + i: i, + callbacks: callbacks, + b: b, + fb: fb, + err: nil, + } + return + } + } } } }() - timer := time.NewTimer(timeout) - select { case r := <-c: if r.err != nil { return nil, r.err } - return d.executeCallback(r.i, r.callbacks, r.b, r.fb, timeout) - case <-timer.C: + case <-ctx.Done(): return nil, fmt.Errorf("%w: timeout handling callbacks", util.ErrTimeoutError) } } From 393c675fb5c2d997a7678c4e954daf90ad7975ec Mon Sep 17 00:00:00 2001 From: shaazmik Date: Mon, 2 Dec 2024 15:09:40 +0300 Subject: [PATCH 2/2] fix linter issues --- driver/generic/sendwithcallbacks.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/driver/generic/sendwithcallbacks.go b/driver/generic/sendwithcallbacks.go index 4fe7b1a..1911e4a 100644 --- a/driver/generic/sendwithcallbacks.go +++ b/driver/generic/sendwithcallbacks.go @@ -172,6 +172,7 @@ func (d *Driver) handleCallbacks( go func() { defer close(c) + for { select { case <-ctx.Done(): @@ -182,6 +183,7 @@ func (d *Driver) handleCallbacks( c <- &callbackResult{ err: err, } + return } @@ -197,6 +199,7 @@ func (d *Driver) handleCallbacks( fb: fb, err: nil, } + return } } @@ -209,6 +212,7 @@ func (d *Driver) handleCallbacks( if r.err != nil { return nil, r.err } + return d.executeCallback(r.i, r.callbacks, r.b, r.fb, timeout) case <-ctx.Done(): return nil, fmt.Errorf("%w: timeout handling callbacks", util.ErrTimeoutError)