File: retry_conn.go

package info (click to toggle)
golang-github-mna-redisc 1.1.7-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 308 kB
  • sloc: ruby: 1,261; sh: 101; makefile: 5
file content (152 lines) | stat: -rw-r--r-- 3,971 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package redisc

import (
	"errors"
	"time"

	"github.com/gomodule/redigo/redis"
)

// RetryConn returns a connection that wraps c and transparently
// follows cluster redirections (MOVED and ASK replies) and retries
// commands that fail with a TRYAGAIN error. The connection c must
// be a *Conn, otherwise an error is returned.
//
// The returned connection supports only Do, Close and Err; every
// other method returns an error.
//
// maxAtt is the maximum number of attempts made to successfully
// execute a command; a value <= 0 means there is no limit.
// tryAgainDelay is how long to wait before retrying after a
// TRYAGAIN error.
func RetryConn(c redis.Conn, maxAtt int, tryAgainDelay time.Duration) (redis.Conn, error) {
	clusterConn, isConn := c.(*Conn)
	if !isConn {
		return nil, errors.New("redisc: connection is not a *Conn")
	}

	wrapped := &retryConn{
		c:             clusterConn,
		maxAttempts:   maxAtt,
		tryAgainDelay: tryAgainDelay,
	}
	return wrapped, nil
}

// retryConn is the connection type returned by RetryConn. It pairs a
// cluster *Conn with the retry settings applied when executing commands.
type retryConn struct {
	// c is the wrapped cluster connection on which commands are executed.
	c *Conn

	// maxAttempts bounds the number of attempts per command (<= 0 means
	// no limit); tryAgainDelay is the pause before retrying a TRYAGAIN.
	maxAttempts   int
	tryAgainDelay time.Duration
}

// Do executes cmd with args on the wrapped connection, following
// cluster redirections and retrying TRYAGAIN errors within the
// configured number of attempts.
func (rc *retryConn) Do(cmd string, args ...interface{}) (interface{}, error) {
	reply, err := rc.do(cmd, args...)
	return reply, err
}

// do runs cmd with args on the wrapped connection, following cluster
// redirections and retrying TRYAGAIN errors until the command yields a
// final result or the attempts are exhausted.
//
// Each TRYAGAIN retry and each followed redirection counts as one
// attempt. When rc.maxAttempts is <= 0 the number of attempts is not
// limited. Once the attempts run out, a "too many attempts" error is
// returned. Any outcome that is neither a redirection nor a TRYAGAIN
// is returned as-is from the underlying Do call; a failure to send
// ASKING or to obtain the redirected connection is returned immediately.
func (rc *retryConn) do(cmd string, args ...interface{}) (interface{}, error) {
	var att int
	var asking bool

	cluster := rc.c.cluster
	// a non-positive maxAttempts means there is no cap on the attempts
	for rc.maxAttempts <= 0 || att < rc.maxAttempts {
		if asking {
			// the previous iteration followed an ASK redirection: send
			// ASKING on the replaced connection ahead of the command, and
			// reset the flag so it is sent only once per ASK reply.
			if err := rc.c.Send("ASKING"); err != nil {
				return nil, err
			}
			asking = false
		}

		v, err := rc.c.Do(cmd, args...)
		re := ParseRedir(err)
		if re == nil {
			if IsTryAgain(err) {
				// handle retry
				time.Sleep(rc.tryAgainDelay)
				att++
				continue
			}

			// not a retry error nor a redirection, return result
			return v, err
		}

		// handle redirection
		// snapshot the connection's state while holding its lock
		rc.c.mu.Lock()
		readOnly := rc.c.readOnly
		connAddr := rc.c.boundAddr
		rc.c.mu.Unlock()
		if readOnly {
			// check if the connection was already made to that slot, meaning
			// that the redirection is because the command can't be served
			// by the replica and a non-readonly connection must be made to
			// the slot's master. If that's not the case, then keep the
			// readonly flag to true, meaning that it will attempt a connection
			// to a replica for the new slot.
			cluster.mu.Lock()
			slotMappings := cluster.mapping[re.NewSlot]
			cluster.mu.Unlock()
			if isIn(slotMappings, connAddr) {
				readOnly = false
			}
		}

		var conn redis.Conn
		addr := re.Addr
		asking = re.Type == "ASK"

		if asking {
			// if redirecting due to ASK, use the address that was
			// provided in the ASK error reply.
			conn, err = cluster.getConnForAddr(addr, rc.c.forceDial)
			if err != nil {
				return nil, err
			}
			// TODO(mna): does redis cluster send ASK replies that
			// redirect to replicas if the source node was a replica?
			// Assume no for now.
			readOnly = false
		} else {
			// if redirecting due to a MOVED, the slot mapping is already
			// updated to reflect the new server for that slot (done in
			// rc.c.Do), so getConnForSlot will return a connection to
			// the correct address.
			conn, addr, err = cluster.getConnForSlot(re.NewSlot, rc.c.forceDial, readOnly)
			if err != nil {
				// could not get connection to that node, return that error
				return nil, err
			}
		}

		rc.c.mu.Lock()
		// close and replace the old connection (close must come before assignments)
		rc.c.closeLocked()
		rc.c.rc = conn
		rc.c.boundAddr = addr
		rc.c.readOnly = readOnly
		rc.c.mu.Unlock()

		// a followed redirection consumes one attempt
		att++
	}
	return nil, errors.New("redisc: too many attempts")
}

// Err returns the result of calling Err on the wrapped connection.
func (rc *retryConn) Err() error {
	err := rc.c.Err()
	return err
}

// Close closes the wrapped connection and returns the error, if any,
// reported by its Close method.
func (rc *retryConn) Close() error {
	err := rc.c.Close()
	return err
}

// Send is not supported on a retry connection; it always returns
// an error and ignores its arguments.
func (rc *retryConn) Send(cmd string, args ...interface{}) error {
	const msg = "redisc: unsupported call to Send"
	return errors.New(msg)
}

// Receive is not supported on a retry connection; it always returns
// a nil reply and an error.
func (rc *retryConn) Receive() (interface{}, error) {
	const msg = "redisc: unsupported call to Receive"
	return nil, errors.New(msg)
}

// Flush is not supported on a retry connection; it always returns
// an error.
func (rc *retryConn) Flush() error {
	const msg = "redisc: unsupported call to Flush"
	return errors.New(msg)
}

// isIn reports whether v is equal to at least one element of list.
// A nil or empty list never contains v.
func isIn(list []string, v string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == v {
			return true
		}
	}
	return false
}