Skip to content

Commit ca93cfe

Browse files
authored
Merge pull request #13 from frain-dev/ogban/feat/pause-retries-for-ratelimit-error
Add error handler
2 parents 48d0dd2 + 88c1281 commit ca93cfe

File tree

5 files changed

+51
-14
lines changed

5 files changed

+51
-14
lines changed

brokers/localstorage/localstorage.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ func (b *LocalStorage) Process(msg *disq.Message) error {
8585

8686
if msgErr != nil {
8787
//retry
88-
atomic.AddUint32(&b.retries, 1)
88+
disq.Logger.Println(disq.FormatHandlerError(msg, task.RetryLimit()))
89+
_ = disq.ErrorHandler(msg, msgErr, &b.retries)
8990
msg.Err = msgErr
9091
err := b.Requeue(msg)
9192
if err != nil {
@@ -100,7 +101,6 @@ func (b *LocalStorage) Process(msg *disq.Message) error {
100101

101102
func (b *LocalStorage) Requeue(msg *disq.Message) error {
102103
//Requeue
103-
msg.RetryCount++
104104
err := b.Publish(msg)
105105
if err != nil {
106106
return err

brokers/redis/list.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ func (b *List) Process(msg *disq.Message) error {
103103
msgErr := task.HandleMessage(msg)
104104

105105
if msgErr != nil {
106-
atomic.AddUint32(&b.retries, 1)
106+
disq.Logger.Println(disq.FormatHandlerError(msg, task.RetryLimit()))
107+
_ = disq.ErrorHandler(msg, msgErr, &b.retries)
107108
msg.Err = msgErr
108109
err := b.Requeue(msg)
109110
if err != nil {
@@ -126,7 +127,6 @@ func (b *List) Requeue(msg *disq.Message) error {
126127
if err != nil {
127128
return err
128129
}
129-
msg.RetryCount++
130130
err = b.Publish(msg)
131131
if err != nil {
132132
return err

brokers/redis/stream.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ func (b *Stream) Process(msg *disq.Message) error {
125125

126126
if msgErr != nil {
127127
//retry
128-
msg.Delay = disq.Delay(msg, msgErr)
129-
atomic.AddUint32(&b.retries, 1)
128+
disq.Logger.Println(disq.FormatHandlerError(msg, task.RetryLimit()))
129+
_ = disq.ErrorHandler(msg, msgErr, &b.retries)
130130
msg.Err = msgErr
131131
err := b.Requeue(msg)
132132
if err != nil {
@@ -253,7 +253,7 @@ func (b *Stream) Requeue(msg *disq.Message) error {
253253
return err
254254
}
255255
//Requeue
256-
msg.RetryCount++ //to know how many times it has been retried.
256+
// msg.RetryCount++ //to know how many times it has been retried.
257257
err = b.Publish(msg)
258258
if err != nil {
259259
return err

errors.go

+38-7
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,45 @@
11
package disq
22

3-
import "time"
3+
import (
4+
"sync/atomic"
5+
"time"
6+
)
47

5-
type Delayer interface {
6-
Delay() time.Duration
8+
type DisqError interface {
9+
GetDelay() time.Duration
10+
GetRateLimit() bool
711
}
812

9-
func Delay(msg *Message, msgErr error) time.Duration {
10-
if delayer, ok := msgErr.(Delayer); ok {
11-
return delayer.Delay()
13+
type Error struct {
14+
RateLimit bool
15+
Delay time.Duration
16+
Err error
17+
}
18+
19+
func (e *Error) Error() string {
20+
return e.Err.Error()
21+
}
22+
23+
func (e *Error) GetDelay() time.Duration {
24+
return e.Delay
25+
}
26+
27+
func (e *Error) GetRateLimit() bool {
28+
return e.RateLimit
29+
}
30+
31+
func ErrorHandler(msg *Message, msgErr error, retries *uint32) error {
32+
if disqError, ok := msgErr.(DisqError); ok {
33+
if disqError.GetRateLimit() {
34+
msg.Delay = disqError.GetDelay()
35+
return nil
36+
}
37+
msg.RetryCount++
38+
atomic.AddUint32(retries, 1)
39+
msg.Delay = disqError.GetDelay()
40+
return nil
1241
}
13-
return msg.Delay
42+
msg.RetryCount++
43+
atomic.AddUint32(retries, 1)
44+
return nil
1445
}

util.go

+6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package disq
22

33
import (
4+
"fmt"
45
"math/rand"
56
"os"
67
"strconv"
@@ -37,3 +38,8 @@ func StringToBytes(s string) []byte {
3738
}{s, len(s)},
3839
))
3940
}
41+
42+
func FormatHandlerError(msg *Message, retrylimit int) error {
43+
return fmt.Errorf("task=%q failed (retrycount=%d/%d will retry after delay=%s): reason:%s",
44+
msg.TaskName, msg.RetryCount, retrylimit, msg.Delay, msg.Err)
45+
}

0 commit comments

Comments
 (0)