# Unit tests for the dirq.QueueBase module.
# -*- coding: utf-8 -*-
import codecs
import errno
import os
import shutil
import sys
import tempfile
import unittest
from dirq import QueueBase
from dirq.QueueBase import QueueBase as QueueBaseClass
__all__ = ['TestQueueBase', 'TestQueueBaseModuleFunctions']
class TestDirQueue(unittest.TestCase):
    """Shared fixture: every test gets its own scratch directory."""

    def setUp(self):
        """Create the scratch directory and derive the paths used by tests."""
        scratch = tempfile.mkdtemp(prefix='dirq-base')
        self.tempdir = scratch
        # Paths below the scratch directory; neither is created here.
        self.qdir = '%s/dirq' % scratch
        self.tempfile = '%s/file' % scratch

    def tearDown(self):
        """Remove the scratch directory tree, ignoring removal errors."""
        shutil.rmtree(self.tempdir, ignore_errors=True)
class TestQueueBase(TestDirQueue):
    """Tests for constructing the QueueBase class."""

    def test1init(self):
        """QueueBase.__init__()

        After constructing a QueueBase on a nested path below the scratch
        directory, that path must exist.
        """
        nested = self.tempdir + '/aaa/bbb/ccc'
        QueueBaseClass(nested, umask=None)
        created = os.path.exists(nested)
        assert created is True
class TestQueueBaseModuleFunctions(TestDirQueue):
    """Tests for the module-level helper functions of dirq.QueueBase."""

    def test1_special_mkdir(self):
        """QueueBase._special_mkdir()

        The first call on a fresh path must return True, a repeated call
        must return False, and a call on the path of a regular file must
        raise OSError.
        """
        self.assertTrue(QueueBase._special_mkdir(self.qdir) is True)
        self.assertTrue(QueueBase._special_mkdir(self.qdir) is False)
        # test against a file
        test_file = os.path.join(self.qdir, 'foo')
        # Close the file before its path is reused so the data is flushed
        # and no descriptor is leaked.
        with open(test_file, 'w') as fileh:
            fileh.write('bar')
        self.assertRaises(OSError, QueueBase._special_mkdir, test_file)

    def test2_name(self):
        """QueueBase._name()

        The name generated for 7 must be 14 characters long and end with
        the hexadecimal rendering of 7.
        """
        name = QueueBase._name(7)
        self.assertEqual(len(name), 14)
        self.assertTrue(name.endswith('%01x' % 7))

    def test3_file_create(self):
        """QueueBase._file_create()

        Creating a file in a missing directory must raise OSError with
        errno ENOENT; creating a file that already exists must raise
        OSError, both with and without the utf8 flag.
        """
        # File in non existent directory should produce ENOENT.
        missing = os.getcwd() + "/nodir/nofile"
        try:
            QueueBase._file_create(missing, 0, False)
        except OSError:
            error = sys.exc_info()[1]
            self.assertEqual(error.errno, errno.ENOENT)
        else:
            # Without this branch the check would pass silently when no
            # exception is raised at all.
            self.fail("_file_create() did not raise OSError for %s" % missing)
        # NOTE(review): the return values below are discarded; if they are
        # open file objects they should be closed -- confirm against
        # QueueBase._file_create().
        QueueBase._file_create(self.tempfile, 0, False)
        self.assertRaises(OSError,
                          QueueBase._file_create, self.tempfile, 0, False)
        os.unlink(self.tempfile)
        # utf8 data
        QueueBase._file_create(self.tempfile, 0, True)
        self.assertRaises(OSError,
                          QueueBase._file_create, self.tempfile, 0, True)

    def test4_file_write(self):
        """QueueBase._file_write()

        Writing a short and a 10 KiB byte string must succeed; the file is
        unlinked after each write so the next write starts from a missing
        path.
        """
        QueueBase._file_write(self.tempfile, 0, False, 'a\n'.encode())
        os.unlink(self.tempfile)
        QueueBase._file_write(self.tempfile, 0,
                              False, ('a' * (2 ** 10) * 10).encode())
        os.unlink(self.tempfile)
        # NOTE(review): the tuple is passed as ONE positional argument,
        # whereas the calls above pass four. The TypeError presumably comes
        # from the argument count rather than from the type of `t` --
        # confirm against the signature of QueueBase._file_write().
        for t in [1, [], (), {}, object]:
            self.assertRaises(TypeError, QueueBase._file_write, ('', t))

    def test5_file_read(self):
        """QueueBase._file_read()

        Data written to a file must be read back unchanged, both as raw
        bytes and as utf8-decoded text.
        """
        text = 'hello\n'.encode()
        # Close the file before reading it back so the data is flushed.
        with open(self.tempfile, 'wb') as fileh:
            fileh.write(text)
        text_in = QueueBase._file_read(self.tempfile, False)
        self.assertEqual(text, text_in)
        # utf8
        try:
            # Python 2: turn the byte string literal into unicode.
            text = 'Élément \u263A\n'.decode("utf-8")
        except AttributeError:
            # Python 3: str has no decode(); the literal is already text.
            text = 'Élément \u263A\n'
        with codecs.open(self.tempfile, 'w', 'utf8') as fileh:
            fileh.write(text)
        text_in = QueueBase._file_read(self.tempfile, True)
        self.assertEqual(text, text_in)
def main():
    """Run each test case of this module through its own verbose runner."""
    for case in (TestQueueBase, TestQueueBaseModuleFunctions):
        suite = unittest.TestLoader().loadTestsFromTestCase(case)
        unittest.TextTestRunner(verbosity=2).run(suite)


if __name__ == '__main__':
    main()
|