1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
|
package redisc
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/gomodule/redigo/redis"
)
// Compile-time assertion that *Conn satisfies redis.ConnWithTimeout.
var _ redis.ConnWithTimeout = (*Conn)(nil)
// Conn is a redis cluster connection. When returned by Get
// or Dial, it is not yet bound to any node in the cluster.
// Only when a call to Do, Send, Receive or Bind is made is a connection
// to a specific node established:
//
//   - if Do or Send is called first, the command's first parameter
//     is assumed to be the key, and its slot is used to find the node
//   - if Receive is called first, or if Do or Send is called first
//     but with no parameter for the command (or no command), a
//     random node is selected in the cluster
//   - if Bind is called first, the node corresponding to the slot of
//     the specified key(s) is selected
//
// One exception: calling Do (or DoWithTimeout) with the blank command
// (empty command name and no arguments) on a connection that is not yet
// bound returns immediately and leaves the connection unbound.
//
// Because Get and Dial return a redis.Conn interface,
// a type assertion can be used to call Bind or ReadOnly on this
// concrete Conn type:
//
//	redisConn := cluster.Get()
//	if conn, ok := redisConn.(*redisc.Conn); ok {
//		if err := conn.Bind("my-key"); err != nil {
//			// handle error
//		}
//	}
//
// Or call the package-level BindConn or ReadOnlyConn helper functions.
type Conn struct {
	// cluster is the Cluster from which the underlying node connection
	// is obtained when this Conn gets bound, and which is notified when
	// a MOVED redirection is received.
	cluster *Cluster
	// forceDial is forwarded to the cluster when the node connection is
	// requested.
	forceDial bool // immutable

	// redigo allows concurrent reader and writer (conn.Receive and
	// conn.Send/conn.Flush), a mutex is needed to protect concurrent
	// accesses. It guards the fields declared below it.
	mu sync.Mutex
	// readOnly is set by ReadOnly before the connection is bound. It is
	// forwarded to the cluster when binding, and causes READWRITE to be
	// sent to the node when the connection is closed.
	readOnly bool
	// boundAddr is the address reported by the cluster for the node this
	// connection got bound to.
	boundAddr string
	// err, once non-nil, is returned by every operation instead of using
	// the connection. In the code visible here it is set by Close.
	err error
	// rc is the underlying redigo connection; it is nil until the
	// connection is bound to a node.
	rc redis.Conn
}
// RedirError describes a redirection reply sent by a redis cluster
// node. The Type field tells whether the node answered with a MOVED
// or an ASK error.
type RedirError struct {
	// Type is the kind of redirection: MOVED or ASK.
	Type string
	// NewSlot is the slot number carried by the redirection.
	NewSlot int
	// Addr is the address of the node the client is redirected to.
	Addr string

	// raw keeps the message exactly as it was received from redis.
	raw string
}

