1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
|
import struct
import unittest
from gearman import protocol
from gearman.connection import GearmanConnection
from gearman.constants import JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import ConnectionError, ServerUnavailable, ProtocolError
from tests._core_testing import _GearmanAbstractTest
class ProtocolBinaryCommandsTest(unittest.TestCase):
#######################
# Begin parsing tests #
#######################
def test_parsing_errors(self):
malformed_command_buffer = "%sAAAABBBBCCCC"
# Raise malformed magic exceptions
self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % "DDDD")
self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % protocol.MAGIC_RES_STRING, is_response=False)
self.assertRaises(ProtocolError, protocol.parse_binary_command, malformed_command_buffer % protocol.MAGIC_REQ_STRING, is_response=True)
# Raise unknown command errors
unassigned_gearman_command = 1234
unknown_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, unassigned_gearman_command, 0)
self.assertRaises(ProtocolError, protocol.parse_binary_command, unknown_command_buffer)
# Raise an error on our imaginary GEARMAN_COMMAND_TEXT_COMMAND
imaginary_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_TEXT_COMMAND, 4, 'ABCD')
self.assertRaises(ProtocolError, protocol.parse_binary_command, imaginary_command_buffer)
# Raise an error on receiving an unexpected payload
unexpected_payload_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_NOOP, 4, 'ABCD')
self.assertRaises(ProtocolError, protocol.parse_binary_command, unexpected_payload_command_buffer)
def test_parsing_request(self):
# Test parsing a request for a job (server side parsing)
grab_job_command_buffer = struct.pack('!4sII', protocol.MAGIC_REQ_STRING, protocol.GEARMAN_COMMAND_GRAB_JOB_UNIQ, 0)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(grab_job_command_buffer, is_response=False)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_GRAB_JOB_UNIQ)
self.assertEquals(cmd_args, dict())
self.assertEquals(cmd_len, len(grab_job_command_buffer))
def test_parsing_without_enough_data(self):
# Test that we return with nothing to do... received a partial packet
not_enough_data_command_buffer = struct.pack('!4s', protocol.MAGIC_RES_STRING)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(not_enough_data_command_buffer)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_args, None)
self.assertEquals(cmd_len, 0)
# Test that we return with nothing to do... received a partial packet (expected binary payload of size 4, got 0)
not_enough_data_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(not_enough_data_command_buffer)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_args, None)
self.assertEquals(cmd_len, 0)
def test_parsing_no_args(self):
noop_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_NOOP, 0)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(noop_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_NOOP)
self.assertEquals(cmd_args, dict())
self.assertEquals(cmd_len, len(noop_command_buffer))
def test_parsing_single_arg(self):
echoed_string = 'abcd'
echo_command_buffer = struct.pack('!4sII4s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4, echoed_string)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(echo_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_ECHO_RES)
self.assertEquals(cmd_args, dict(data=echoed_string))
self.assertEquals(cmd_len, len(echo_command_buffer))
def test_parsing_single_arg_with_extra_data(self):
echoed_string = 'abcd'
excess_bytes = 5
excess_data = echoed_string + (protocol.NULL_CHAR * excess_bytes)
excess_echo_command_buffer = struct.pack('!4sII9s', protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_ECHO_RES, 4, excess_data)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(excess_echo_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_ECHO_RES)
self.assertEquals(cmd_args, dict(data=echoed_string))
self.assertEquals(cmd_len, len(excess_echo_command_buffer) - excess_bytes)
def test_parsing_multiple_args(self):
# Tests ordered argument processing and proper NULL_CHAR splitting
expected_data = protocol.NULL_CHAR * 4
binary_payload = protocol.NULL_CHAR.join(['test', 'function', 'identifier', expected_data])
payload_size = len(binary_payload)
uniq_command_buffer = struct.pack('!4sII%ds' % payload_size, protocol.MAGIC_RES_STRING, protocol.GEARMAN_COMMAND_JOB_ASSIGN_UNIQ, payload_size, binary_payload)
cmd_type, cmd_args, cmd_len = protocol.parse_binary_command(uniq_command_buffer)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
self.assertEquals(cmd_args, dict(job_handle='test', task='function', unique='identifier', data=expected_data))
self.assertEquals(cmd_len, len(uniq_command_buffer))
#######################
# Begin packing tests #
#######################
def test_packing_errors(self):
# Assert we get an unknown command
cmd_type = 1234
cmd_args = dict()
self.assertRaises(ProtocolError, protocol.pack_binary_command, cmd_type, cmd_args)
# Assert we get a fake command
cmd_type = protocol.GEARMAN_COMMAND_TEXT_COMMAND
cmd_args = dict()
self.assertRaises(ProtocolError, protocol.pack_binary_command, cmd_type, cmd_args)
# Assert we get arg mismatch, got 1, expecting 0
cmd_type = protocol.GEARMAN_COMMAND_GRAB_JOB
cmd_args = dict(extra='arguments')
self.assertRaises(ProtocolError, protocol.pack_binary_command, cmd_type, cmd_args)
# Assert we get arg mismatch, got 0, expecting 1
cmd_type = protocol.GEARMAN_COMMAND_JOB_CREATED
cmd_args = dict()
self.assertRaises(ProtocolError, protocol.pack_binary_command, cmd_type, cmd_args)
# Assert we get arg mismatch (name), got 1, expecting 1
cmd_type = protocol.GEARMAN_COMMAND_JOB_CREATED
cmd_args = dict(extra='arguments')
self.assertRaises(ProtocolError, protocol.pack_binary_command, cmd_type, cmd_args)
# Assert we get a non-string argument
cmd_type = protocol.GEARMAN_COMMAND_JOB_CREATED
cmd_args = dict(job_handle=12345)
self.assertRaises(ProtocolError, protocol.pack_binary_command, cmd_type, cmd_args)
# Assert we get a non-string argument (expecting BYTES)
cmd_type = protocol.GEARMAN_COMMAND_JOB_CREATED
cmd_args = dict(job_handle=unicode(12345))
self.assertRaises(ProtocolError, protocol.pack_binary_command, cmd_type, cmd_args)
def test_packing_response(self):
# Test packing a response for a job (server side packing)
cmd_type = protocol.GEARMAN_COMMAND_NO_JOB
cmd_args = dict()
expected_command_buffer = struct.pack('!4sII', protocol.MAGIC_RES_STRING, cmd_type, 0)
packed_command_buffer = protocol.pack_binary_command(cmd_type, cmd_args, is_response=True)
self.assertEquals(packed_command_buffer, expected_command_buffer)
def test_packing_no_arg(self):
cmd_type = protocol.GEARMAN_COMMAND_NOOP
cmd_args = dict()
expected_command_buffer = struct.pack('!4sII', protocol.MAGIC_REQ_STRING, cmd_type, 0)
packed_command_buffer = protocol.pack_binary_command(cmd_type, cmd_args)
self.assertEquals(packed_command_buffer, expected_command_buffer)
def test_packing_single_arg(self):
cmd_type = protocol.GEARMAN_COMMAND_ECHO_REQ
cmd_args = dict(data='abcde')
expected_payload_size = len(cmd_args['data'])
expected_format = '!4sII%ds' % expected_payload_size
expected_command_buffer = struct.pack(expected_format, protocol.MAGIC_REQ_STRING, cmd_type, expected_payload_size, cmd_args['data'])
packed_command_buffer = protocol.pack_binary_command(cmd_type, cmd_args)
self.assertEquals(packed_command_buffer, expected_command_buffer)
def test_packing_multiple_args(self):
cmd_type = protocol.GEARMAN_COMMAND_SUBMIT_JOB
cmd_args = dict(task='function', unique='12345', data='abcd')
ordered_parameters = [cmd_args['task'], cmd_args['unique'], cmd_args['data']]
expected_payload = protocol.NULL_CHAR.join(ordered_parameters)
expected_payload_size = len(expected_payload)
expected_format = '!4sII%ds' % expected_payload_size
expected_command_buffer = struct.pack(expected_format, protocol.MAGIC_REQ_STRING, cmd_type, expected_payload_size, expected_payload)
packed_command_buffer = protocol.pack_binary_command(cmd_type, cmd_args)
self.assertEquals(packed_command_buffer, expected_command_buffer)
class ProtocolTextCommandsTest(unittest.TestCase):
#######################
# Begin parsing tests #
#######################
def test_parsing_errors(self):
received_data = "Hello\x00there\n"
self.assertRaises(ProtocolError, protocol.parse_text_command, received_data)
def test_parsing_without_enough_data(self):
received_data = "Hello there"
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, None)
self.assertEquals(cmd_response, None)
self.assertEquals(cmd_len, 0)
def test_parsing_single_line(self):
received_data = "Hello there\n"
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_TEXT_COMMAND)
self.assertEquals(cmd_response, dict(raw_text=received_data.strip()))
self.assertEquals(cmd_len, len(received_data))
def test_parsing_multi_line(self):
sentence_one = "Hello there\n"
sentence_two = "My name is bob\n"
received_data = sentence_one + sentence_two
cmd_type, cmd_response, cmd_len = protocol.parse_text_command(received_data)
self.assertEquals(cmd_type, protocol.GEARMAN_COMMAND_TEXT_COMMAND)
self.assertEquals(cmd_response, dict(raw_text=sentence_one.strip()))
self.assertEquals(cmd_len, len(sentence_one))
def test_packing_errors(self):
# Test bad command type
cmd_type = protocol.GEARMAN_COMMAND_NOOP
cmd_args = dict()
self.assertRaises(ProtocolError, protocol.pack_text_command, cmd_type, cmd_args)
# Test missing args
cmd_type = protocol.GEARMAN_COMMAND_TEXT_COMMAND
cmd_args = dict()
self.assertRaises(ProtocolError, protocol.pack_text_command, cmd_type, cmd_args)
# Test misnamed parameter dict
cmd_type = protocol.GEARMAN_COMMAND_TEXT_COMMAND
cmd_args = dict(bad_text='abcdefghij')
self.assertRaises(ProtocolError, protocol.pack_text_command, cmd_type, cmd_args)
#######################
# Begin packing tests #
#######################
def test_packing_single_line(self):
expected_string = 'Hello world'
cmd_type = protocol.GEARMAN_COMMAND_TEXT_COMMAND
cmd_args = dict(raw_text=expected_string)
packed_command = protocol.pack_text_command(cmd_type, cmd_args)
self.assertEquals(packed_command, expected_string)
class GearmanConnectionTest(unittest.TestCase):
"""Tests the base CommandHandler class that underpins all other CommandHandlerTests"""
def test_recv_command(self):
pass
class GearmanCommandHandlerTest(_GearmanAbstractTest):
"""Tests the base CommandHandler class that underpins all other CommandHandlerTests"""
def _test_recv_command(self):
# recv_echo_res and recv_error are predefined on the CommandHandler
self.command_handler.recv_command(protocol.GEARMAN_COMMAND_NOOP)
self.assert_recv_command(protocol.GEARMAN_COMMAND_NOOP)
# The mock handler never implemented 'recv_all_yours' so we should get an attribute error here
self.assertRaises(ValueError, self.command_handler.recv_command, protocol.GEARMAN_COMMAND_ALL_YOURS)
def _test_send_command(self):
self.command_handler.send_command(protocol.GEARMAN_COMMAND_NOOP)
self.assert_sent_command(protocol.GEARMAN_COMMAND_NOOP)
# The mock handler never implemented 'recv_all_yours' so we should get an attribute error here
self.command_handler.send_command(protocol.GEARMAN_COMMAND_ECHO_REQ, text='hello world')
self.assert_sent_command(protocol.GEARMAN_COMMAND_ECHO_REQ, text='hello world')
def assert_recv_command(self, expected_cmd_type, **expected_cmd_args):
cmd_type, cmd_args = self.command_handler.recv_command_queue.popleft()
self.assert_commands_equal(cmd_type, expected_cmd_type)
self.assertEqual(cmd_args, expected_cmd_args)
def assert_sent_command(self, expected_cmd_type, **expected_cmd_args):
# All commands should be sent via the CommandHandler
handler_cmd_type, handler_cmd_args = self.command_handler.sent_command_queue.popleft()
self.assert_commands_equal(handler_cmd_type, expected_cmd_type)
self.assertEqual(handler_cmd_args, expected_cmd_args)
super(GearmanCommandHandlerTest, self).assert_sent_command(expected_cmd_type, **expected_cmd_args)
if __name__ == '__main__':
unittest.main()
|