1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
|
# -*- coding: utf-8 -*-
from socket import gethostbyaddr
from functools import wraps
# rediscluster imports
from .exceptions import (
RedisClusterException, ClusterDownError
)
# 3rd party imports
from redis._compat import basestring, nativestr
def bool_ok(response, *args, **kwargs):
    """
    Tell whether a server reply is the plain status string 'OK'.

    The reply is first converted with `nativestr` and then compared to
    'OK'. Any extra positional or keyword arguments are accepted and
    ignored, so this can be used as a result callback that is invoked
    with additional cluster-related arguments.
    """
    status = nativestr(response)
    return status == 'OK'
def string_keys_to_dict(key_strings, callback):
    """
    Build a dict in which every entry of `key_strings` is a key and
    every value is the very same `callback` object.
    """
    return {key: callback for key in key_strings}
def dict_merge(*dicts):
    """
    Combine every dict passed in into one new dict.

    Arguments are applied left to right, so a key present in several
    inputs ends up with the value from the last one. The inputs are not
    modified.

    Raises `ValueError` as soon as an argument that is not a dict is met.
    """
    combined = {}
    for candidate in dicts:
        if isinstance(candidate, dict):
            combined.update(candidate)
            continue
        raise ValueError('Value should be of dict type')
    return combined
def blocked_command(self, command):
    """
    Always raise a `RedisClusterException` saying that `command` is
    blocked in redis cluster mode. `self` is not used.
    """
    message = "Command: {0} is blocked in redis cluster mode".format(command)
    raise RedisClusterException(message)
def merge_result(command, res):
    """
    Flatten the per-node results in `res` into one list of unique items.

    `res` must be a dict whose values are iterables (one per node). All
    items are collected in a set, so duplicates are dropped and the order
    of the returned list is not defined. `command` is not used.

    Raises `ValueError` when `res` is not a dict.
    """
    if not isinstance(res, dict):
        raise ValueError('Value should be of dict type')

    unique_items = {item for node_items in res.values() for item in node_items}
    return list(unique_items)
def first_key(command, res):
    """
    Return the single value held by `res`.

    `res` must be a dict with exactly one entry; that entry's value is
    returned. A dict with any other number of entries (including none)
    raises `RedisClusterException`, and a non-dict raises `ValueError`.
    """
    if not isinstance(res, dict):
        raise ValueError('Value should be of dict type')

    if len(res) != 1:
        raise RedisClusterException("More then 1 result from command: {0}".format(command))

    return next(iter(res.values()))
def clusterdown_wrapper(func):
    """
    Decorator that retries `func` when it raises `ClusterDownError`.

    The wrapped callable is invoked with the same arguments up to three
    times. The first call that completes hands its return value back to
    the caller. Exceptions other than `ClusterDownError` are not caught
    and propagate immediately. When all three attempts end in
    `ClusterDownError`, a new `ClusterDownError` is raised to the caller.

    NOTE(review): the retry presumably relies on the failing call having
    triggered a refresh of the cluster state somewhere else; nothing in
    this wrapper does that itself.
    """
    max_attempts = 3

    @wraps(func)
    def inner(*args, **kwargs):
        attempts_made = 0
        while attempts_made < max_attempts:
            attempts_made += 1
            try:
                outcome = func(*args, **kwargs)
            except ClusterDownError:
                # Only this error is retried; go around for another attempt.
                continue
            else:
                return outcome

        # Every attempt failed, report it to the caller.
        raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")

    return inner
def nslookup(node_ip):
    """
    Replace the IP in `node_ip` with the hostname from a reverse lookup.

    `node_ip` is either a bare address (no ':' in it) or an
    'address:port' string. For a bare address the primary hostname
    reported by `gethostbyaddr` is returned. For 'address:port' the
    result is 'hostname:port' with the port text left untouched.

    Errors raised by `gethostbyaddr` propagate to the caller.
    """
    if ':' not in node_ip:
        return gethostbyaddr(node_ip)[0]

    # Split on the LAST colon only: the port is always the final component,
    # while the address part may itself contain colons (IPv6). A plain
    # split(':') would fail to unpack for such input.
    ip, port = node_ip.rsplit(':', 1)
    return '{0}:{1}'.format(gethostbyaddr(ip)[0], port)
def parse_cluster_slots(resp, **options):
    """
    Turn a slot-range reply into a dict keyed by `(start, end)`.

    Every item of `resp` is a sequence whose first three entries are the
    range start, the range end and the master; all remaining entries are
    slaves. Master and slaves are sequences starting with host and port.
    Each one becomes a `(host, port)` tuple, where the host is converted
    with `nativestr` and replaced by the `current_host` option (default
    '') when it is empty.

    Returns `{(start, end): {'master': (host, port),
    'slaves': [(host, port), ...]}}`.
    """
    fallback_host = options.get('current_host', '')

    def fix_server(*args):
        # Only host and port are used; further entries are ignored.
        host = nativestr(args[0])
        return (host or fallback_host, args[1])

    slot_map = {}
    for entry in resp:
        range_start, range_end, master = entry[:3]
        replicas = entry[3:]
        master_addr = fix_server(*master)
        replica_addrs = [fix_server(*replica) for replica in replicas]
        slot_map[(range_start, range_end)] = {
            'master': master_addr,
            'slaves': replica_addrs,
        }

    return slot_map
