1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
#!/usr/bin/env python
"""Unit tests for M2Crypto.BIO.MemoryBuffer.
Copyright (c) 2000 Ng Pheng Siong. All rights reserved."""
import os
import multiprocessing
from platform import system
from sys import version_info
from M2Crypto import m2
from M2Crypto.BIO import MemoryBuffer
from tests import unittest
class TimeLimitExpired(Exception):
    """Raised by time_limit() when the child process outlives its timeout."""
def time_limit(timeout, func, exc_msg, *args, **kwargs):
    """Run ``func`` in a child process, failing if it hangs or errors.

    :param timeout: Maximum number of seconds to wait for the child.
    :param func: Callable used as the target of the child process.
    :param exc_msg: Message carried by the TimeLimitExpired raised when
        the child is still running after ``timeout`` seconds.
    :param args: Positional arguments forwarded to ``func``.
    :param kwargs: Keyword arguments forwarded to ``func``.
    :raises TimeLimitExpired: if the child did not finish in time.
    :raises AssertionError: if the child finished with a non-zero exit
        code (e.g. an exception or failed assertion inside ``func``).
    """
    # multiprocessing.get_context() available in Python >= 3.4
    # Python >=3.8 on MacOS changed start_method to 'spawn' as default.
    # This creates a new context with the previous 'fork'
    # start_method. Fixes issue #286.
    if system() == "Darwin" and version_info >= (3, 8):
        start_method = "fork"
    else:
        # use default context
        start_method = None
    mp = multiprocessing.get_context(start_method)
    # Forward the caller's arguments; previously they were accepted
    # but silently dropped.
    p = mp.Process(target=func, args=args, kwargs=kwargs)
    p.start()
    p.join(timeout)
    if p.is_alive():
        p.terminate()
        # Reap the terminated child; bounded so we never block here.
        p.join(1)
        raise TimeLimitExpired(exc_msg)
    # An exception raised in the child only shows up as a non-zero exit
    # code in the parent; without this check such failures go unnoticed.
    if p.exitcode != 0:
        raise AssertionError(
            "child process running %r exited with code %r"
            % (func, p.exitcode)
        )
class MemoryBufferTestCase(unittest.TestCase):
def setUp(self):
self.data = b"abcdef" * 64
def tearDown(self):
pass
def test_init_empty(self):
mb = MemoryBuffer()
self.assertEqual(len(mb), 0)
out = mb.read()
assert out is None
def test_init_empty_cm(self):
with MemoryBuffer() as mb:
self.assertEqual(len(mb), 0)
out = mb.read()
assert out is None
def test_init_something(self):
mb = MemoryBuffer(self.data)
self.assertEqual(len(mb), len(self.data))
out = mb.read()
self.assertEqual(out, self.data)
def test_init_something_result_bytes(self):
mb = MemoryBuffer(self.data)
self.assertEqual(len(mb), len(self.data))
out = mb.read()
self.assertIsInstance(out, bytes)
def test_init_something_cm(self):
with MemoryBuffer(self.data) as mb:
self.assertEqual(len(mb), len(self.data))
out = mb.read()
self.assertEqual(out, self.data)
def test_read_less_than(self):
chunk = len(self.data) - 7
mb = MemoryBuffer(self.data)
out = mb.read(chunk)
self.assertEqual(out, self.data[:chunk])
self.assertEqual(len(mb), (len(self.data)) - chunk)
def test_read_more_than(self):
chunk = len(self.data) + 8
mb = MemoryBuffer(self.data)
out = mb.read(chunk)
self.assertEqual(out, self.data)
self.assertEqual(len(mb), 0)
def test_write_close(self):
mb = MemoryBuffer(self.data)
assert mb.writeable()
mb.write_close()
assert mb.readable()
with self.assertRaises(IOError):
mb.write(self.data)
assert not mb.writeable()
def test_closed(self):
mb = MemoryBuffer(self.data)
mb.close()
with self.assertRaises(IOError):
mb.write(self.data)
m2.err_clear_error()
assert mb.readable() and not mb.writeable()
def test_readline(self):
# test against possible endless loop
# http://stackoverflow.com/questions/9280550/
timeout_secs = 10
time_limit(
timeout_secs,
run_test,
"The readline() should not timeout!",
)
def run_test(*args, **kwargs):
    """Check readline() and readlines() on a two-line MemoryBuffer.

    Any arguments are accepted and ignored. The line terminator used is
    the platform's ``os.linesep`` encoded to bytes.
    """
    newline = os.linesep.encode()
    first_line = b"hello" + newline
    second_line = b"world" + newline
    payload = first_line + second_line

    # Read the two lines one at a time.
    with MemoryBuffer(payload) as mb:
        assert mb.readable()
        assert mb.readline() == first_line
        assert mb.readline() == second_line

    # Read both lines in a single call on a fresh buffer.
    with MemoryBuffer(payload) as mb:
        assert mb.readlines() == [first_line, second_line]
def suite():
    """Return a test suite loaded from MemoryBufferTestCase."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(MemoryBufferTestCase)
if __name__ == "__main__":
    # Executed as a script: run this module's suite with the text runner.
    runner = unittest.TextTestRunner()
    runner.run(suite())
|