Skip to content

Commit 83e341c

Browse files
committed
add reflect decoding
1 parent 33a10ed commit 83e341c

File tree

3 files changed

+35
-11
lines changed

3 files changed

+35
-11
lines changed

example/api.go

+16-6
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,26 @@ func NewWorker(c *redis.Client, brokers []disq.Broker) *Worker {
7272
}
7373
}
7474

75-
//create task
76-
var CountHandler, _ = disq.RegisterTask(&disq.TaskOptions{
77-
Name: "CountHandler",
78-
Handler: func(name string) error {
75+
type MsgValue struct {
76+
Name string
77+
Value string
78+
}
79+
80+
func counthandler() func(job *MsgValue) error {
81+
return func(msgval *MsgValue) error {
7982
//time.Sleep(time.Duration(10) * time.Second)
80-
// fmt.Println("Hello", name)
83+
fmt.Println("Hello", msgval.Value)
8184
// return errors.New("error")
8285
// return &EndpointError{Err: errors.New("Error"), delay: time.Second * 60}
8386
return nil
84-
},
87+
}
88+
89+
}
90+
91+
//create task
92+
var CountHandler, _ = disq.RegisterTask(&disq.TaskOptions{
93+
Name: "CountHandler",
94+
Handler: counthandler(),
8595
RetryLimit: 3,
8696
})
8797

example/publisher/publisher.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ import (
1111
)
1212

1313
func main() {
14-
15-
count := 50000000
14+
count := 50
1615
go func() {
1716
for i := 0; i < count; i++ {
1817
value := fmt.Sprint("message_", uuid.NewString())
@@ -21,7 +20,10 @@ func main() {
2120
msg := &disq.Message{
2221
Ctx: ctx,
2322
TaskName: example.CountHandler.Name(),
24-
Args: []interface{}{value},
23+
Args: []interface{}{&example.MsgValue{
24+
Name: "test",
25+
Value: value,
26+
}},
2527
// Delay: delay,
2628
}
2729
err := example.RWorker.Worker.Brokers()[0].Publish(msg)
@@ -41,7 +43,10 @@ func main() {
4143
msg := &disq.Message{
4244
Ctx: ctx,
4345
TaskName: example.CountHandler.Name(),
44-
Args: []interface{}{value},
46+
Args: []interface{}{&example.MsgValue{
47+
Name: "test",
48+
Value: value,
49+
}},
4550
// Delay: delay,
4651
}
4752
err := example.RWorker.Worker.Brokers()[1].Publish(msg)
@@ -64,7 +69,10 @@ func main() {
6469
msg := &disq.Message{
6570
Ctx: ctx,
6671
TaskName: example.CountHandler.Name(),
67-
Args: []interface{}{value},
72+
Args: []interface{}{&example.MsgValue{
73+
Name: "test",
74+
Value: value,
75+
}},
6876
// Delay: delay,
6977
}
7078
err := example.RWorker.Worker.Brokers()[2].Publish(msg)

handler.go

+6
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,12 @@ func (h *reflectFunc) fnArgs(msg *Message) ([]reflect.Value, error) {
137137

138138
for i := 0; i < len(in); i++ {
139139
arg := reflect.New(h.ft.In(inStart + i)).Elem()
140+
err = dec.DecodeValue(arg)
141+
if err != nil {
142+
err = fmt.Errorf(
143+
"disq: decoding arg=%d failed (data=%.100x): %s", i, b, err)
144+
return nil, err
145+
}
140146
in[i] = arg
141147
}
142148

0 commit comments

Comments
 (0)