1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
# -*- coding: utf-8 -*-
import struct
import six
from . import unittest
import kafka.common
import kafka.util
class UtilTest(unittest.TestCase):
    """Tests for the string/struct helpers exposed by ``kafka.util``."""

    def _assert_rejects_text(self, write_func, text):
        """Assert that ``write_func(text)`` raises a descriptive TypeError.

        The error message must name the offending text type (``unicode`` on
        Python 2, ``str`` on Python 3) and contain ``'to be bytes'``.
        """
        with self.assertRaises(TypeError) as cm:
            write_func(text)

        message = str(cm.exception)
        offending_type = 'unicode' if six.PY2 else 'str'
        self.assertIn(offending_type, message)
        self.assertIn('to be bytes', message)

    @unittest.skip("Unwritten")
    def test_relative_unpack(self):
        """Placeholder; see test_relative_unpack2/3 for the written cases."""
        pass

    def test_write_int_string(self):
        """A payload is written after a 4-byte big-endian length."""
        self.assertEqual(
            kafka.util.write_int_string(b'some string'),
            b'\x00\x00\x00\x0bsome string'
        )

    def test_write_int_string__unicode(self):
        """Text (non-bytes) input is rejected with a TypeError."""
        self._assert_rejects_text(kafka.util.write_int_string, u'unicode')

    def test_write_int_string__empty(self):
        """An empty payload is written as a zero length and nothing else."""
        self.assertEqual(
            kafka.util.write_int_string(b''),
            b'\x00\x00\x00\x00'
        )

    def test_write_int_string__null(self):
        """``None`` is written as the four bytes ``ff ff ff ff``."""
        self.assertEqual(
            kafka.util.write_int_string(None),
            b'\xff\xff\xff\xff'
        )

    def test_read_int_string(self):
        """Null, empty and regular payloads decode to (value, new offset)."""
        self.assertEqual(
            kafka.util.read_int_string(b'\xff\xff\xff\xff', 0),
            (None, 4)
        )
        self.assertEqual(
            kafka.util.read_int_string(b'\x00\x00\x00\x00', 0),
            (b'', 4)
        )
        self.assertEqual(
            kafka.util.read_int_string(b'\x00\x00\x00\x0bsome string', 0),
            (b'some string', 15)
        )

    def test_read_int_string__insufficient_data(self):
        """A declared length of 2 with only 1 payload byte must underflow."""
        with self.assertRaises(kafka.common.BufferUnderflowError):
            kafka.util.read_int_string(b'\x00\x00\x00\x021', 0)

    def test_write_short_string(self):
        """A payload is written after a 2-byte big-endian length."""
        self.assertEqual(
            kafka.util.write_short_string(b'some string'),
            b'\x00\x0bsome string'
        )

    def test_write_short_string__unicode(self):
        """Text (non-bytes) input is rejected with a TypeError."""
        self._assert_rejects_text(kafka.util.write_short_string, u'hello')

    def test_write_short_string__empty(self):
        """An empty payload is written as a zero length and nothing else."""
        self.assertEqual(
            kafka.util.write_short_string(b''),
            b'\x00\x00'
        )

    def test_write_short_string__null(self):
        """``None`` is written as the two bytes ``ff ff``."""
        self.assertEqual(
            kafka.util.write_short_string(None),
            b'\xff\xff'
        )

    def test_write_short_string__too_long(self):
        """A 33000-byte payload must fail with ``struct.error``.

        Presumably because the length no longer fits the 2-byte prefix;
        confirm against ``kafka.util.write_short_string``.
        """
        with self.assertRaises(struct.error):
            kafka.util.write_short_string(b' ' * 33000)

    def test_read_short_string(self):
        """Null, empty and regular payloads decode to (value, new offset)."""
        self.assertEqual(
            kafka.util.read_short_string(b'\xff\xff', 0),
            (None, 2)
        )
        self.assertEqual(
            kafka.util.read_short_string(b'\x00\x00', 0),
            (b'', 2)
        )
        self.assertEqual(
            kafka.util.read_short_string(b'\x00\x0bsome string', 0),
            (b'some string', 13)
        )

    def test_read_int_string__insufficient_data2(self):
        """A 3-byte buffer is too short for read_int_string and must underflow.

        NOTE(review): this test sits with the short-string tests but calls
        ``read_int_string``; it may have been meant to cover
        ``read_short_string`` -- confirm before changing the call.
        """
        with self.assertRaises(kafka.common.BufferUnderflowError):
            # Bytes literal, consistent with every other buffer in this class.
            kafka.util.read_int_string(b'\x00\x021', 0)

    def test_relative_unpack2(self):
        """Two shorts are unpacked from offset 0; the new offset is 4."""
        self.assertEqual(
            kafka.util.relative_unpack('>hh', b'\x00\x01\x00\x00\x02', 0),
            ((1, 0), 4)
        )

    def test_relative_unpack3(self):
        """A 1-byte buffer cannot satisfy the 4-byte ``'>hh'`` format."""
        with self.assertRaises(kafka.common.BufferUnderflowError):
            # Bytes literal, consistent with every other buffer in this class.
            kafka.util.relative_unpack('>hh', b'\x00', 0)

    def test_group_by_topic_and_partition(self):
        """Tuples are grouped as ``{topic: {partition: tuple}}``."""
        t = kafka.common.TopicAndPartition

        topic_partitions = [
            t("a", 1),
            t("a", 2),
            t("a", 3),
            t("b", 3),
        ]

        self.assertEqual(
            kafka.util.group_by_topic_and_partition(topic_partitions),
            {
                "a": {
                    1: t("a", 1),
                    2: t("a", 2),
                    3: t("a", 3),
                },
                "b": {
                    3: t("b", 3),
                },
            }
        )

        # should not be able to group duplicate topic-partitions
        t1 = t("a", 1)
        with self.assertRaises(AssertionError):
            kafka.util.group_by_topic_and_partition([t1, t1])
|