-
Notifications
You must be signed in to change notification settings - Fork 71
/
Copy pathconfigs.go
265 lines (242 loc) · 7.87 KB
/
configs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
/*
*
* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*
*/
package nebula_go
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"os"
"time"
)
// PoolConfig is the configs of connection pool
type PoolConfig struct {
// Socket timeout and Socket connection timeout, unit: seconds
TimeOut time.Duration
// The idleTime of the connection, unit: seconds
// If connection's idle time is longer than idleTime, it will be delete
// 0 value means the connection will not expire
IdleTime time.Duration
// The max connections in pool for all addresses
MaxConnPoolSize int
// The min connections in pool for all addresses
MinConnPoolSize int
// UseHTTP2 indicates whether to use HTTP2
UseHTTP2 bool
// HttpHeader is the http headers for the connection when using HTTP2
HttpHeader http.Header
// client handshakeKey, make sure the client handshakeKey is in the white list of NebulaGraph server 'client_white_list'
HandshakeKey string
}
// validateConf validates config
func (conf *PoolConfig) validateConf(log Logger) {
if conf.TimeOut < 0 {
conf.TimeOut = 0 * time.Millisecond
log.Warn("Illegal Timeout value, the default value of 0 second has been applied")
}
if conf.IdleTime < 0 {
conf.IdleTime = 0 * time.Millisecond
log.Warn("Invalid IdleTime value, the default value of 0 second has been applied")
}
if conf.MaxConnPoolSize < 1 {
conf.MaxConnPoolSize = 10
log.Warn("Invalid MaxConnPoolSize value, the default value of 10 has been applied")
}
if conf.MinConnPoolSize < 0 {
conf.MinConnPoolSize = 0
log.Warn("Invalid MinConnPoolSize value, the default value of 0 has been applied")
}
}
// GetDefaultConf returns the default config
func GetDefaultConf() PoolConfig {
return PoolConfig{
TimeOut: 0 * time.Millisecond,
IdleTime: 0 * time.Millisecond,
MaxConnPoolSize: 10,
MinConnPoolSize: 0,
UseHTTP2: false,
HandshakeKey: "",
}
}
// GetDefaultSSLConfig reads the files in the given path and returns a tls.Config object
func GetDefaultSSLConfig(rootCAPath, certPath, privateKeyPath string) (*tls.Config, error) {
rootCA, err := openAndReadFile(rootCAPath)
if err != nil {
return nil, err
}
cert, err := openAndReadFile(certPath)
if err != nil {
return nil, err
}
privateKey, err := openAndReadFile(privateKeyPath)
if err != nil {
return nil, err
}
clientCert, err := tls.X509KeyPair(cert, privateKey)
if err != nil {
return nil, err
}
// parse root CA pem and add into CA pool
// for self-signed cert, use the local cert as the root ca
rootCAPool := x509.NewCertPool()
ok := rootCAPool.AppendCertsFromPEM(rootCA)
if !ok {
return nil, fmt.Errorf("unable to append supplied cert into tls.Config, please make sure it is a valid certificate")
}
return &tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: rootCAPool,
}, nil
}
func openAndReadFile(path string) ([]byte, error) {
// open file
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("unable to open file %s: %s", path, err)
}
// read file
b, err := io.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("unable to ReadAll file %s: %s", path, err)
}
return b, nil
}
// SessionPoolConf is the configs of a session pool
// Note that the space name is bound to the session pool for its lifetime
type SessionPoolConf struct {
username string // username for authentication
password string // password for authentication
serviceAddrs []HostAddress // service addresses for session pool
hostIndex int // index of the host in ServiceAddrs that the next new session will connect to
spaceName string // The space name that all sessions in the pool are bound to
sslConfig *tls.Config // Optional SSL config for the connection
retryGetSessionTimes int // The max times to retry get new session when executing a query
// Basic pool configs
// Socket timeout and Socket connection timeout, unit: seconds
timeOut time.Duration
// The idleTime of the connection, unit: seconds
// If connection's idle time is longer than idleTime, it will be delete
// 0 value means the connection will not expire
idleTime time.Duration
// The max sessions in pool for all addresses
maxSize int
// The min sessions in pool for all addresses
minSize int
// useHTTP2 indicates whether to use HTTP2
useHTTP2 bool
// httpHeader is the http headers for the connection
httpHeader http.Header
// client handshakeKey, make sure the client handshakeKey is in the white list of NebulaGraph server 'client_white_list'
handshakeKey string
}
type SessionPoolConfOption func(*SessionPoolConf)
// NewSessionPoolConfOpt creates a new NewSessionPoolConf with the provided options
func NewSessionPoolConf(
username, password string,
serviceAddrs []HostAddress,
spaceName string, opts ...SessionPoolConfOption) (*SessionPoolConf, error) {
// Set default values for basic pool configs
newPoolConf := SessionPoolConf{
username: username,
password: password,
serviceAddrs: serviceAddrs,
spaceName: spaceName,
retryGetSessionTimes: 1,
timeOut: 0 * time.Millisecond,
idleTime: 0 * time.Millisecond,
maxSize: 30,
minSize: 1,
hostIndex: 0,
}
// Iterate the given options and apply them to the config.
for _, overwrite := range opts {
overwrite(&newPoolConf)
}
if err := newPoolConf.checkMandatoryFields(); err != nil {
return nil, err
}
return &newPoolConf, nil
}
func WithSSLConfig(sslConfig *tls.Config) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.sslConfig = sslConfig
}
}
func WithTimeOut(timeOut time.Duration) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.timeOut = timeOut
}
}
func WithIdleTime(idleTime time.Duration) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.idleTime = idleTime
}
}
func WithMaxSize(maxSize int) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.maxSize = maxSize
}
}
func WithMinSize(minSize int) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.minSize = minSize
}
}
func WithHTTP2(useHTTP2 bool) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.useHTTP2 = useHTTP2
}
}
func WithHttpHeader(header http.Header) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.httpHeader = header
}
}
func WithHandshakeKey(handshakeKey string) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.handshakeKey = handshakeKey
}
}
func (conf *SessionPoolConf) checkMandatoryFields() error {
// Check mandatory fields
if conf.username == "" {
return fmt.Errorf("invalid session pool config: Username is empty")
}
if conf.password == "" {
return fmt.Errorf("invalid session pool config: Password is empty")
}
if len(conf.serviceAddrs) == 0 {
return fmt.Errorf("invalid session pool config: Service address is empty")
}
if conf.spaceName == "" {
return fmt.Errorf("invalid session pool config: Space name is empty")
}
return nil
}
// checkBasicFields checks the basic fields of the config and
// sets a default value if the given field value is invalid
func (conf *SessionPoolConf) checkBasicFields(log Logger) {
// Check pool related fields, use default value if the given value is invalid
if conf.timeOut < 0 {
conf.timeOut = 0 * time.Millisecond
log.Warn("Illegal Timeout value, the default value of 0 second has been applied")
}
if conf.idleTime < 0 {
conf.idleTime = 0 * time.Millisecond
log.Warn("Invalid IdleTime value, the default value of 0 second has been applied")
}
if conf.maxSize < 1 {
conf.maxSize = 10
log.Warn("Invalid MaxSize value, the default value of 10 has been applied")
}
if conf.minSize < 0 {
conf.minSize = 0
log.Warn("Invalid MinSize value, the default value of 0 has been applied")
}
}