-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmonitor.go
78 lines (63 loc) · 1.49 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package gopool
import (
"time"
"fmt"
)
// SleepTimeFunc returns the time.Duration to sleep between successive checks
// of a pool's worker count.
type SleepTimeFunc func() time.Duration
// monitorPool repeatedly reconciles the pool's worker count via
// assertPoolSize, pausing for pool.sleepTime() between passes. It returns once
// the context obtained from pool.Context() is done, so it is meant to run for
// the lifetime of that context.
func monitorPool(pool *Pool) {
	ctx := pool.Context()
	for {
		// Never start another pass once the pool has been cancelled.
		select {
		case <-ctx.Done():
			return
		default:
		}

		assertPoolSize(pool)

		// Wait on a timer instead of calling time.Sleep so that cancellation
		// interrupts the pause; otherwise this goroutine would outlive the
		// context by up to one full sleep interval.
		timer := time.NewTimer(pool.sleepTime())
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}
// assertPoolSize compares pool.desiredWorkerCount() with the number of entries
// in pool.workers and asks for the difference to be added or removed. Nothing
// happens when the two already match. pool.mu is held for the whole call.
func assertPoolSize(pool *Pool) {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	// Positive delta: the pool is short of workers. Negative: it has too many.
	switch delta := int(pool.desiredWorkerCount()) - len(pool.workers); {
	case delta > 0:
		addWorkersToPool(pool, uint64(delta))
	case delta < 0:
		removeWorkersFromPool(pool, uint64(-delta))
	}
}
// addWorkersToPool creates and starts `add` new workers, appends each one to
// pool.workers, and spawns a goroutine per worker that removes it from the
// pool again once it is done.
//
// It mutates pool.workers without taking pool.mu itself; the visible caller,
// assertPoolSize, holds the mutex around this call.
func addWorkersToPool(pool *Pool, add uint64) {
	// Read the clock once and offset it by the loop index. Calling time.Now()
	// per worker can return the same value twice on a coarse clock, and
	// removeWorkerFromPoolOnceDone filters pool.workers by ID, so a duplicate
	// ID would evict a worker that is still running. The offset guarantees
	// distinct IDs within this call while keeping the "<pool id>-<number>"
	// format unchanged.
	base := time.Now().UnixNano()
	for x := uint64(0); x < add; x++ {
		worker := poolWorker{
			worker: NewWorker(
				fmt.Sprintf("%s-%d", pool.id, base+int64(x)),
				pool.work,
				pool.ctx,
			),
		}
		// Start hands back the function later used to cancel this worker.
		worker.cancel = worker.worker.Start()
		pool.workers = append(pool.workers, worker)
		go removeWorkerFromPoolOnceDone(pool, worker)
	}
}
// removeWorkersFromPool cancels the first `remove` workers in pool.workers.
//
// It only invokes each worker's cancel function; the entries stay in
// pool.workers. Dropping a finished worker from the slice is done by
// removeWorkerFromPoolOnceDone.
//
// A `remove` larger than the current worker count is clamped, so the call
// cancels every worker instead of indexing past the end of the slice.
func removeWorkersFromPool(pool *Pool, remove uint64) {
	if n := uint64(len(pool.workers)); remove > n {
		remove = n
	}
	for i := uint64(0); i < remove; i++ {
		pool.workers[i].cancel()
	}
}
// removeWorkerFromPoolOnceDone blocks until the given worker's Done channel
// yields, then rebuilds pool.workers under pool.mu without any entry whose ID
// equals that worker's ID.
func removeWorkerFromPoolOnceDone(pool *Pool, worker poolWorker) {
	// Wait outside the lock so other users of pool.mu are not held up while
	// the worker is still running.
	<-worker.worker.Done()

	pool.mu.Lock()
	defer pool.mu.Unlock()

	doneID := worker.worker.ID()
	remaining := []poolWorker{}
	for i := range pool.workers {
		if pool.workers[i].worker.ID() == doneID {
			continue
		}
		remaining = append(remaining, pool.workers[i])
	}
	pool.workers = remaining
}