This repository has been archived by the owner on Mar 18, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdispatcher.go
102 lines (93 loc) · 2.08 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package workerpool
import (
"errors"
"fmt"
"reflect"
"sync"
)
// JobQueue is a task queue: a channel carrying jobs as arbitrary values.
type JobQueue chan interface{}
// Dispatcher is the scheduler: it creates a set of workers and forwards each
// job read from JobQueue to a per-worker queue obtained from workerPool.
type Dispatcher struct {
// WorkerFunc is the reflected factory function; Run calls it once per worker
// and asserts its first result to Worker.
WorkerFunc reflect.Value
// WorkerFuncArgs are the arguments passed to WorkerFunc on every call.
WorkerFuncArgs []reflect.Value
// MaxWorkers is the number of workers Run creates.
MaxWorkers int
// JobQueue is the source of jobs; a nil value received from it makes the
// dispatch loop stop all workers and exit.
JobQueue JobQueue
// workers holds the workers created by Run.
workers []Worker
// workerPool yields the queue the next job is written to; it is handed to
// every worker through Init (presumably workers register their own queue
// here when idle — confirm against the Worker implementation).
workerPool chan JobQueue
// wg is handed to every worker through Init; Run blocks on it via wait.
wg *sync.WaitGroup
// quit receives the signal sent by Stop; on receipt the dispatch loop closes
// JobQueue.
quit chan bool
}
// Run builds and starts MaxWorkers workers, launches the dispatch goroutine,
// and then blocks until the dispatcher's WaitGroup is released.
//
// Every worker is obtained by calling WorkerFunc with WorkerFuncArgs and
// asserting the first result to Worker; it is initialised with its index, the
// shared worker pool, the shared WaitGroup and its own Do method, then started
// and recorded in t.workers.
func (t *Dispatcher) Run() {
	for id := 0; id < t.MaxWorkers; id++ {
		results := t.WorkerFunc.Call(t.WorkerFuncArgs)
		worker := results[0].Interface().(Worker)
		worker.Init(id, t.workerPool, t.wg, worker.Do)
		worker.Run()
		t.workers = append(t.workers, worker)
	}

	t.dispatch()
	t.wait()
}
// dispatch starts the background goroutine that routes jobs to workers.
//
// Each non-nil value received from t.JobQueue is written to the next queue
// taken from t.workerPool. A nil value — sent explicitly, or produced by
// receiving from the queue once it has been closed — ends the loop: every
// worker in t.workers is stopped and the goroutine returns.
//
// A signal on t.quit closes t.JobQueue. Only the first signal is honoured;
// later ones are ignored so the queue is never closed twice by this loop.
func (t *Dispatcher) dispatch() {
	go func() {
		// Local copy so the quit case can be switched off after it fires once.
		quit := t.quit
		for {
			select {
			case data := <-t.JobQueue:
				if data == nil {
					for _, w := range t.workers {
						w.Stop()
					}
					return
				}
				// Blocks until a worker queue is available in the pool.
				ch := <-t.workerPool
				ch <- data
			case <-quit:
				close(t.JobQueue)
				// A nil channel is never ready in select; this prevents a
				// second quit signal from closing the already-closed queue,
				// which would panic.
				quit = nil
			}
		}
	}()
}
// Stop requests shutdown without blocking the caller: the signal is sent on
// t.quit from a separate goroutine.
func (t *Dispatcher) Stop() {
	signal := func() {
		t.quit <- true
	}
	go signal()
}
// wait blocks until the dispatcher's WaitGroup (t.wg) is released.
func (t *Dispatcher) wait() {
t.wg.Wait()
}
// NewDispatcher creates a Dispatcher that reads jobs from jobQueue and hands
// them to workers produced by workerFunc.
//
// Parameters:
//   - jobQueue: the channel jobs are read from.
//   - maxWorkers: stored as MaxWorkers and used as the capacity of the
//     internal worker pool channel.
//   - workerFunc: a function whose first return value implements Worker.
//   - args: arguments passed to workerFunc on every call.
//
// workerFunc is invoked once here, with args, purely to validate it; the value
// it returns is discarded.
//
// NewDispatcher panics with an error value when workerFunc is nil or not a
// function. It also panics, with the message prefixed by "WorkerFunc ", when
// the validation call itself panics, returns no values, or returns a first
// value that is not a Worker.
func NewDispatcher(jobQueue chan interface{}, maxWorkers int, workerFunc interface{}, args ...interface{}) *Dispatcher {
	// reflect.TypeOf returns a nil Type for a nil interface value, so check
	// for nil before calling Kind to report the intended error.
	typ := reflect.TypeOf(workerFunc)
	if typ == nil || typ.Kind() != reflect.Func {
		panic(errors.New("WorkerFunc is not a Func type"))
	}

	value := reflect.ValueOf(workerFunc)
	valueArgs := make([]reflect.Value, 0, len(args))
	for _, arg := range args {
		valueArgs = append(valueArgs, reflect.ValueOf(arg))
	}

	// Trial call: any panic raised here (by reflect, by workerFunc, or by the
	// checks below) is re-raised as an error carrying the "WorkerFunc " prefix.
	func() {
		defer func() {
			if err := recover(); err != nil {
				// %v formats any recovered value, not only strings and errors.
				panic(fmt.Errorf("WorkerFunc %v", err))
			}
		}()
		values := value.Call(valueArgs)
		if len(values) == 0 {
			panic(errors.New("WorkerFunc did not return"))
		}
		if _, ok := values[0].Interface().(Worker); !ok {
			panic(errors.New("WorkerFunc return type can only be workerPool.Worker"))
		}
	}()

	return &Dispatcher{
		WorkerFunc:     value,
		WorkerFuncArgs: valueArgs,
		MaxWorkers:     maxWorkers,
		JobQueue:       jobQueue,
		workerPool:     make(chan JobQueue, maxWorkers),
		wg:             &sync.WaitGroup{},
		quit:           make(chan bool),
	}
}