Skip to content

Commit cfaabfe

Browse files
authored
Merge pull request #10 from frain-dev/enhancement/msgpack-encoding
add reflect decoding
2 parents c0b051e + 02379d4 commit cfaabfe

File tree

6 files changed

+55
-14
lines changed

6 files changed

+55
-14
lines changed

brokers/redis/list.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ func (b *List) Status() bool {
253253
}
254254

255255
func ListUnmarshalMessage(msg *disq.Message, body string) error {
256-
if err := msgpack.Unmarshal([]byte(body), (*disq.MessageRaw)(msg)); err != nil {
256+
err := msg.UnmarshalBinary(disq.StringToBytes(body))
257+
if err != nil {
257258
return err
258259
}
259260
return nil

brokers/redis/stream.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/go-redis/redis/v8"
1414
"github.com/google/uuid"
1515
log "github.com/sirupsen/logrus"
16-
"github.com/vmihailenco/msgpack"
1716
)
1817

1918
// Broker based on redis STREAM and ZSET.
@@ -316,7 +315,8 @@ func unixMs(tm time.Time) int64 {
316315

317316
func StreamUnmarshalMessage(msg *disq.Message, xmsg *redis.XMessage) error {
318317
body := xmsg.Values["body"].(string)
319-
if err := msgpack.Unmarshal([]byte(body), (*disq.MessageRaw)(msg)); err != nil {
318+
err := msg.UnmarshalBinary(disq.StringToBytes(body))
319+
if err != nil {
320320
return err
321321
}
322322
msg.ID = xmsg.ID

example/api.go

+16-6
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,26 @@ func NewWorker(c *redis.Client, brokers []disq.Broker) *Worker {
7272
}
7373
}
7474

75-
//create task
76-
var CountHandler, _ = disq.RegisterTask(&disq.TaskOptions{
77-
Name: "CountHandler",
78-
Handler: func(name string) error {
75+
type MsgValue struct {
76+
Name string
77+
Value string
78+
}
79+
80+
func counthandler() func(job *MsgValue) error {
81+
return func(msgval *MsgValue) error {
7982
//time.Sleep(time.Duration(10) * time.Second)
80-
// fmt.Println("Hello", name)
83+
fmt.Println("Hello", msgval.Value)
8184
// return errors.New("error")
8285
// return &EndpointError{Err: errors.New("Error"), delay: time.Second * 60}
8386
return nil
84-
},
87+
}
88+
89+
}
90+
91+
//create task
92+
var CountHandler, _ = disq.RegisterTask(&disq.TaskOptions{
93+
Name: "CountHandler",
94+
Handler: counthandler(),
8595
RetryLimit: 3,
8696
})
8797

example/publisher/publisher.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ import (
1111
)
1212

1313
func main() {
14-
15-
count := 50000000
14+
count := 50
1615
go func() {
1716
for i := 0; i < count; i++ {
1817
value := fmt.Sprint("message_", uuid.NewString())
@@ -21,7 +20,10 @@ func main() {
2120
msg := &disq.Message{
2221
Ctx: ctx,
2322
TaskName: example.CountHandler.Name(),
24-
Args: []interface{}{value},
23+
Args: []interface{}{&example.MsgValue{
24+
Name: "test",
25+
Value: value,
26+
}},
2527
// Delay: delay,
2628
}
2729
err := example.RWorker.Worker.Brokers()[0].Publish(msg)
@@ -41,7 +43,10 @@ func main() {
4143
msg := &disq.Message{
4244
Ctx: ctx,
4345
TaskName: example.CountHandler.Name(),
44-
Args: []interface{}{value},
46+
Args: []interface{}{&example.MsgValue{
47+
Name: "test",
48+
Value: value,
49+
}},
4550
// Delay: delay,
4651
}
4752
err := example.RWorker.Worker.Brokers()[1].Publish(msg)
@@ -64,7 +69,10 @@ func main() {
6469
msg := &disq.Message{
6570
Ctx: ctx,
6671
TaskName: example.CountHandler.Name(),
67-
Args: []interface{}{value},
72+
Args: []interface{}{&example.MsgValue{
73+
Name: "test",
74+
Value: value,
75+
}},
6876
// Delay: delay,
6977
}
7078
err := example.RWorker.Worker.Brokers()[2].Publish(msg)

handler.go

+6
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,12 @@ func (h *reflectFunc) fnArgs(msg *Message) ([]reflect.Value, error) {
137137

138138
for i := 0; i < len(in); i++ {
139139
arg := reflect.New(h.ft.In(inStart + i)).Elem()
140+
err = dec.DecodeValue(arg)
141+
if err != nil {
142+
err = fmt.Errorf(
143+
"disq: decoding arg=%d failed (data=%.100x): %s", i, b, err)
144+
return nil, err
145+
}
140146
in[i] = arg
141147
}
142148

util.go

+16
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"os"
66
"strconv"
77
"time"
8+
"unsafe"
89
)
910

1011
func UnixMs(tm time.Time) int64 {
@@ -21,3 +22,18 @@ func ConsumerName() string {
2122
func DurEqual(d1, d2 time.Duration, threshold int) bool {
2223
return (d2 >= d1 && (d2-d1) < time.Duration(threshold)*time.Second)
2324
}
25+
26+
// BytesToString converts byte slice to string.
27+
func BytesToString(b []byte) string {
28+
return *(*string)(unsafe.Pointer(&b))
29+
}
30+
31+
// StringToBytes converts string to byte slice.
32+
func StringToBytes(s string) []byte {
33+
return *(*[]byte)(unsafe.Pointer(
34+
&struct {
35+
string
36+
Cap int
37+
}{s, len(s)},
38+
))
39+
}

0 commit comments

Comments
 (0)