def parse_cluster_nodes(resp, **options):
    """
    Parse a CLUSTER NODES / CLUSTER SLAVES reply into a list of node dicts.

    @see: http://redis.io/commands/cluster-nodes # string
    @see: http://redis.io/commands/cluster-slaves # list of string

    `resp` is either the whole reply as one string (one node per line) or
    a list/tuple holding one line per node. Every line is converted with
    `nativestr` (assumed to turn bytes into native strings - confirm
    against redis._compat). Blank lines are skipped.

    The `current_host` option (default '') is used as host for nodes whose
    address has an empty host part.

    Each returned dict has the keys 'id', 'host', 'port',
    'cluster-bus-port', 'flags' (tuple), 'master' (None when the field is
    '-'), 'ping-sent', 'pong-recv', 'link-state', 'slots' and
    'migrations'. 'cluster-bus-port' falls back to port + 10000 when the
    address carries no '@port' part. 'slots' is a tuple of slot numbers
    when the line has a slot section and an empty list otherwise.

    Raises `ValueError` for a line with fewer than 8 fields or with
    non-numeric port / timestamp / slot values.
    """
    current_host = options.get('current_host', '')

    def parse_slots(s):
        # Split the slot section into plain slots and in-flight migrations.
        slots, migrations = [], []
        # split() without a separator tolerates repeated or trailing spaces,
        # which would otherwise produce '' tokens and break int().
        for r in s.split():
            if '->-' in r:
                # "[slot->-node_id]": slot is being migrated to node_id
                slot_id, dst_node_id = r[1:-1].split('->-', 1)
                migrations.append({
                    'slot': int(slot_id),
                    'node_id': dst_node_id,
                    'state': 'migrating'
                })
            elif '-<-' in r:
                # "[slot-<-node_id]": slot is being imported from node_id
                slot_id, src_node_id = r[1:-1].split('-<-', 1)
                migrations.append({
                    'slot': int(slot_id),
                    'node_id': src_node_id,
                    'state': 'importing'
                })
            elif '-' in r:
                # "start-end": inclusive range of slots
                start, end = r.split('-')
                slots.extend(range(int(start), int(end) + 1))
            else:
                slots.append(int(r))
        return slots, migrations

    # Convert per line when a list is given; converting the list object
    # itself would not decode its items.
    if isinstance(resp, (list, tuple)):
        lines = [nativestr(line) for line in resp]
    else:
        lines = nativestr(resp).splitlines()

    nodes = []
    for line in lines:
        if not line.strip():
            continue

        # At most 9 parts: 8 fixed fields plus the whole slot section.
        parts = line.split(' ', 8)
        self_id, addr, flags, master_id, ping_sent, \
            pong_recv, config_epoch, link_state = parts[:8]

        # Newer servers may append ",hostname[,...]" to the address field;
        # drop it so only "host:port[@cport]" is left.
        endpoint = addr.partition(',')[0]
        # Last colon separates the port; the host itself may contain colons.
        host, ports = endpoint.rsplit(':', 1)
        port, _, cluster_port = ports.partition('@')

        node = {
            'id': self_id,
            'host': host or current_host,
            'port': int(port),
            'cluster-bus-port': int(cluster_port) if cluster_port else 10000 + int(port),
            'flags': tuple(flags.split(',')),
            'master': master_id if master_id != '-' else None,
            'ping-sent': int(ping_sent),
            'pong-recv': int(pong_recv),
            'link-state': link_state,
            'slots': [],
            'migrations': [],
        }

        if len(parts) >= 9:
            slots, migrations = parse_slots(parts[8])
            node['slots'], node['migrations'] = tuple(slots), migrations

        nodes.append(node)

    return nodes
def parse_pubsub_channels(command, res, **options):
    """
    Result callback whose return shape depends on the `aggregate` option.

    With `aggregate` true (the default) the per-node results in `res` are
    combined through `merge_result`. Otherwise `res` is returned as is.
    """
    if options.get('aggregate', True):
        return merge_result(command, res)
    return res
def parse_pubsub_numpat(command, res, **options):
    """
    Result callback whose return shape depends on the `aggregate` option.

    With `aggregate` true (the default) the values of `res` (one count
    per node) are added up and the total is returned. Otherwise `res` is
    returned as is. `command` is not used.
    """
    if options.get('aggregate', True):
        return sum(res.values())
    return res
def parse_pubsub_numsub(command, res, **options):
    """
    Result callback whose return shape depends on the `aggregate` option.

    With `aggregate` true (the default) every value of `res` is read as an
    iterable of `(channel, count)` pairs; counts of the same channel are
    added together across all nodes and the result is returned as a list
    of `(channel, total)` tuples. Otherwise `res` is returned as is.
    `command` is not used.
    """
    if not options.get('aggregate', True):
        return res

    totals = {}
    for node_pairs in res.values():
        for channel, count in node_pairs:
            if channel in totals:
                totals[channel] += count
            else:
                totals[channel] = count

    return list(totals.items())
|