-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcounter_service.go
135 lines (119 loc) · 2.51 KB
/
counter_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package main
import (
"context"
"database/sql"
"errors"
"math/big"
"sync"
"github.com/gomodule/redigo/redis"
)
const MaxBlockRangeSpan = 10000
var _counterService *CounterService = nil
type BlockRange struct {
from *big.Int
to *big.Int
}
/********************/
type Counter struct {
headNum *big.Int
cRange chan *BlockRange
redisCon redis.Conn
}
// generate BlockRange continuously, at the meanwhile allow the `newHeadNum` to be updated
func (c *Counter) start(ctx context.Context, cHeadNum chan *big.Int) {
mu := sync.Mutex{}
updateEvent := make(chan struct{})
var newHeadNum *big.Int
go func() {
for {
select {
case headNum := <-cHeadNum:
mu.Lock()
if newHeadNum == nil {
newHeadNum = headNum
} else if headNum.Cmp(newHeadNum) > 0 {
newHeadNum = headNum
}
mu.Unlock()
select {
case updateEvent <- struct{}{}:
default:
}
case <-ctx.Done():
break
}
}
}()
go func() {
loop:
for {
mu.Lock()
if newHeadNum == nil || newHeadNum.Cmp(c.headNum) <= 0 {
mu.Unlock()
select {
case <-updateEvent:
continue
case <-ctx.Done():
break loop
}
}
num := new(big.Int).Add(c.headNum, big.NewInt(MaxBlockRangeSpan))
if num.Cmp(newHeadNum) > 0 {
num = newHeadNum
}
mu.Unlock()
br := &BlockRange{from: c.headNum, to: num}
select {
case c.cRange <- br:
case <-ctx.Done():
break loop
}
c.headNum = num
}
}()
}
/********************/
type CounterService struct {
counter *Counter
mu *sync.Mutex
}
func NewCounterService() *CounterService {
if _counterService != nil {
return _counterService
}
_counterService = &CounterService{counter: nil, mu: &sync.Mutex{}}
return _counterService
}
// Lazily create a Counter in singleton pattern
func (c *CounterService) Counter(ctx context.Context, pgCon *sql.DB, cHeadNum chan *big.Int) (*Counter, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.counter != nil {
return c.counter, nil
}
rows, err := pgCon.Query("select head_num from settings where id=1")
if err != nil {
return nil, err
}
var headNum *big.Int
if rows.Next() {
var headNumStr string
err = rows.Scan(&headNumStr)
if err != nil {
return nil, err
}
var ok bool
headNum, ok = new(big.Int).SetString(headNumStr, 10)
if ok == false {
return nil, errors.New("parsed db field head_num failed")
}
} else {
headNum = big.NewInt(0)
}
counter := Counter{
headNum: headNum,
cRange: make(chan *BlockRange),
}
counter.start(ctx, cHeadNum)
return &counter, nil
}