1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
|
import multiprocessing
import os
import select
import six
import socket
import time
import unittest
import bmemcached
from bmemcached.protocol import Protocol
class _CacheProxy(multiprocessing.Process):
def __init__(self, server, pipe, listen_port=None):
super(_CacheProxy, self).__init__()
self._listen_port = listen_port
self.server = server
self.pipe = pipe
def run(self):
listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_sock.setblocking(False)
listen_sock.bind((os.environ['MEMCACHED_HOST'], self._listen_port or 0))
listen_sock.listen(1)
# Tell our caller the (host, port) that we're listening on.
self.pipe.send(listen_sock.getsockname())
# Open a connection to the real memcache server.
if not self.server.startswith('/'):
host, port = Protocol.split_host_port(self.server)
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.connect((host, port))
else:
server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_sock.connect(self.server)
# The connection to this server above is blocking, but reads and writes below are nonblocking.
server_sock.setblocking(False)
# listen_sock is the socket we're listening for connections on. We only handle
# a single connection at a time.
# client_sock is the connection we've accepted from listen_sock.
# server_sock is the connection to the actual server.
client_sock = None
# Data waiting to be sent to client_sock:
data_for_client = b''
# Data waiting to be sent to server_sock:
data_for_server = b''
while True:
read_sockets = [listen_sock]
write_sockets = []
if client_sock:
# Only add client_sock to read_sockets if we don't already have data
# from it waiting to be sent to the real server.
if not data_for_server:
read_sockets.append(client_sock)
# Only add client_sock to write_sockets if we have data to send.
if data_for_client:
write_sockets.append(client_sock)
if not data_for_client:
read_sockets.append(server_sock)
if data_for_server:
write_sockets.append(server_sock)
r, w, _ = select.select(read_sockets, write_sockets, [])
if listen_sock in r:
if client_sock:
client_sock.close()
client_sock, client_addr = listen_sock.accept()
client_sock.setblocking(False)
if server_sock in r:
data_for_client += server_sock.recv(1024)
if client_sock in r:
data_for_server += client_sock.recv(1024)
if server_sock in w:
bytes_written = server_sock.send(data_for_server)
data_for_server = data_for_server[bytes_written:]
if client_sock in w:
bytes_written = client_sock.send(data_for_client)
data_for_client = data_for_client[bytes_written:]
class MemcachedTests(unittest.TestCase):
def setUp(self):
self._proxy_port = None
# Start a helper to proxy requests to the actual memcache server. This uses a
# process instead of a thread, so we can simply kill the process between tests.
self._start_proxy()
self._stop_proxy()
self._start_proxy()
self.client = bmemcached.Client(self.server, 'user', 'password')
# Disable retry delays, so we can disconnect and reconnect from the
# server without needing to put delays in most of the tests.
self.client.enable_retry_delay(False)
# Clean up from any previous tests.
self.client.delete('test_key')
self.client.delete('test_key2')
def _server_host(self):
return '{}:11211'.format(os.environ['MEMCACHED_HOST'])
def _start_proxy(self):
# Start the proxy. If this isn't the first time we've started the proxy,
# use the same port we got the first time around.
parent_pipe, child_pipe = multiprocessing.Pipe()
self._proxy_thread = _CacheProxy(self._server_host(), child_pipe, self._proxy_port)
self._proxy_thread.start()
# Read the port the server is actually listening on. If we supplied a port, it
# will always be the same. This also guarantees that the process is listening on
# the port before we continue and try to connect to it.
sockname = parent_pipe.recv()
self._proxy_port = sockname[1]
self.server = '%s:%i' % sockname
def _stop_proxy(self):
if not self._proxy_thread:
return
# Kill the proxy, which causes communication to the server to fail.
self._proxy_thread.terminate()
self._proxy_thread.join()
self._proxy_thread = None
def tearDown(self):
self.client.disconnect_all()
self._stop_proxy()
def testSet(self):
self.assertTrue(self.client.set('test_key', 'test'))
self._stop_proxy()
self.assertFalse(self.client.set('test_key', 'test'))
def testSetMulti(self):
six.assertCountEqual(self, self.client.set_multi({
'test_key': 'value',
'test_key2': 'value2'}), [])
self._stop_proxy()
six.assertCountEqual(self, self.client.set_multi({
'test_key': 'value',
'test_key2': 'value2'}), ['test_key', 'test_key2'])
def testGet(self):
self.client.set('test_key', 'test')
self.assertEqual('test', self.client.get('test_key'))
# If the server is offline, get always returns None.
self._stop_proxy()
self.assertTrue(self.client.get('test_key') is None)
# After the server comes back online, gets will resume.
self._start_proxy()
self.assertEqual('test', self.client.get('test_key'))
def testRetryDelay(self):
# Test delaying retries. We only enable retry delays for this test, since we
# need to pause to test it, which slows down the test.
self.client._set_retry_delay(0.25)
self.client.set('test_key', 'test')
self.assertEqual('test', self.client.get('test_key'))
# If the server is offline, get always returns None. This request will cause
# the client to notice that the connection is offline, but not to retry the
# request.
self._stop_proxy()
self.assertTrue(self.client.get('test_key') is None)
# If we start the proxy again now, it'll reconnect immediately without any delay.
self._start_proxy()
self.assertEqual('test', self.client.get('test_key'))
# Stop the proxy again, and make another request to cause the client to notice the
# disconnection.
self._stop_proxy()
self.assertTrue(self.client.get('test_key') is None)
# Make another request. As above, the client will attempt a reconnection here, but
# the server is still offline so it'll fail. This will cause the retry delay to
# kick in.
# After the server comes back online, gets will continue to return None for 0.25
# second, since delays are still deferred.
self.assertTrue(self.client.get('test_key') is None)
# Start the server. This time, attempting to read from the server won't cause a
# connection attempt, because we're still delaying.
self._start_proxy()
self.assertTrue(self.client.get('test_key') is None)
# Sleep until the retry delay has elapsed, and verify that we connect to the server
# this time.
time.sleep(0.3)
self.assertEqual('test', self.client.get('test_key'))
def testGetMulti(self):
six.assertCountEqual(self, self.client.set_multi({
'test_key': 'value',
'test_key2': 'value2'
}), [])
self.assertEqual({'test_key': 'value', 'test_key2': 'value2'},
self.client.get_multi(['test_key', 'test_key2']))
self._stop_proxy()
self.assertEqual({}, self.client.get_multi(['test_key', 'test_key2']))
self._start_proxy()
self.assertEqual({'test_key': 'value', 'test_key2': 'value2'},
self.client.get_multi(['test_key', 'test_key2']))
def testDelete(self):
self._stop_proxy()
self.assertFalse(self.client.delete('test_key'))
def testAdd(self):
self._stop_proxy()
self.assertFalse(self.client.add('test_key', 'test'))
def testReplace(self):
self._stop_proxy()
self.assertFalse(self.client.replace('test_key', 'value2'))
def testIncrement(self):
self._stop_proxy()
self.assertEqual(0, self.client.incr('test_key', 1))
self.assertEqual(0, self.client.incr('test_key', 1))
def testDecrement(self):
self._stop_proxy()
self.assertEqual(0, self.client.decr('test_key', 1))
def testFlush(self):
self._stop_proxy()
self.assertTrue(self.client.flush_all())
def testStats(self):
self._stop_proxy()
stats = self.client.stats()[self.server]
self.assertEqual(stats, {})
class SocketMemcachedTests(MemcachedTests):
"""
Same tests as above, just make sure it works with sockets.
"""
def _server_host(self):
return '/tmp/memcached.sock'
|