Skip to content

Commit a8194ff

Browse files
authored
Merge pull request #15 from frain-dev/ogban/feat/worker-queue-map
feat: worker-broker map
2 parents 922dce8 + 5fe6fb5 commit a8194ff

File tree

16 files changed

+947
-82
lines changed

16 files changed

+947
-82
lines changed

README.md

+5-5
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,16 @@ var brokers = []disq.Broker{broker1, broker2, broker3}
6666
var w = disq.NewWorker(brokers)
6767

6868
//start processing messages
69-
var err = w.Start(ctx)
69+
var err = w.StartAll(ctx)
7070
if err != nil {
7171
log.Fatal(err)
7272
}
7373

7474
//Get stats from all brokers
75-
for i, b := range w.Brokers() {
76-
var len, _ = b.Len()
77-
log.Printf("Broker_%d Queue Size: %+v", i, len)
78-
log.Printf("Broker_%d Stats: %+v\n\n", i, b.Stats())
75+
for name, broker := range w.GetAllBrokers() {
76+
var len, _ = broker.Len()
77+
log.Printf("Broker_%d Queue Size: %+v", name, len)
78+
log.Printf("Broker_%d Stats: %+v\n\n", i, broker.Stats())
7979
}
8080
```
8181

brokers/localstorage/config.go

+5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package localstorage
22

33
import (
44
"errors"
5+
"log"
56
)
67

78
type LocalStorageConfig struct {
@@ -16,6 +17,10 @@ func (ls *LocalStorageConfig) Init() error {
1617
return errors.New("localstorage config already initiated")
1718
}
1819

20+
if ls.Name == "" {
21+
log.Fatalf("LocalStorageConfig.Name is required")
22+
}
23+
1924
if ls.Concurency == 0 {
2025
ls.Concurency = 100
2126
}

brokers/localstorage/localstorage.go

+8
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,14 @@ func (b *LocalStorage) Stats() *disq.Stats {
163163
}
164164
}
165165

166+
func (b *LocalStorage) Name() string {
167+
return b.opts.Name
168+
}
169+
170+
func (b *LocalStorage) Config() disq.Config {
171+
return b.opts
172+
}
173+
166174
func (b *LocalStorage) Status() bool {
167175
return b.isConsuming
168176
}

brokers/localstorage/ls_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ func TestDelay(t *testing.T) {
322322
tm := <-handlerCh
323323
sub := tm.Sub(start)
324324
_ = b.Stop()
325-
return disq.DurEqual(msg.Delay, sub, 3), nil
325+
return disq.DurEqual(msg.Delay, sub, 6), nil
326326
},
327327
expect: true,
328328
},

brokers/redis/list.go

+8
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,14 @@ func (b *List) Stats() *disq.Stats {
247247
}
248248
}
249249

250+
func (b *List) Name() string {
251+
return b.opts.Name
252+
}
253+
254+
func (b *List) Config() disq.Config {
255+
return b.opts
256+
}
257+
250258
func (b *List) Status() bool {
251259
return b.isConsuming
252260
}

brokers/redis/stream.go

+138
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"strconv"
78
"strings"
89
"sync"
@@ -15,6 +16,8 @@ import (
1516
log "github.com/sirupsen/logrus"
1617
)
1718

19+
const count = math.MaxInt64
20+
1821
// Broker based on redis STREAM and ZSET.
1922
// Implements a delayed queue with support for retries.
2023
type Stream struct {
@@ -275,6 +278,12 @@ func (b *Stream) Stop() error {
275278
}()
276279
err := b.Redis.XGroupDelConsumer(
277280
context.TODO(), b.stream, b.streamGroup, b.consumerName).Err()
281+
if err != nil {
282+
if strings.HasPrefix(err.Error(), "NOGROUP") {
283+
b.isConsuming = false
284+
return nil
285+
}
286+
}
278287
b.isConsuming = false
279288
return err
280289
}
@@ -305,10 +314,18 @@ func (b *Stream) Stats() *disq.Stats {
305314
}
306315
}
307316

317+
func (b *Stream) Name() string {
318+
return b.opts.Name
319+
}
320+
308321
func (b *Stream) Status() bool {
309322
return b.isConsuming
310323
}
311324

325+
func (b *Stream) Config() disq.Config {
326+
return b.opts
327+
}
328+
312329
func unixMs(tm time.Time) int64 {
313330
return tm.UnixNano() / int64(time.Millisecond)
314331
}
@@ -404,3 +421,124 @@ func (b *Stream) schedulePending(ctx context.Context) (int, error) {
404421

405422
return len(pending), nil
406423
}
424+
425+
func (q *Stream) ZRangebyScore(ctx context.Context, min string, max string) ([]string, error) {
426+
bodies, err := q.opts.Redis.ZRangeByScore(ctx, q.zset, &redis.ZRangeBy{
427+
Min: min,
428+
Max: max,
429+
}).Result()
430+
if err != nil {
431+
return nil, err
432+
}
433+
return bodies, nil
434+
}
435+
436+
func (q *Stream) XPendingExt(ctx context.Context, start string, end string) ([]redis.XPendingExt, error) {
437+
pending, err := q.opts.Redis.XPendingExt(ctx, &redis.XPendingExtArgs{
438+
Stream: q.stream,
439+
Group: q.streamGroup,
440+
Start: start,
441+
End: end,
442+
Count: count,
443+
}).Result()
444+
if err != nil {
445+
if strings.HasPrefix(err.Error(), "NOGROUP") {
446+
_ = q.Redis.XGroupCreateMkStream(ctx, q.stream, q.streamGroup, "0").Err()
447+
}
448+
return nil, err
449+
}
450+
return pending, nil
451+
}
452+
453+
func (q *Stream) XRange(ctx context.Context, start string, end string) *redis.XMessageSliceCmd {
454+
xrange := q.Redis.XRange(ctx, q.stream, start, end)
455+
return xrange
456+
}
457+
458+
func (q *Stream) XRangeN(ctx context.Context, start string, end string, count int64) *redis.XMessageSliceCmd {
459+
xrange := q.Redis.XRangeN(ctx, q.stream, start, end, count)
460+
return xrange
461+
}
462+
463+
func (q *Stream) XPending(ctx context.Context) (*redis.XPending, error) {
464+
pending, err := q.Redis.XPending(ctx, q.stream, q.streamGroup).Result()
465+
if err != nil {
466+
if strings.HasPrefix(err.Error(), "NOGROUP") {
467+
_ = q.opts.Redis.XGroupCreateMkStream(ctx, q.stream, q.streamGroup, "0").Err()
468+
}
469+
}
470+
return pending, err
471+
}
472+
473+
func (q *Stream) ZRem(ctx context.Context, body string) *redis.IntCmd {
474+
result := q.Redis.ZRem(ctx, q.zset, body)
475+
return result
476+
}
477+
478+
func (q *Stream) XDel(ctx context.Context, id string) *redis.IntCmd {
479+
result := q.Redis.XDel(ctx, q.stream, id)
480+
return result
481+
}
482+
483+
func (q *Stream) XAck(ctx context.Context, id string) *redis.IntCmd {
484+
result := q.Redis.XAck(ctx, q.stream, id)
485+
return result
486+
}
487+
488+
func (q *Stream) XInfoConsumers(ctx context.Context) *redis.XInfoConsumersCmd {
489+
consumersInfo := q.Redis.XInfoConsumers(ctx, q.stream, q.streamGroup)
490+
return consumersInfo
491+
}
492+
493+
func (q *Stream) XInfoStream(ctx context.Context) *redis.XInfoStreamCmd {
494+
infoStream := q.Redis.XInfoStream(ctx, q.stream)
495+
return infoStream
496+
}
497+
498+
func (q *Stream) ExportMessagesfromStream(ctx context.Context) ([]disq.Message, error) {
499+
xmsgs, err := q.XRange(ctx, "-", "+").Result()
500+
if err != nil {
501+
return nil, err
502+
}
503+
504+
msgs := make([]disq.Message, len(xmsgs))
505+
for i := range xmsgs {
506+
xmsg := &xmsgs[i]
507+
msg := &msgs[i]
508+
509+
err = StreamUnmarshalMessage(msg, xmsg)
510+
511+
if err != nil {
512+
return nil, err
513+
}
514+
}
515+
return msgs, nil
516+
}
517+
518+
func (q *Stream) ExportMessagesfromStreamXACK(ctx context.Context) ([]disq.Message, error) {
519+
xmsgs, err := q.XRange(ctx, "-", "+").Result()
520+
if err != nil {
521+
return nil, err
522+
}
523+
524+
msgs := make([]disq.Message, len(xmsgs))
525+
for i := range xmsgs {
526+
xmsg := &xmsgs[i]
527+
msg := &msgs[i]
528+
529+
err = StreamUnmarshalMessage(msg, xmsg)
530+
531+
if err != nil {
532+
return nil, err
533+
}
534+
if err := q.opts.Redis.XAck(ctx, q.stream, q.streamGroup, xmsg.ID).Err(); err != nil {
535+
return nil, err
536+
}
537+
538+
err = q.opts.Redis.XDel(ctx, q.stream, xmsg.ID).Err()
539+
if err != nil {
540+
return nil, err
541+
}
542+
}
543+
return msgs, nil
544+
}

example/api.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/frain-dev/disq"
1212
redisBroker "github.com/frain-dev/disq/brokers/redis"
13+
worker "github.com/frain-dev/disq/worker"
1314
"github.com/go-redis/redis/v8"
1415
)
1516

@@ -20,7 +21,7 @@ type RedisBroker struct {
2021
}
2122

2223
type Worker struct {
23-
Worker *disq.Worker
24+
Worker *worker.Worker
2425
inner *redis.Client
2526
}
2627

@@ -64,7 +65,7 @@ func NewBroker(c *redis.Client, concurency int, name string) *RedisBroker {
6465

6566
//create new worker
6667
func NewWorker(c *redis.Client, brokers []disq.Broker) *Worker {
67-
w := disq.NewWorker(brokers)
68+
w := worker.NewWorker(brokers)
6869

6970
return &Worker{
7071
inner: c,

example/consumer/consumer.go

+5-9
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,23 @@ func main() {
1818
// b := example.RQueue.Queue
1919
// w.Brokers()[0].(*redisBroker.Broker).Purge()
2020

21-
err := w.Start(ctx)
21+
w.StartAll(ctx)
2222
// b.Consume(ctx)
2323

24-
if err != nil {
25-
log.Fatal(err)
26-
}
27-
2824
ticker := time.NewTicker(200 * time.Millisecond)
2925

3026
for {
3127
select {
3228
case <-ticker.C:
3329
// len, _ := b.Len()
34-
for i, b := range w.Brokers() {
30+
for n, b := range w.LoadAll() {
3531
len, _ := b.Len()
36-
log.Printf("Broker_%d Queue Size: %+v", i, len)
37-
log.Printf("Broker_%d Stats: %+v\n\n", i, b.Stats())
32+
log.Printf("Broker_%s Queue Size: %+v", n, len)
33+
log.Printf("Broker_%s Stats: %+v\n\n", n, b.Stats())
3834
}
3935
case <-ctx.Done():
4036
log.Println("Worker quiting")
41-
_ = w.Stop()
37+
_ = w.StopAll()
4238
return
4339
}
4440
}

example/publisher/publisher.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func main() {
2626
}},
2727
// Delay: delay,
2828
}
29-
err := example.RWorker.Worker.Brokers()[0].Publish(msg)
29+
err := example.RWorker.Worker.LoadAll()["disq10"].Publish(msg)
3030
if err != nil {
3131
log.Fatal(err)
3232
}
@@ -35,7 +35,7 @@ func main() {
3535
}()
3636

3737
go func() {
38-
if len(example.RWorker.Worker.Brokers()) > 1 {
38+
if len(example.RWorker.Worker.LoadAll()) > 1 {
3939
for i := 0; i < count; i++ {
4040
value := fmt.Sprint("message_", uuid.NewString())
4141
ctx := context.Background()
@@ -49,7 +49,7 @@ func main() {
4949
}},
5050
// Delay: delay,
5151
}
52-
err := example.RWorker.Worker.Brokers()[1].Publish(msg)
52+
err := example.RWorker.Worker.LoadAll()["disq9"].Publish(msg)
5353
if err != nil {
5454
log.Fatal(err)
5555
}
@@ -60,7 +60,7 @@ func main() {
6060
}()
6161

6262
go func() {
63-
if len(example.RWorker.Worker.Brokers()) > 2 {
63+
if len(example.RWorker.Worker.LoadAll()) > 2 {
6464

6565
for i := 0; i < count; i++ {
6666
value := fmt.Sprint("message_", uuid.NewString())
@@ -75,7 +75,7 @@ func main() {
7575
}},
7676
// Delay: delay,
7777
}
78-
err := example.RWorker.Worker.Brokers()[2].Publish(msg)
78+
err := example.RWorker.Worker.LoadAll()["disq8"].Publish(msg)
7979
if err != nil {
8080
log.Fatal(err)
8181
}

go.mod

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,17 @@ require (
77
github.com/go-redis/redis/v8 v8.11.4
88
github.com/google/uuid v1.3.0
99
github.com/sirupsen/logrus v1.8.1
10+
github.com/stretchr/testify v1.7.0
1011
github.com/vmihailenco/msgpack/v5 v5.3.5
1112
)
1213

1314
require (
1415
github.com/cespare/xxhash/v2 v2.1.2 // indirect
16+
github.com/davecgh/go-spew v1.1.1 // indirect
1517
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
1618
github.com/onsi/ginkgo v1.16.5 // indirect
1719
github.com/onsi/gomega v1.18.1 // indirect
18-
github.com/stretchr/testify v1.7.0 // indirect
20+
github.com/pmezard/go-difflib v1.0.0 // indirect
1921
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
2022
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
2123
golang.org/x/text v0.3.7 // indirect

go.sum

+1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
115115
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
116116
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
117117
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
118+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
118119
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
119120
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
120121
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=

0 commit comments

Comments
 (0)