// Error implements the error interface. The returned text is the
// unmodified message that redis sent.
func (e *RedirError) Error() string {
	msg := e.raw
	return msg
}
// isRedisErr reports whether err is a redis.Error whose message starts
// with the whitespace-delimited token typ. Any other error value,
// including nil, yields false.
func isRedisErr(err error, typ string) bool {
	redisErr, isRedis := err.(redis.Error)
	if !isRedis {
		return false
	}
	fields := strings.Fields(redisErr.Error())
	if len(fields) == 0 {
		// blank message: there is no leading token to compare
		return false
	}
	return fields[0] == typ
}
// IsTryAgain reports whether err is a redis cluster TRYAGAIN error.
// Such an error means the command itself is valid, but the cluster is
// currently in an unstable state and cannot complete the request.
func IsTryAgain(err error) bool {
	const errType = "TRYAGAIN"
	return isRedisErr(err, errType)
}
// IsCrossSlot reports whether err is a redis cluster CROSSSLOT error,
// which is returned when a command was sent with keys that belong to
// different slots.
func IsCrossSlot(err error) bool {
	const errType = "CROSSSLOT"
	return isRedisErr(err, errType)
}
// ParseRedir converts err into a *RedirError. It returns nil when err
// is nil, is not a redis.Error, or is not a well-formed MOVED or ASK
// reply (exactly three whitespace-separated fields: the type, a numeric
// slot and the node address).
func ParseRedir(err error) *RedirError {
	redisErr, isRedis := err.(redis.Error)
	if !isRedis {
		return nil
	}

	msg := redisErr.Error()
	fields := strings.Fields(msg)
	if len(fields) != 3 {
		return nil
	}

	kind := fields[0]
	switch kind {
	case "MOVED", "ASK":
		// supported redirection types
	default:
		return nil
	}

	slot, convErr := strconv.Atoi(fields[1])
	if convErr != nil {
		return nil
	}

	return &RedirError{
		Type:    kind,
		NewSlot: slot,
		Addr:    fields[2],
		raw:     msg,
	}
}
// bind attaches the connection to a cluster node if, and only if, it is
// neither broken nor already bound. The node is the one holding slot, or
// a random one when slot is -1.
//
// It returns the underlying redis connection, whether this very call
// performed the binding, and any error: either the connection's stored
// error or the one reported by the cluster when obtaining the node
// connection.
func (c *Conn) bind(slot int) (rc redis.Conn, ok bool, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.err != nil {
		// broken or closed: report the stored error with the current conn
		return c.rc, false, c.err
	}
	if c.rc != nil {
		// already bound by an earlier call
		return c.rc, false, nil
	}

	conn, addr, dialErr := c.cluster.getConn(slot, c.forceDial, c.readOnly)
	if dialErr != nil {
		return nil, false, dialErr
	}
	c.rc = conn
	c.boundAddr = addr
	return conn, true, nil
}
// cmdSlot returns the hash slot of the key of a command, the key being
// assumed to be the first element of args. It returns -1 when args is
// empty; bind interprets -1 as "pick a random node". The cmd parameter
// is currently not used to locate the key.
//
// The key is converted to a string following the conversion rules that
// redigo documents for command arguments, so that the slot is computed
// from the bytes that are actually sent to redis:
//
//   - string and []byte are used as is
//   - bool becomes "1" or "0"
//   - nil becomes the empty string
//   - any other value is formatted like fmt.Print does (e.g. the int 42
//     becomes "42")
//
// Formatting with the %s verb, as was done previously, turned non-string
// keys into strings such as "%!s(int=42)" and selected the wrong slot.
//
// NOTE(review): values implementing redigo's Argument interface are not
// unwrapped here and fall in the fmt.Sprint case - confirm whether such
// keys need to be supported.
func cmdSlot(cmd string, args []interface{}) int {
	if len(args) == 0 {
		return -1
	}

	var key string
	switch k := args[0].(type) {
	case string:
		key = k
	case []byte:
		key = string(k)
	case bool:
		if k {
			key = "1"
		} else {
			key = "0"
		}
	case nil:
		key = ""
	default:
		key = fmt.Sprint(k)
	}
	return Slot(key)
}
// BindConn is a convenience helper: when c provides a Bind method with
// the same signature as (*Conn).Bind, that method is called with keys
// and its result is returned. Otherwise an error is returned.
func BindConn(c redis.Conn, keys ...string) error {
	type binder interface {
		Bind(...string) error
	}

	b, canBind := c.(binder)
	if !canBind {
		return errors.New("redisc: no Bind method")
	}
	return b.Bind(keys...)
}
// Bind attaches the connection to the cluster node serving the slot
// shared by all the provided keys. When called without any key, a random
// node is selected.
//
// An error is returned, and the connection is left unbound, when the
// keys do not all hash to the same slot. An error is also returned when
// the connection is already bound to a node.
func (c *Conn) Bind(keys ...string) error {
	target := -1
	for i := range keys {
		keySlot := Slot(keys[i])
		if target == -1 || keySlot == target {
			target = keySlot
			continue
		}
		return errors.New("redisc: keys do not belong to the same slot")
	}

	_, bound, err := c.bind(target)
	switch {
	case err != nil:
		return err
	case !bound:
		// bind did nothing because a node was already selected
		return errors.New("redisc: connection already bound to a node")
	}
	return nil
}
// ReadOnlyConn is a convenience helper: when c provides a ReadOnly
// method with the same signature as (*Conn).ReadOnly, that method is
// called and its result is returned. Otherwise an error is returned.
func ReadOnlyConn(c redis.Conn) error {
	type readOnlyer interface {
		ReadOnly() error
	}

	ro, canReadOnly := c.(readOnlyer)
	if !canReadOnly {
		return errors.New("redisc: no ReadOnly method")
	}
	return ro.ReadOnly()
}
// ReadOnly flags the connection as read-only. Once it gets bound to a
// cluster node, it will try to connect to a replica rather than the
// master, and a READONLY command is emitted automatically so that the
// replica accepts to serve read commands. Keep in mind that data read
// from a replica may be stale, and that write commands sent on a
// read-only connection fail with a MOVED error.
// See http://redis.io/commands/readonly for more details.
//
// ReadOnly must be called before the connection is bound: if a node was
// already selected via Do, Send, Receive or Bind, an error is returned.
// If the connection is broken or closed, its stored error is returned.
func (c *Conn) ReadOnly() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case c.err != nil:
		return c.err
	case c.rc != nil:
		// too late, a node has already been selected
		return errors.New("redisc: connection already bound to a node")
	}
	c.readOnly = true
	return nil
}
// Do sends a command to the server and returns the reply it received.
// A connection that is not yet bound to a cluster node gets bound by
// this call, following the rules documented on the Conn type.
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
	// a negative timeout makes DoWithTimeout use the plain Do of the
	// underlying connection, i.e. no timeout override
	const noOverride = -1
	return c.DoWithTimeout(noOverride, cmd, args...)
}
// DoWithTimeout sends a command to the server and returns the reply it
// received. A connection that is not yet bound to a cluster node gets
// bound by this call, following the rules documented on the Conn type.
//
// The timeout overrides the read timeout set when dialing the
// connection (in the DialOptions of the Cluster). A negative timeout
// means no override: the command is run with the plain Do of the
// underlying connection. With a non-negative timeout, an error is
// returned if the underlying connection is not a redis.ConnWithTimeout.
func (c *Conn) DoWithTimeout(timeout time.Duration, cmd string, args ...interface{}) (v interface{}, err error) {
	// The blank command is a special redigo command that flushes the output
	// buffer and receives all pending replies; it is used, for example,
	// when a connection is returned to a pool. There is nothing to flush
	// on an unbound connection, so do not bind to a random node for that.
	isBlank := cmd == "" && len(args) == 0
	if isBlank {
		c.mu.Lock()
		unbound := c.rc == nil
		c.mu.Unlock()
		if unbound {
			return nil, nil
		}
	}

	rc, _, err := c.bind(cmdSlot(cmd, args))
	if err != nil {
		return nil, err
	}

	if timeout >= 0 {
		rcwt, ok := rc.(redis.ConnWithTimeout)
		if !ok {
			return nil, errors.New("redisc: connection does not support ConnWithTimeout")
		}
		v, err = rcwt.DoWithTimeout(timeout, cmd, args...)
	} else {
		v, err = rc.Do(cmd, args...)
	}

	// a MOVED redirection is reported to the cluster; the reply and the
	// error are returned to the caller unchanged
	if redir := ParseRedir(err); redir != nil && redir.Type == "MOVED" {
		c.cluster.needsRefresh(redir)
	}
	return v, err
}
// Send writes the command to the client's output buffer. A connection
// that is not yet bound to a cluster node gets bound by this call,
// following the rules documented on the Conn type.
func (c *Conn) Send(cmd string, args ...interface{}) error {
	slot := cmdSlot(cmd, args)
	rc, _, err := c.bind(slot)
	if err == nil {
		err = rc.Send(cmd, args...)
	}
	return err
}
// Receive reads a single reply from the server. A connection that is
// not yet bound to a cluster node gets bound by this call, following
// the rules documented on the Conn type.
func (c *Conn) Receive() (interface{}, error) {
	// a negative timeout makes ReceiveWithTimeout use the plain Receive of
	// the underlying connection, i.e. no timeout override
	const noOverride = -1
	return c.ReceiveWithTimeout(noOverride)
}
// ReceiveWithTimeout reads a single reply from the Redis server.
// A connection that is not yet bound to a cluster node gets bound to a
// random node by this call, as documented on the Conn type.
//
// The timeout overrides the read timeout set when dialing the
// connection (in the DialOptions of the Cluster). A negative timeout
// means no override: the reply is read with the plain Receive of the
// underlying connection. With a non-negative timeout, an error is
// returned if the underlying connection is not a redis.ConnWithTimeout.
func (c *Conn) ReceiveWithTimeout(timeout time.Duration) (v interface{}, err error) {
	rc, _, err := c.bind(-1)
	if err != nil {
		return nil, err
	}

	if timeout >= 0 {
		rcwt, ok := rc.(redis.ConnWithTimeout)
		if !ok {
			return nil, errors.New("redisc: connection does not support ConnWithTimeout")
		}
		v, err = rcwt.ReceiveWithTimeout(timeout)
	} else {
		v, err = rc.Receive()
	}

	// a MOVED redirection is reported to the cluster; the reply and the
	// error are returned to the caller unchanged
	if redir := ParseRedir(err); redir != nil && redir.Type == "MOVED" {
		c.cluster.needsRefresh(redir)
	}
	return v, err
}
// Flush flushes the output buffer to the server. It returns the stored
// error of a broken or closed connection, and does nothing when the
// connection is not bound to a node yet.
func (c *Conn) Flush() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.err != nil {
		return c.err
	}
	if c.rc == nil {
		// not bound: nothing was buffered
		return nil
	}
	return c.rc.Flush()
}
// Err returns a non-nil value if the connection is broken. Applications
// should close broken connections. The error stored on this Conn takes
// precedence; otherwise the error of the underlying connection, if the
// connection is bound, is reported.
func (c *Conn) Err() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.err != nil {
		return c.err
	}
	if c.rc == nil {
		return nil
	}
	return c.rc.Err()
}
// Close closes the connection. Subsequent operations, including further
// calls to Close, return the error stored by the first call.
func (c *Conn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.err != nil {
		return c.err
	}
	// mark the connection as closed before releasing the node connection
	c.err = errors.New("redisc: closed")
	return c.closeLocked()
}
// closeLocked closes the underlying node connection, if there is one,
// and returns the error of that close. Close calls it while holding
// c.mu.
func (c *Conn) closeLocked() (err error) {
	if c.rc == nil {
		return nil
	}
	if c.readOnly {
		// This may be a pooled connection, so switch it back to read-write
		// mode before it is released. This is best effort: the reply and
		// any error are deliberately ignored.
		_, _ = c.rc.Do("READWRITE")
	}
	return c.rc.Close()
}
|