forked from mediocregopher/radix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathradix.go
492 lines (443 loc) · 14.9 KB
/
radix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
// Package radix implements all functionality needed to work with redis and all
// things related to it, including redis cluster, pubsub, sentinel, scanning,
// lua scripting, and more.
//
// Creating a client
//
// For a single node redis instance use NewPool to create a connection pool. The
// connection pool is thread-safe and will automatically create, reuse, and
// recreate connections as needed:
//
// pool, err := radix.NewPool("tcp", "127.0.0.1:6379", 10)
// if err != nil {
// // handle error
// }
//
// If you're using sentinel or cluster you should use NewSentinel or NewCluster
// (respectively) to create your client instead.
//
// Commands
//
// Any redis command can be performed by passing a Cmd into a Client's Do
// method. Each Cmd should only be used once. The return from the Cmd can be
// captured into any appopriate go primitive type, or a slice, map, or struct,
// if the command returns an array.
//
// err := client.Do(radix.Cmd(nil, "SET", "foo", "someval"))
//
// var fooVal string
// err := client.Do(radix.Cmd(&fooVal, "GET", "foo"))
//
// var fooValB []byte
// err := client.Do(radix.Cmd(&fooValB, "GET", "foo"))
//
// var barI int
// err := client.Do(radix.Cmd(&barI, "INCR", "bar"))
//
// var bazEls []string
// err := client.Do(radix.Cmd(&bazEls, "LRANGE", "baz", "0", "-1"))
//
// var buzMap map[string]string
// err := client.Do(radix.Cmd(&buzMap, "HGETALL", "buz"))
//
// FlatCmd can also be used if you wish to use non-string arguments like
// integers, slices, maps, or structs, and have them automatically be flattened
// into a single string slice.
//
// Struct Scanning
//
// Cmd and FlatCmd can unmarshal results into a struct. The results must be a
// key/value array, such as that returned by HGETALL. Exported field names will
// be used as keys, unless the fields have the "redis" tag:
//
// type MyType struct {
// Foo string // Will be populated with the value for key "Foo"
// Bar string `redis:"BAR"` // Will be populated with the value for key "BAR"
// Baz string `redis:"-"` // Will not be populated
// }
//
// Embedded structs will inline that struct's fields into the parent's:
//
// type MyOtherType struct {
// // adds fields "Foo" and "BAR" (from above example) to MyOtherType
// MyType
// Biz int
// }
//
// The same rules for field naming apply when a struct is passed into FlatCmd as
// an argument.
//
// Actions
//
// Cmd and FlatCmd both implement the Action interface. Other Actions include
// Pipeline, WithConn, and EvalScript.Cmd. Any of these may be passed into any
// Client's Do method.
//
// var fooVal string
// p := radix.Pipeline(
// radix.FlatCmd(nil, "SET", "foo", 1),
// radix.Cmd(&fooVal, "GET", "foo"),
// )
// if err := client.Do(p); err != nil {
// panic(err)
// }
// fmt.Printf("fooVal: %q\n", fooVal)
//
// Transactions
//
// There are two ways to perform transactions in redis. The first is with the
// MULTI/EXEC commands, which can be done using the WithConn Action (see its
// example). The second is using EVAL with lua scripting, which can be done
// using the EvalScript Action (again, see its example).
//
// EVAL with lua scripting is recommended in almost all cases. It only requires
// a single round-trip, it's infinitely more flexible than MULTI/EXEC, it's
// simpler to code, and for complex transactions, which would otherwise need a
// WATCH statement with MULTI/EXEC, it's significantly faster.
//
// AUTH and other settings via ConnFunc and ClientFunc
//
// All the client creation functions (e.g. NewPool) take in either a ConnFunc or
// a ClientFunc via their options. These can be used in order to set up timeouts
// on connections, perform authentication commands, or even implement custom
// pools.
//
// // this is a ConnFunc which will set up a connection which is authenticated
// // and has a 1 minute timeout on all operations
// customConnFunc := func(network, addr string) (radix.Conn, error) {
// return radix.Dial(network, addr,
// radix.DialTimeout(1 * time.Minute),
// radix.DialAuthPass("mySuperSecretPassword"),
// )
// }
//
// // this pool will use our ConnFunc for all connections it creates
// pool, err := radix.NewPool("tcp", redisAddr, 10, PoolConnFunc(customConnFunc))
//
// // this cluster will use the ClientFunc to create a pool to each node in the
// // cluster. The pools also use our customConnFunc, but have more connections
// poolFunc := func(network, addr string) (radix.Client, error) {
// return radix.NewPool(network, addr, 100, PoolConnFunc(customConnFunc))
// }
// cluster, err := radix.NewCluster([]string{redisAddr1, redisAddr2}, ClusterPoolFunc(poolFunc))
//
// Custom implementations
//
// All interfaces in this package were designed such that they could have custom
// implementations. There is no dependency within radix that demands any
// interface be implemented by a particular underlying type, so feel free to
// create your own Pools or Conns or Actions or whatever makes your life easier.
//
// Errors
//
// Errors returned from redis can be explicitly checked for using the the
// resp2.Error type. Note that the errors.As function, introduced in go 1.13,
// should be used.
//
// var redisErr resp2.Error
// err := client.Do(radix.Cmd(nil, "AUTH", "wrong password"))
// if errors.As(err, &redisErr) {
// log.Printf("redis error returned: %s", redisErr.E)
// }
//
// Use the golang.org/x/xerrors package if you're using an older version of go.
//
package radix
import (
"bufio"
"crypto/tls"
"net"
"net/url"
"strconv"
"strings"
"time"
errors "golang.org/x/xerrors"
"github.com/mediocregopher/radix/v3/resp"
)
var errClientClosed = errors.New("client is closed")
// Client describes an entity which can carry out Actions, e.g. a connection
// pool for a single redis instance or the cluster client.
//
// Implementations of Client are expected to be thread-safe, except in cases
// like Conn where they specify otherwise.
type Client interface {
// Do performs an Action, returning any error.
Do(Action) error
// Once Close() is called all future method calls on the Client will return
// an error
Close() error
}
// ClientFunc is a function which can be used to create a Client for a single
// redis instance on the given network/address.
type ClientFunc func(network, addr string) (Client, error)
// DefaultClientFunc is a ClientFunc which will return a Client for a redis
// instance using sane defaults.
var DefaultClientFunc = func(network, addr string) (Client, error) {
return NewPool(network, addr, 4)
}
// Conn is a Client wrapping a single network connection which synchronously
// reads/writes data using the redis resp protocol.
//
// A Conn can be used directly as a Client, but in general you probably want to
// use a *Pool instead
type Conn interface {
// The Do method of a Conn is _not_ expected to be thread-safe with the
// other methods of Conn, and merely calls the Action's Run method with
// itself as the argument.
Client
// Encode and Decode may be called at the same time by two different
// go-routines, but each should only be called once at a time (i.e. two
// routines shouldn't call Encode at the same time, same with Decode).
//
// Encode and Decode should _not_ be called at the same time as Do.
//
// If either Encode or Decode encounter a net.Error the Conn will be
// automatically closed.
//
// Encode is expected to encode an entire resp message, not a partial one.
// In other words, when sending commands to redis, Encode should only be
// called once per command. Similarly, Decode is expected to decode an
// entire resp response.
Encode(resp.Marshaler) error
Decode(resp.Unmarshaler) error
// Returns the underlying network connection, as-is. Read, Write, and Close
// should not be called on the returned Conn.
NetConn() net.Conn
}
type connWrap struct {
net.Conn
brw *bufio.ReadWriter
}
// NewConn takes an existing net.Conn and wraps it to support the Conn interface
// of this package. The Read and Write methods on the original net.Conn should
// not be used after calling this method.
func NewConn(conn net.Conn) Conn {
return &connWrap{
Conn: conn,
brw: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)),
}
}
func (cw *connWrap) Do(a Action) error {
return a.Run(cw)
}
func (cw *connWrap) Encode(m resp.Marshaler) error {
if err := m.MarshalRESP(cw.brw); err != nil {
return err
}
return cw.brw.Flush()
}
func (cw *connWrap) Decode(u resp.Unmarshaler) error {
return u.UnmarshalRESP(cw.brw.Reader)
}
func (cw *connWrap) NetConn() net.Conn {
return cw.Conn
}
// ConnFunc is a function which returns an initialized, ready-to-be-used Conn.
// Functions like NewPool or NewCluster take in a ConnFunc in order to allow for
// things like calls to AUTH on each new connection, setting timeouts, custom
// Conn implementations, etc... See the package docs for more details.
type ConnFunc func(network, addr string) (Conn, error)
// DefaultConnFunc is a ConnFunc which will return a Conn for a redis instance
// using sane defaults.
var DefaultConnFunc = func(network, addr string) (Conn, error) {
return Dial(network, addr)
}
func wrapDefaultConnFunc(addr string) ConnFunc {
_, opts := parseRedisURL(addr)
return func(network, addr string) (Conn, error) {
return Dial(network, addr, opts...)
}
}
////////////////////////////////////////////////////////////////////////////////
type dialOpts struct {
connectTimeout, readTimeout, writeTimeout time.Duration
authPass string
selectDB string
useTLSConfig bool
tlsConfig *tls.Config
}
// DialOpt is an optional behavior which can be applied to the Dial function to
// effect its behavior, or the behavior of the Conn it creates.
type DialOpt func(*dialOpts)
// DialConnectTimeout determines the timeout value to pass into net.DialTimeout
// when creating the connection. If not set then net.Dial is called instead.
func DialConnectTimeout(d time.Duration) DialOpt {
return func(do *dialOpts) {
do.connectTimeout = d
}
}
// DialReadTimeout determines the deadline to set when reading from a dialed
// connection. If not set then SetReadDeadline is never called.
func DialReadTimeout(d time.Duration) DialOpt {
return func(do *dialOpts) {
do.readTimeout = d
}
}
// DialWriteTimeout determines the deadline to set when writing to a dialed
// connection. If not set then SetWriteDeadline is never called.
func DialWriteTimeout(d time.Duration) DialOpt {
return func(do *dialOpts) {
do.writeTimeout = d
}
}
// DialTimeout is the equivalent to using DialConnectTimeout, DialReadTimeout,
// and DialWriteTimeout all with the same value.
func DialTimeout(d time.Duration) DialOpt {
return func(do *dialOpts) {
DialConnectTimeout(d)(do)
DialReadTimeout(d)(do)
DialWriteTimeout(d)(do)
}
}
// DialAuthPass will cause Dial to perform an AUTH command once the connection
// is created, using the given pass.
//
// If this is set and a redis URI is passed to Dial which also has a password
// set, this takes precedence.
func DialAuthPass(pass string) DialOpt {
return func(do *dialOpts) {
do.authPass = pass
}
}
// DialSelectDB will cause Dial to perform a SELECT command once the connection
// is created, using the given database index.
//
// If this is set and a redis URI is passed to Dial which also has a database
// index set, this takes precedence.
func DialSelectDB(db int) DialOpt {
return func(do *dialOpts) {
do.selectDB = strconv.Itoa(db)
}
}
// DialUseTLS will cause Dial to perform a TLS handshake using the provided
// config. If config is nil the config is interpreted as equivalent to the zero
// configuration. See https://golang.org/pkg/crypto/tls/#Config
func DialUseTLS(config *tls.Config) DialOpt {
return func(do *dialOpts) {
do.tlsConfig = config
do.useTLSConfig = true
}
}
type timeoutConn struct {
net.Conn
readTimeout, writeTimeout time.Duration
}
func (tc *timeoutConn) Read(b []byte) (int, error) {
if tc.readTimeout > 0 {
tc.Conn.SetReadDeadline(time.Now().Add(tc.readTimeout))
}
return tc.Conn.Read(b)
}
func (tc *timeoutConn) Write(b []byte) (int, error) {
if tc.writeTimeout > 0 {
tc.Conn.SetWriteDeadline(time.Now().Add(tc.writeTimeout))
}
return tc.Conn.Write(b)
}
var defaultDialOpts = []DialOpt{
DialTimeout(10 * time.Second),
}
func parseRedisURL(urlStr string) (string, []DialOpt) {
// do a quick check before we bust out url.Parse, in case that is very
// unperformant
if !strings.HasPrefix(urlStr, "redis://") {
return urlStr, nil
}
u, err := url.Parse(urlStr)
if err != nil {
return urlStr, nil
}
var opts []DialOpt
q := u.Query()
if p, ok := u.User.Password(); ok {
opts = append(opts, DialAuthPass(p))
} else if qpw := q.Get("password"); qpw != "" {
opts = append(opts, DialAuthPass(qpw))
}
dbStr := q.Get("db")
if u.Path != "" && u.Path != "/" {
dbStr = u.Path[1:]
}
if dbStr, err := strconv.Atoi(dbStr); err == nil {
opts = append(opts, DialSelectDB(dbStr))
}
return u.Host, opts
}
// Dial is a ConnFunc which creates a Conn using net.Dial and NewConn. It takes
// in a number of options which can overwrite its default behavior as well.
//
// In place of a host:port address, Dial also accepts a URI, as per:
// https://www.iana.org/assignments/uri-schemes/prov/redis
// If the URI has an AUTH password or db specified Dial will attempt to perform
// the AUTH and/or SELECT as well.
//
// If either DialAuthPass or DialSelectDB is used it overwrites the associated
// value passed in by the URI.
//
// The default options Dial uses are:
//
// DialTimeout(10 * time.Second)
//
func Dial(network, addr string, opts ...DialOpt) (Conn, error) {
var do dialOpts
for _, opt := range defaultDialOpts {
opt(&do)
}
addr, addrOpts := parseRedisURL(addr)
for _, opt := range addrOpts {
opt(&do)
}
for _, opt := range opts {
opt(&do)
}
var netConn net.Conn
var err error
dialer := net.Dialer{}
if do.connectTimeout > 0 {
dialer.Timeout = do.connectTimeout
}
if do.useTLSConfig {
netConn, err = tls.DialWithDialer(&dialer, network, addr, do.tlsConfig)
} else {
netConn, err = dialer.Dial(network, addr)
}
if err != nil {
return nil, err
}
// If the netConn is a net.TCPConn (or some wrapper for it) and so can have
// keepalive enabled, do so with a sane (though slightly aggressive)
// default.
{
type keepaliveConn interface {
SetKeepAlive(bool) error
SetKeepAlivePeriod(time.Duration) error
}
if kaConn, ok := netConn.(keepaliveConn); ok {
if err = kaConn.SetKeepAlive(true); err != nil {
netConn.Close()
return nil, err
} else if err = kaConn.SetKeepAlivePeriod(10 * time.Second); err != nil {
netConn.Close()
return nil, err
}
}
}
conn := NewConn(&timeoutConn{
readTimeout: do.readTimeout,
writeTimeout: do.writeTimeout,
Conn: netConn,
})
if do.authPass != "" {
if err := conn.Do(Cmd(nil, "AUTH", do.authPass)); err != nil {
conn.Close()
return nil, err
}
}
if do.selectDB != "" {
if err := conn.Do(Cmd(nil, "SELECT", do.selectDB)); err != nil {
conn.Close()
return nil, err
}
}
return conn, nil
}