1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#
import unittest
from testtools.compat import _b, BytesIO
import subunit.chunked
class TestDecode(unittest.TestCase):
    """Tests for subunit.chunked.Decoder writing into a BytesIO sink.

    The tests feed chunk-encoded bytes to ``Decoder.write`` and check the
    value it returns, the bytes that reach ``self.output``, and the
    ValueError raised for truncated or malformed input.
    """

    def setUp(self):
        super(TestDecode, self).setUp()
        self.output = BytesIO()
        self.decoder = subunit.chunked.Decoder(self.output)

    def test_close_read_length_short_errors(self):
        # Closing a decoder that has been fed nothing at all must fail.
        with self.assertRaises(ValueError):
            self.decoder.close()

    def test_close_body_short_errors(self):
        # A 2-byte chunk is announced but only one body byte is supplied.
        result = self.decoder.write(_b('2\r\na'))
        self.assertEqual(None, result)
        with self.assertRaises(ValueError):
            self.decoder.close()

    def test_close_body_buffered_data_errors(self):
        # The length line is cut off right after the CR.
        result = self.decoder.write(_b('2\r'))
        self.assertEqual(None, result)
        with self.assertRaises(ValueError):
            self.decoder.close()

    def test_close_after_finished_stream_safe(self):
        # Once the terminating zero-length chunk is seen, close() is fine.
        body_result = self.decoder.write(_b('2\r\nab'))
        self.assertEqual(None, body_result)
        end_result = self.decoder.write(_b('0\r\n'))
        self.assertEqual(_b(''), end_result)
        self.decoder.close()

    def test_decode_nothing(self):
        # A lone terminator yields no excess bytes and no output.
        empty = _b('')
        self.assertEqual(empty, self.decoder.write(_b('0\r\n')))
        self.assertEqual(empty, self.output.getvalue())

    def test_decode_serialised_form(self):
        # 0xF == 15 bytes: "serialised\n" (11) followed by "form" (4),
        # delivered across separate write() calls.
        header_result = self.decoder.write(_b("F\r\n"))
        self.assertEqual(None, header_result)
        partial_result = self.decoder.write(_b("serialised\n"))
        self.assertEqual(None, partial_result)
        final_result = self.decoder.write(_b("form0\r\n"))
        self.assertEqual(_b(''), final_result)

    def test_decode_short(self):
        excess = self.decoder.write(_b('3\r\nabc0\r\n'))
        self.assertEqual(_b(''), excess)
        self.assertEqual(_b('abc'), self.output.getvalue())

    def test_decode_combines_short(self):
        excess = self.decoder.write(_b('6\r\nabcdef0\r\n'))
        self.assertEqual(_b(''), excess)
        self.assertEqual(_b('abcdef'), self.output.getvalue())

    def test_decode_excess_bytes_from_write(self):
        # Bytes following the terminating chunk are handed back to the
        # caller rather than written to the output.
        excess = self.decoder.write(_b('3\r\nabc0\r\n1234'))
        self.assertEqual(_b('1234'), excess)
        self.assertEqual(_b('abc'), self.output.getvalue())

    def test_decode_write_after_finished_errors(self):
        excess = self.decoder.write(_b('3\r\nabc0\r\n1234'))
        self.assertEqual(_b('1234'), excess)
        # Any further write, even an empty one, is rejected.
        late_data = _b('')
        with self.assertRaises(ValueError):
            self.decoder.write(late_data)

    def test_decode_hex(self):
        # Chunk lengths are hexadecimal: 'A' announces ten body bytes.
        excess = self.decoder.write(_b('A\r\n12345678900\r\n'))
        self.assertEqual(_b(''), excess)
        self.assertEqual(_b('1234567890'), self.output.getvalue())

    def test_decode_long_ranges(self):
        # Two 0x10000 (65536) byte chunks, each header and body written
        # separately, followed by the terminator.
        ones = '1' * 65536
        twos = '2' * 65536
        for piece in ('10000\r\n', ones, '10000\r\n', twos):
            self.assertEqual(None, self.decoder.write(_b(piece)))
        self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
        self.assertEqual(_b(ones + twos), self.output.getvalue())

    def test_decode_newline_nonstrict(self):
        """Tolerate chunk markers with no CR character."""
        # From <http://pad.lv/505078>
        self.decoder = subunit.chunked.Decoder(self.output, strict=False)
        payload = 'abcdeabcde'
        self.assertEqual(None, self.decoder.write(_b('a\n')))
        self.assertEqual(None, self.decoder.write(_b(payload)))
        self.assertEqual(_b(''), self.decoder.write(_b('0\n')))
        self.assertEqual(_b(payload), self.output.getvalue())

    def test_decode_strict_newline_only(self):
        """Reject chunk markers with no CR character in strict mode."""
        # From <http://pad.lv/505078>
        marker = _b('a\n')
        with self.assertRaises(ValueError):
            self.decoder.write(marker)

    def test_decode_strict_multiple_crs(self):
        # A doubled CR before the LF is not a valid chunk marker.
        marker = _b('a\r\r\n')
        with self.assertRaises(ValueError):
            self.decoder.write(marker)

    def test_decode_short_header(self):
        # A bare newline carries no length digits at all.
        marker = _b('\n')
        with self.assertRaises(ValueError):
            self.decoder.write(marker)
class TestEncode(unittest.TestCase):
    """Tests for subunit.chunked.Encoder writing into a BytesIO sink.

    Each test writes zero or more byte strings to the encoder, closes it,
    and compares everything captured in ``self.output`` with the expected
    chunk-encoded form.
    """

    def setUp(self):
        super(TestEncode, self).setUp()
        self.output = BytesIO()
        self.encoder = subunit.chunked.Encoder(self.output)

    def test_encode_nothing(self):
        # Closing without writing emits only the terminating chunk.
        self.encoder.close()
        expected = _b('0\r\n')
        self.assertEqual(expected, self.output.getvalue())

    def test_encode_empty(self):
        # An empty write adds nothing ahead of the terminator.
        self.encoder.write(_b(''))
        self.encoder.close()
        expected = _b('0\r\n')
        self.assertEqual(expected, self.output.getvalue())

    def test_encode_short(self):
        self.encoder.write(_b('abc'))
        self.encoder.close()
        expected = _b('3\r\nabc0\r\n')
        self.assertEqual(expected, self.output.getvalue())

    def test_encode_combines_short(self):
        # Two short writes are expected to come out as one 6-byte chunk.
        for piece in ('abc', 'def'):
            self.encoder.write(_b(piece))
        self.encoder.close()
        expected = _b('6\r\nabcdef0\r\n')
        self.assertEqual(expected, self.output.getvalue())

    def test_encode_over_9_is_in_hex(self):
        # Ten bytes are announced with the hexadecimal length 'A'.
        self.encoder.write(_b('1234567890'))
        self.encoder.close()
        expected = _b('A\r\n12345678900\r\n')
        self.assertEqual(expected, self.output.getvalue())

    def test_encode_long_ranges_not_combined(self):
        # Two 65536-byte writes are expected as two separate 0x10000 chunks.
        ones = '1' * 65536
        twos = '2' * 65536
        for payload in (ones, twos):
            self.encoder.write(_b(payload))
        self.encoder.close()
        expected = _b('10000\r\n' + ones + '10000\r\n' + twos + '0\r\n')
        self.assertEqual(expected, self.output.getvalue())
|