1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
package redisc
import (
"errors"
"time"
"github.com/gomodule/redigo/redis"
)
// RetryConn wraps the connection c (which must be a *Conn)
// into a connection that automatically handles cluster redirections
// (MOVED and ASK replies) and retries for TRYAGAIN errors.
// Only Do, Close and Err can be called on that connection,
// all other methods return an error.
//
// The maxAtt parameter indicates the maximum number of attempts
// to successfully execute the command. The tryAgainDelay is the
// duration to wait before retrying a TRYAGAIN error.
func RetryConn(c redis.Conn, maxAtt int, tryAgainDelay time.Duration) (redis.Conn, error) {
cc, ok := c.(*Conn)
if !ok {
return nil, errors.New("redisc: connection is not a *Conn")
}
return &retryConn{c: cc, maxAttempts: maxAtt, tryAgainDelay: tryAgainDelay}, nil
}
type retryConn struct {
c *Conn
maxAttempts int
tryAgainDelay time.Duration
}
func (rc *retryConn) Do(cmd string, args ...interface{}) (interface{}, error) {
return rc.do(cmd, args...)
}
func (rc *retryConn) do(cmd string, args ...interface{}) (interface{}, error) {
var att int
var asking bool
cluster := rc.c.cluster
for rc.maxAttempts <= 0 || att < rc.maxAttempts {
if asking {
if err := rc.c.Send("ASKING"); err != nil {
return nil, err
}
asking = false
}
v, err := rc.c.Do(cmd, args...)
re := ParseRedir(err)
if re == nil {
if IsTryAgain(err) {
// handle retry
time.Sleep(rc.tryAgainDelay)
att++
continue
}
// not a retry error nor a redirection, return result
return v, err
}
// handle redirection
rc.c.mu.Lock()
readOnly := rc.c.readOnly
connAddr := rc.c.boundAddr
rc.c.mu.Unlock()
if readOnly {
// check if the connection was already made to that slot, meaning
// that the redirection is because the command can't be served
// by the replica and a non-readonly connection must be made to
// the slot's master. If that's not the case, then keep the
// readonly flag to true, meaning that it will attempt a connection
// to a replica for the new slot.
cluster.mu.Lock()
slotMappings := cluster.mapping[re.NewSlot]
cluster.mu.Unlock()
if isIn(slotMappings, connAddr) {
readOnly = false
}
}
var conn redis.Conn
addr := re.Addr
asking = re.Type == "ASK"
if asking {
// if redirecting due to ASK, use the address that was
// provided in the ASK error reply.
conn, err = cluster.getConnForAddr(addr, rc.c.forceDial)
if err != nil {
return nil, err
}
// TODO(mna): does redis cluster send ASK replies that
// redirect to replicas if the source node was a replica?
// Assume no for now.
readOnly = false
} else {
// if redirecting due to a MOVED, the slot mapping is already
// updated to reflect the new server for that slot (done in
// rc.c.Do), so getConnForSlot will return a connection to
// the correct address.
conn, addr, err = cluster.getConnForSlot(re.NewSlot, rc.c.forceDial, readOnly)
if err != nil {
// could not get connection to that node, return that error
return nil, err
}
}
rc.c.mu.Lock()
// close and replace the old connection (close must come before assignments)
rc.c.closeLocked()
rc.c.rc = conn
rc.c.boundAddr = addr
rc.c.readOnly = readOnly
rc.c.mu.Unlock()
att++
}
return nil, errors.New("redisc: too many attempts")
}
func (rc *retryConn) Err() error {
return rc.c.Err()
}
func (rc *retryConn) Close() error {
return rc.c.Close()
}
func (rc *retryConn) Send(cmd string, args ...interface{}) error {
return errors.New("redisc: unsupported call to Send")
}
func (rc *retryConn) Receive() (interface{}, error) {
return nil, errors.New("redisc: unsupported call to Receive")
}
func (rc *retryConn) Flush() error {
return errors.New("redisc: unsupported call to Flush")
}
func isIn(list []string, v string) bool {
for _, vv := range list {
if v == vv {
return true
}
}
return false
}
|