-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
131 lines (113 loc) · 2.47 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package transfer
import (
"context"
"fmt"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"sync"
"time"
)
// LatencyInit is the default latency value, three seconds. It is what
// aggregate writes into a client's latency field.
var LatencyInit = 3 * time.Second
// client is a JSON-RPC client for a single TCP address. It dials lazily,
// replaces connections that are two hours old or more, and keeps a latency
// figure that the comparison helper quick uses to order clients.
// +gen * slice:"MinBy,SortBy,DistinctBy,Aggregate[int],Shuffle"
type client struct {
// client is the current connection; nil until get dials one and after close.
client *rpc.Client
// connTime is when newConn last dialed successfully.
connTime time.Time
// latency is the duration of the last successful call, or a penalty value
// written by get (3m), a failed call (1m) or a timed-out call (2m).
latency time.Duration
// lock is held by get while it inspects or replaces the connection.
lock sync.RWMutex
// connTimeout bounds the TCP dial in newConn.
connTimeout time.Duration
// callTimeout bounds how long call waits for a reply.
callTimeout time.Duration
// addr is the TCP address dialed by newConn.
addr string
}
// get returns the RPC client for cn.addr, dialing a new connection when none
// is open or when the current one is at least two hours old. It runs under
// cn.lock and, as a side effect, raises cn.latency to three minutes.
//
// On a dial failure cn.client is left nil and the dial error is returned.
func (cn *client) get() (*rpc.Client, error) {
	cn.lock.Lock()
	defer cn.lock.Unlock()

	// Set the latency to its maximum to avoid this client being called
	// again at the same time.
	cn.latency = 3 * time.Minute

	if cn.client != nil {
		if time.Since(cn.connTime) < 2*time.Hour {
			return cn.client, nil
		}
		// The connection has reached the two-hour limit: drop it and redial.
		cn.client.Close()
	}

	fresh, err := cn.newConn()
	cn.client = fresh
	if err != nil {
		return nil, err
	}
	return fresh, nil
}
// newConn dials cn.addr over TCP, waiting at most cn.connTimeout, and wraps
// the resulting connection in a JSON-RPC client. cn.connTime is updated to
// the current time only when the dial succeeds.
func (cn *client) newConn() (*rpc.Client, error) {
	sock, dialErr := net.DialTimeout("tcp", cn.addr, cn.connTimeout)
	if dialErr == nil {
		cn.connTime = time.Now()
		return jsonrpc.NewClient(sock), nil
	}
	return nil, dialErr
}
// close shuts down the current RPC connection, if there is one, and clears
// it so that the next get dials a fresh connection.
//
// It takes cn.lock because get reads and replaces cn.client under the same
// lock; mutating the field without it is a data race. The caller must not
// already hold cn.lock.
func (cn *client) close() {
	cn.lock.Lock()
	defer cn.lock.Unlock()

	if cn.client == nil {
		return
	}
	// Best effort: the connection is being discarded, so a Close error
	// carries no information the caller could act on.
	cn.client.Close()
	cn.client = nil
}
// call invokes the RPC method on the connection returned by get and waits at
// most cn.callTimeout for the reply to be stored in resp.
//
// Latency bookkeeping:
//   - success: cn.latency becomes the measured duration of the call;
//   - RPC error: the connection is closed and cn.latency is set to 1 minute;
//   - timeout: the connection is closed and cn.latency is set to 2 minutes.
//
// It returns the error from get unchanged, or a formatted error naming
// cn.addr when the call fails or times out.
func (cn *client) call(method string, args interface{}, resp interface{}) error {
	cl, err := cn.get()
	if err != nil {
		return err
	}

	// discard closes the connection used by this call and records a penalty.
	// It closes cl itself rather than whatever cn.client currently holds, so
	// a connection re-dialed by another goroutine in the meantime is not torn
	// down by mistake. All shared fields are updated under cn.lock, the same
	// lock get uses.
	discard := func(penalty time.Duration) {
		cn.lock.Lock()
		defer cn.lock.Unlock()
		cl.Close()
		if cn.client == cl {
			cn.client = nil
		}
		cn.latency = penalty
	}

	ctx, cancelFn := context.WithTimeout(context.Background(), cn.callTimeout)
	defer cancelFn()

	// Buffered so the goroutine can deliver its result and exit even when
	// nobody is receiving any more (the timeout path has already returned).
	done := make(chan error, 1)
	start := time.Now()
	go func() {
		done <- cl.Call(method, args, resp)
	}()

	select {
	case callErr := <-done:
		if callErr != nil {
			discard(time.Minute)
			return fmt.Errorf("rpc call %s error:%s", cn.addr, callErr.Error())
		}
	case <-ctx.Done():
		// Closing cl makes the pending cl.Call return, which lets the
		// goroutine above finish.
		discard(2 * time.Minute)
		return fmt.Errorf("rpc call %s timeout", cn.addr)
	}

	elapsed := time.Since(start)
	cn.lock.Lock()
	cn.latency = elapsed
	cn.lock.Unlock()
	return nil
}
// NewClient creates an RPC client for address. No connection is opened here;
// the first call dials it.
//
// connTW is the timeout for establishing the TCP connection and callTW is
// the timeout for each RPC call. The initial latency is LatencyInit, the
// same default that aggregate resets clients to, so the two cannot drift
// apart.
func NewClient(address string, connTW, callTW time.Duration) *client {
	return &client{
		addr:        address,
		connTimeout: connTW,
		callTimeout: callTW,
		latency:     LatencyInit,
	}
}
// quick reports whether a's recorded latency is strictly lower than b's.
func quick(a, b *client) bool {
	return b.latency > a.latency
}
// aggregate resets c's latency to LatencyInit and returns i plus one.
func aggregate(i int, c *client) int {
	c.latency = LatencyInit
	next := i + 1
	return next
}
// addrLess reports whether a's address sorts before b's in string order.
func addrLess(a, b *client) bool {
	return b.addr > a.addr
}
// distinctBy reports whether a and b have the same address.
func distinctBy(a, b *client) bool {
	left, right := a.addr, b.addr
	return left == right
}