-
Notifications
You must be signed in to change notification settings - Fork 9
/
client.go
481 lines (416 loc) · 11.3 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
package redis
import (
"bufio"
"errors"
"fmt"
"net"
"time"
)
// DialDelayMax is the idle limit for automated reconnect attempts.
// Sequential failure with connection establisment increases the retry
// delay in steps from 0 to 500 ms.
const DialDelayMax = time.Second / 2
// Fixed Settings
const (
// Number of pending requests limit per network protocol.
queueSizeTCP = 128
queueSizeUnix = 512
)
// ErrConnLost signals connection loss on pending request.
var errConnLost = errors.New("redis: connection lost while awaiting response")
// ClientConfig defines a Client setup.
type ClientConfig struct {
// The host defaults to localhost, and the port defaults to 6379.
// Thus, the empty string defaults to "localhost:6379". Use an
// absolute file path (e.g. "/var/run/redis.sock") for Unix
// domain sockets.
Addr string
// Limit execution duration when nonzero. Expiry causes a reconnect
// to prevent stale connections and a net.Error with Timeout() true.
CommandTimeout time.Duration
// Limit the duration for network connection establishment. Expiry
// causes an abort plus retry. See net.Dialer Timeout for details.
// Zero defaults to one second.
//
// Command submission blocks during the first dial attempt. When the
// connect fails, then command submission receives the error of the last
// connect attempt until the connection restores.
DialTimeout time.Duration
// AUTH when not nil.
Password []byte
// SELECT when not zero.
DB int64
}
// Client manages a connection to a Redis node until Close. Broken connection
// states cause automated reconnects.
//
// Multiple goroutines may invoke methods on a Client simultaneously. Command
// invocation applies <https://redis.io/topics/pipelining> on concurrency.
type Client[Key, Value String] struct {
ClientConfig // read-only attributes
noCopy noCopy
// The connection semaphore is used as a write lock.
connSem chan *redisConn
// Requests are send with a redisConn. The buffering reader attached to
// a redisConn is used to read the response. Requests enqueue a callback
// channel to parse the response in pipeline order. A nil Reader receive
// implies connection loss.
// Insertion must hold the write lock (connSem).
readQueue chan chan<- *bufio.Reader
// A send/receive halts the read routine. The bufio.Reader is discarded.
// No more consumption on ReadQueue.
// Insertion must hold the write lock (connSem).
readTerm chan struct{}
}
// NewDefaultClient launches a managed connection to a node (address).
// Both CommandTimeout and DialTimeout are set to one second.
func NewDefaultClient[Key, Value String](addr string) *Client[Key, Value] {
return NewClient[Key, Value](ClientConfig{
Addr: addr,
CommandTimeout: time.Second,
DialTimeout: time.Second,
})
}
// NewClient launches a managed connection to a node (address).
func NewClient[Key, Value String](config ClientConfig) *Client[Key, Value] {
config.Addr = normalizeAddr(config.Addr)
if config.DialTimeout == 0 {
config.DialTimeout = time.Second
}
queueSize := queueSizeTCP
if isUnixAddr(config.Addr) {
queueSize = queueSizeUnix
}
c := &Client[Key, Value]{
ClientConfig: config,
connSem: make(chan *redisConn, 1),
readQueue: make(chan chan<- *bufio.Reader, queueSize),
readTerm: make(chan struct{}),
}
go c.connectOrClosed()
return c
}
type redisConn struct {
net.Conn // nil when offline
offline error // reason for connection absence
// The token is nil when a read routine is using it.
idle *bufio.Reader
}
// Close terminates the connection establishment.
// Command submission is stopped with ErrClosed.
// All pending commands are dealt with on return.
// Calling Close more than once has no effect.
func (c *Client[Key, Value]) Close() error {
conn := <-c.connSem // lock write
if conn.offline == ErrClosed {
// redundant invocation
c.connSem <- conn // unlock write
return nil
}
if conn.offline == nil && conn.idle == nil {
// must hold write lock for insertion:
c.readTerm <- struct{}{}
// race unlikely yet possible
c.cancelQueue()
}
// stop command submission (unlocks write)
c.connSem <- &redisConn{offline: ErrClosed}
if conn.Conn != nil {
return conn.Close()
}
return nil
}
// connectOrClosed populates the connection semaphore.
func (c *Client[Key, Value]) connectOrClosed() {
var retryDelay time.Duration
for {
conn, reader, err := c.connect(conservativeMSS)
if err != nil {
retry := time.NewTimer(retryDelay)
// remove previous connect error unless closed
if retryDelay != 0 {
current := <-c.connSem
if current.offline == ErrClosed {
c.connSem <- current // restore
retry.Stop() // cleanup
return // abandon
}
}
// propagate current connect error
c.connSem <- &redisConn{offline: fmt.Errorf("redis: offline due %w", err)}
retryDelay = 2*retryDelay + time.Millisecond
if retryDelay > DialDelayMax {
retryDelay = DialDelayMax
}
<-retry.C
continue
}
// remove previous connect error unless closed
if retryDelay != 0 {
current := <-c.connSem
if current.offline == ErrClosed {
c.connSem <- current // restore
conn.Close() // discard
return // abandon
}
}
// release
c.connSem <- &redisConn{Conn: conn, idle: reader}
return
}
}
func (c *Client[Key, Value]) cancelQueue() {
for {
select {
case ch := <-c.readQueue:
// signal connection loss
ch <- (*bufio.Reader)(nil)
default:
return
}
}
}
// Exchange sends a request, and then it awaits its turn (in the pipeline) for
// response receiption.
func (c *Client[Key, Value]) exchange(req *request) (*bufio.Reader, error) {
conn := <-c.connSem // lock write
// validate connection state
if err := conn.offline; err != nil {
c.connSem <- conn // unlock write
return nil, err
}
// apply time-out if set
var deadline time.Time
if c.CommandTimeout != 0 {
deadline = time.Now().Add(c.CommandTimeout)
conn.SetWriteDeadline(deadline)
}
// send command
if _, err := conn.Write(req.buf); err != nil {
// write remains locked (until connectOrClosed)
go func() {
if conn.idle == nil {
// read routine running
// must hold write lock for insertion:
c.readTerm <- struct{}{}
c.cancelQueue()
}
conn.Close()
c.connectOrClosed()
}()
return nil, err
}
reader := conn.idle
if reader != nil {
// clear idle state; we're the read routine now
conn.idle = nil
// receive channel not used as first in line
req.free()
} else {
// read routine is running; wait in line
// must hold write lock for insertion:
c.readQueue <- req.receive
}
c.connSem <- conn // unlock write
if reader == nil {
// await response turn in pipeline
reader = <-req.receive
req.free()
if reader == nil {
// queue abandonment
return nil, errConnLost
}
}
if !deadline.IsZero() {
conn.SetReadDeadline(deadline)
}
return reader, nil
}
func (c *Client[Key, Value]) commandOK(req *request) error {
r, err := c.exchange(req)
if err != nil {
return err
}
err = readOK(r)
c.passRead(r, err)
return err
}
func (c *Client[Key, Value]) commandOKOrReconnect(req *request) error {
r, err := c.exchange(req)
if err != nil {
return err
}
err = readOK(r)
if err != nil {
c.dropConnFromRead()
} else {
c.passRead(r, nil)
}
return err
}
func (c *Client[Key, Value]) commandInteger(req *request) (int64, error) {
r, err := c.exchange(req)
if err != nil {
return 0, err
}
integer, err := readInteger(r)
c.passRead(r, err)
return integer, err
}
func (c *Client[Key, Value]) commandBulk(req *request) (bulk Value, _ error) {
r, err := c.exchange(req)
if err != nil {
return bulk, err
}
bulk, err = readBulk[Value](r)
c.passRead(r, err)
if err == errNull {
err = nil
}
return bulk, err
}
func (c *Client[Key, Value]) commandArray(req *request) ([]Value, error) {
r, err := c.exchange(req)
if err != nil {
return nil, err
}
array, err := readArray[Value](r)
c.passRead(r, err)
if err == errNull {
err = nil
}
return array, err
}
// PassRead hands over the buffered reader to the following command in line. It
// goes in idle mode (on the redisConn from connSem) when all requests are done
// for.
func (c *Client[Key, Value]) passRead(r *bufio.Reader, err error) {
switch err {
case nil, errNull:
break
default:
_, ok := err.(ServerError)
if !ok {
// got an I/O error on response
c.dropConnFromRead()
return
}
}
// pass r to enqueued
select {
case next := <-c.readQueue:
next <- r // direct pass
return
default:
break
}
// go idle
select {
case next := <-c.readQueue:
// request enqueued while awaiting lock
next <- r // pass after all
// Acquire write lock to make the idle decision atomic, as
// readQueue insertion (in exchange) operates within the lock.
case conn := <-c.connSem:
// write locked
select {
case next := <-c.readQueue:
// lost race while awaiting lock
next <- r // pass after all
default:
conn.idle = r // go idle mode
}
c.connSem <- conn // unlock write
case <-c.readTerm:
break // accept halt; discard r
}
}
// DropConnFromRead disconnects with Redis.
func (c *Client[Key, Value]) dropConnFromRead() {
for {
select {
case <-c.readTerm:
// accept halt; let sender drop conn
return
// A write (lock owner) blocks on a full queue,
// so include discard here to prevent deadlock.
case next := <-c.readQueue:
// signal connection loss
next <- (*bufio.Reader)(nil)
case conn := <-c.connSem:
// write locked
if conn.offline != nil {
c.connSem <- conn // unlock write
} else {
// write remains locked (until connectOrClosed)
go func() {
conn.Close()
c.cancelQueue()
c.connectOrClosed()
}()
}
return
}
}
}
func (c *ClientConfig) connect(readBufferSize int) (net.Conn, *bufio.Reader, error) {
network := "tcp"
if isUnixAddr(c.Addr) {
network = "unix"
}
conn, err := net.DialTimeout(network, c.Addr, c.DialTimeout)
if err != nil {
return nil, nil, err
}
// connection tuning
if tcp, ok := conn.(*net.TCPConn); ok {
tcp.SetNoDelay(false)
tcp.SetLinger(0)
}
reader := bufio.NewReaderSize(conn, readBufferSize)
// apply sticky settings
if c.Password != nil {
req := requestWithString("*2\r\n$4\r\nAUTH\r\n$", c.Password)
defer req.free()
if c.CommandTimeout != 0 {
conn.SetDeadline(time.Now().Add(c.CommandTimeout))
defer conn.SetDeadline(time.Time{})
}
_, err := conn.Write(req.buf)
// ⚠️ reverse/delayed error check
if err == nil {
err = readOK(reader)
}
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("redis: AUTH on new connection: %w", err)
}
}
if c.DB != 0 {
req := requestWithDecimal("*2\r\n$6\r\nSELECT\r\n$", c.DB)
defer req.free()
if c.CommandTimeout != 0 {
conn.SetDeadline(time.Now().Add(c.CommandTimeout))
defer conn.SetDeadline(time.Time{})
}
_, err := conn.Write(req.buf)
// ⚠️ reverse/delayed error check
if err == nil {
err = readOK(reader)
}
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("redis: SELECT on new connection: %w", err)
}
}
return conn, reader, nil
}
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}