1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
#!/usr/bin/env python
"""Unit tests for M2Crypto.BIO.File.
Copyright (c) 1999-2002 Ng Pheng Siong. All rights reserved."""
import logging
import os
import platform
import tempfile
import ctypes
if platform.system() == "Windows":
import ctypes.wintypes
from M2Crypto.BIO import File, openfile
from tests import unittest
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def getCountProcHandles():
    """Return the handle count Windows reports for this process, plus one.

    Opens a query handle on the current process, reads its handle count via
    kernel32 ``GetProcessHandleCount``, closes the query handle again and
    returns the reported count incremented by one.

    Only usable on Windows: it relies on ``ctypes.windll`` and
    ``ctypes.wintypes``.

    NOTE(review): the reason for the ``+ 1`` adjustment is not evident from
    this function alone -- confirm before changing it.
    """
    kernel32 = ctypes.windll.kernel32
    query_access = 0x400  # PROCESS_QUERY_INFORMATION
    proc = kernel32.OpenProcess(query_access, 0, os.getpid())
    counter = ctypes.wintypes.DWORD()
    kernel32.GetProcessHandleCount(proc, ctypes.byref(counter))
    # Read the value before releasing the query handle, as the count was
    # taken while that handle was still open.
    observed = counter.value
    kernel32.CloseHandle(proc)
    return observed + 1
class FileTestCase(unittest.TestCase):
    """Tests for M2Crypto.BIO.File and M2Crypto.BIO.openfile.

    Each test works on a fresh temporary file created in setUp.  The number
    of open descriptors (process handles on Windows) is sampled in setUp and
    again in tearDown; a difference fails the test, so a test that leaves a
    descriptor open is reported.
    """

    # Platforms on which open descriptors are counted by listing /dev/fd/.
    _DEV_FD_PLATFORMS = ("Linux", "Darwin", "FreeBSD")

    # Directory to list when counting descriptors, or None when that
    # technique is not available.  Deliberately a single-underscore name:
    # a double-underscore attribute is name-mangled inside the class body,
    # which made the former hasattr(self, "__dev_fd") probe always False and
    # silently disabled the leak check.
    _dev_fd = None

    def setUp(self):
        # Payload written and read back by the tests.
        self.data = b"abcdef" * 64
        self.fd, self.fname = tempfile.mkstemp()
        # Only rely on /dev/fd/ when it is really there, so a platform
        # without it skips the leak check instead of erroring in setUp.
        if platform.system() in self._DEV_FD_PLATFORMS and os.path.isdir(
            "/dev/fd/"
        ):
            self._dev_fd = "/dev/fd/"
        else:
            self._dev_fd = None
        # Baseline taken after mkstemp, so self.fd is part of it.
        self.fd_count = self.__mfd()

    def __mfd(self):
        # Return the current number of open descriptors/handles, or None
        # when there is no way to count them on this platform.
        if self._dev_fd is not None:
            return len(os.listdir(self._dev_fd))
        if platform.system() == "Windows":
            return getCountProcHandles()
        return None

    def tearDown(self):
        try:
            # Compared before self.fd is closed, mirroring the baseline.
            self.assertEqual(
                self.fd_count,
                self.__mfd(),
                "last test did not close all file descriptors properly",
            )
        finally:
            # Always release the temporary file, even when the leak check
            # above fails.  Both steps are best-effort.
            try:
                os.close(self.fd)
            except OSError:
                pass
            try:
                os.unlink(self.fname)
            except OSError:
                pass

    def test_openfile_rb(self):
        # First create the file using Python's open().
        with open(self.fname, "wb") as f:
            f.write(self.data)
        # Now open the file using M2Crypto.BIO.openfile().
        with openfile(self.fname, "rb") as f:
            data = f.read(len(self.data))
        self.assertEqual(data, self.data)

    def test_openfile_wb(self):
        # First create the file using M2Crypto.BIO.openfile().
        with openfile(self.fname, "wb") as f:
            f.write(self.data)
        # Now open the file using Python's open().
        with open(self.fname, "rb") as f:
            data = f.read(len(self.data))
        self.assertEqual(data, self.data)

    def test_closed(self):
        # Writing to an already closed BIO file must raise IOError.
        f = openfile(self.fname, "wb")
        f.write(self.data)
        f.close()
        with self.assertRaises(IOError):
            f.write(self.data)

    def test_use_pyfile(self):
        # First create the file, writing through a File wrapped around an
        # already open Python file object.
        with open(self.fname, "wb") as f:
            f2 = File(f)
            f2.write(self.data)
            f2.close()
        # Now read the file.
        with open(self.fname, "rb") as f:
            in_data = f.read(len(self.data))
        self.assertEqual(len(in_data), len(self.data))
        self.assertEqual(in_data, self.data)

    def test_readline_bin(self):
        # Written in binary mode, so the line endings are exactly "\n".
        with open(self.fname, "wb") as f:
            f.write(b"hello\nworld\n")
        with openfile(self.fname, "rb") as f:
            self.assertTrue(f.readable())
            self.assertEqual(f.readline(), b"hello\n")
            self.assertEqual(f.readline(), b"world\n")
        with openfile(self.fname, "rb") as f:
            self.assertEqual(f.readlines(), [b"hello\n", b"world\n"])

    def test_readline(self):
        # Written in text mode, so each "\n" is stored as os.linesep; the
        # binary read must therefore see the platform separator.
        sep = os.linesep.encode()
        with open(self.fname, "w") as f:
            f.write("hello\nworld\n")
        with openfile(self.fname, "rb") as f:
            self.assertTrue(f.readable())
            self.assertEqual(f.readline(), b"hello" + sep)
            self.assertEqual(f.readline(), b"world" + sep)
        with openfile(self.fname, "rb") as f:
            self.assertEqual(f.readlines(), [b"hello" + sep, b"world" + sep])

    def test_tell_seek(self):
        with open(self.fname, "w") as f:
            f.write("hello world")
        with openfile(self.fname, "r") as f:
            # Seek absolute
            f.seek(6)
            self.assertEqual(f.tell(), 6)
def suite():
    """Build a test suite containing every test method of FileTestCase."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(FileTestCase)
# When executed as a script, run this module's suite with the text runner.
if __name__ == "__main__":
    runner = unittest.TextTestRunner()
    runner.run(suite())
|