# -*- encoding: utf-8 -*-
"""
pika.data tests
"""
import datetime
import decimal
import unittest
from collections import OrderedDict
from pika.compat import PY2, PY3
from pika import data, exceptions
from pika.compat import long
class DataTests(unittest.TestCase):
    """Exercise ``data.encode_table`` / ``data.decode_table``.

    ``FIELD_TBL_VALUE`` and ``FIELD_TBL_ENCODED`` hold the same field table
    as Python values and as wire bytes.  Entries are listed in the same
    order in both so the encoded form can be compared byte-for-byte.
    """

    # Wire form of FIELD_TBL_VALUE.  The first four bytes are the big-endian
    # payload size (0xef == 239); together with that prefix the complete
    # buffer is 243 bytes, which is the count the *_bytes tests expect.
    FIELD_TBL_ENCODED = (
        b'\x00\x00\x00\xef'
        b'\x05arrayA\x00\x00\x00\x0fI\x00\x00\x00\x01I'
        b'\x00\x00\x00\x02I\x00\x00\x00\x03'
        b'\x07boolvalt\x01'
        b'\x07decimalD\x02\x00\x00\x01:'
        b'\x0bdecimal_tooD\x00\x00\x00\x00d'
        b'\x07dictvalF\x00\x00\x00\x0c\x03fooS\x00\x00\x00\x03bar'
        b'\x06intvalI\x00\x00\x00\x01'
        b'\x06bigint\x6c\x00\x00\x00\x00\x9a\x7e\xc8\x00'
        b'\x07longval\x6c\x00\x00\x00\x00\x36\x65\x26\x55'
        b'\x09maxLLUINTl\xff\xff\xff\xff\xff\xff\xff\xff'
        b'\x04nullV'
        b'\x06strvalS\x00\x00\x00\x04Test'
        b'\x0ctimestampvalT\x00\x00\x00\x00Ec)\x92'
        b'\x07unicodeS\x00\x00\x00\x08utf8=\xe2\x9c\x93'
    )
    # The trailing ``bytes`` entry is tagged 'x' on Python 3 and 'S' on
    # Python 2; both variants have the same length.
    FIELD_TBL_ENCODED += (
        b'\x05bytesx\x00\x00\x00\x06foobar' if PY3
        else b'\x05bytesS\x00\x00\x00\x06foobar'
    )

    # Python-side counterpart of FIELD_TBL_ENCODED.  An OrderedDict keeps the
    # entries in the order in which they appear in the encoded bytes.
    FIELD_TBL_VALUE = OrderedDict([
        ('array', [1, 2, 3]),
        ('boolval', True),
        ('decimal', decimal.Decimal('3.14')),
        ('decimal_too', decimal.Decimal('100')),
        ('dictval', {'foo': 'bar'}),
        ('intval', 1),
        ('bigint', 2592000000),
        ('longval', long(912598613)),
        ('maxLLUINT', long(2**64 - 1)),
        ('null', None),
        ('strval', 'Test'),
        ('timestampval', datetime.datetime(2006, 11, 21, 16, 30, 10)),
        ('unicode', u'utf8=✓'),
        ('bytes', b'foobar'),
    ])

    def test_decode_bytes(self):
        """A single 'x'-tagged field decodes to a ``bytes`` value."""
        # The size prefix is the real payload length (0x11 == 17 bytes), so
        # the buffer is a well-formed table that ends at offset 4 + 17 == 21.
        encoded = (
            b'\x00\x00\x00\x11'
            b'\x05bytesx\x00\x00\x00\x06foobar'
        )
        self.assertEqual(data.decode_table(encoded, 0),
                         ({'bytes': b'foobar'}, 21))

    def test_decode_shortint(self):
        """A single 's'-tagged field decodes to the integer 1234."""
        # The size prefix is the real payload length (0x0c == 12 bytes), so
        # the buffer is a well-formed table that ends at offset 4 + 12 == 16.
        encoded = (
            b'\x00\x00\x00\x0c'
            b'\x08shortints\x04\xd2'
        )
        self.assertEqual(data.decode_table(encoded, 0),
                         ({'shortint': 1234}, 16))

    def test_encode_table(self):
        """Encoding FIELD_TBL_VALUE yields exactly FIELD_TBL_ENCODED."""
        pieces = []
        data.encode_table(pieces, self.FIELD_TBL_VALUE)
        self.assertEqual(b''.join(pieces), self.FIELD_TBL_ENCODED)

    def test_encode_table_bytes(self):
        """``encode_table`` returns 243 for FIELD_TBL_VALUE."""
        pieces = []
        byte_count = data.encode_table(pieces, self.FIELD_TBL_VALUE)
        self.assertEqual(byte_count, 243)

    def test_decode_table(self):
        """Decoding FIELD_TBL_ENCODED yields the values of FIELD_TBL_VALUE."""
        value, _ = data.decode_table(self.FIELD_TBL_ENCODED, 0)
        self.assertDictEqual(value, self.FIELD_TBL_VALUE)

    def test_decode_table_bytes(self):
        """``decode_table`` returns 243 as its second result element."""
        _, byte_count = data.decode_table(self.FIELD_TBL_ENCODED, 0)
        self.assertEqual(byte_count, 243)

    def test_encode_raises(self):
        """Encoding a ``set`` value raises UnsupportedAMQPFieldException."""
        self.assertRaises(exceptions.UnsupportedAMQPFieldException,
                          data.encode_table, [], {'foo': {1, 2, 3}})

    def test_decode_raises(self):
        """Decoding a field tagged 'Z' raises InvalidFieldTypeException."""
        self.assertRaises(exceptions.InvalidFieldTypeException,
                          data.decode_table,
                          b'\x00\x00\x00\t\x03fooZ\x00\x00\x04\xd2', 0)

    def test_long_repr(self):
        """``repr`` of a compat ``long`` carries a trailing ``L``."""
        value = long(912598613)
        self.assertEqual(repr(value), '912598613L')
|