# Tests for xopen: reading, writing, appending and truncated-file handling.
# coding: utf-8
from __future__ import print_function, division, absolute_import
import gzip
import os
import random
import sys
import signal
from contextlib import contextmanager
from nose.tools import raises
from xopen import xopen
# Fixture files: one plain-text file plus compressed variants of it.
base = "tests/file.txt"
files = [base, base + '.gz', base + '.bz2']

# The .xz fixture is only included when the lzma module can be imported;
# otherwise ``lzma`` is set to None so later code can test for it.
try:
    import lzma
except ImportError:
    lzma = None
else:
    files.append(base + '.xz')

major, minor = sys.version_info[0:2]
@contextmanager
def temporary_path(name):
    """Yield the path of a scratch file called *name* and delete it afterwards.

    The path points into a ``testtmp`` directory located next to this
    module; the directory is created if it does not exist yet. The file
    itself is not created here -- the caller is expected to write it.

    The file is removed when the ``with`` block ends, even if the block
    raised, so a failing test does not leave stale files behind. A file
    that was never created is skipped instead of raising.
    """
    directory = os.path.join(os.path.dirname(__file__), 'testtmp')
    if not os.path.isdir(directory):
        os.mkdir(directory)
    path = os.path.join(directory, name)
    try:
        yield path
    finally:
        # Clean up on both the success and the failure path. The existence
        # check keeps a missing file from raising OSError here, which would
        # otherwise hide the exception that actually ended the block.
        if os.path.exists(path):
            os.remove(path)
def test_xopen_text():
    # Each fixture in ``files``, opened in text mode inside a ``with``
    # block, must yield exactly two lines, the second being the known text.
    for path in files:
        with xopen(path, 'rt') as handle:
            content = [line for line in handle]
            assert len(content) == 2
            assert content[1] == 'The second line.\n', path
def test_xopen_binary():
    # Each fixture in ``files``, opened in binary mode inside a ``with``
    # block, must yield exactly two lines of bytes, the second being the
    # known text.
    for path in files:
        with xopen(path, 'rb') as handle:
            content = [line for line in handle]
            assert len(content) == 2
            assert content[1] == b'The second line.\n', path
def test_no_context_manager_text():
    # Text-mode read without a ``with`` block: the file is opened and
    # closed by hand, and must report itself as closed afterwards.
    for path in files:
        handle = xopen(path, 'rt')
        content = [line for line in handle]
        assert len(content) == 2
        assert content[1] == 'The second line.\n', path
        handle.close()
        assert handle.closed
def test_no_context_manager_binary():
    # Binary-mode read without a ``with`` block: the file is opened and
    # closed by hand, and must report itself as closed afterwards.
    for path in files:
        handle = xopen(path, 'rb')
        content = [line for line in handle]
        assert len(content) == 2
        assert content[1] == b'The second line.\n', path
        handle.close()
        assert handle.closed
@raises(IOError)
def test_nonexisting_file():
    # Opening a missing uncompressed file (default mode) is expected to
    # raise IOError.
    with xopen('this-file-does-not-exist'):
        pass
@raises(IOError)
def test_nonexisting_file_gz():
    # Opening a missing .gz file (default mode) is expected to raise IOError.
    with xopen('this-file-does-not-exist.gz'):
        pass
@raises(IOError)
def test_nonexisting_file_bz2():
    # Opening a missing .bz2 file (default mode) is expected to raise IOError.
    with xopen('this-file-does-not-exist.bz2'):
        pass
if lzma is not None:
    @raises(IOError)
    def test_nonexisting_file_xz():
        # Only defined when the lzma module could be imported. Opening a
        # missing .xz file (default mode) is expected to raise IOError.
        with xopen('this-file-does-not-exist.xz'):
            pass
@raises(IOError)
def test_write_to_nonexisting_dir():
    # Opening a plain file for writing inside a directory that does not
    # exist is expected to raise IOError.
    with xopen('this/path/does/not/exist/file.txt', 'w'):
        pass
@raises(IOError)
def test_write_to_nonexisting_dir_gz():
    # Opening a .gz file for writing inside a directory that does not
    # exist is expected to raise IOError.
    with xopen('this/path/does/not/exist/file.gz', 'w'):
        pass
@raises(IOError)
def test_write_to_nonexisting_dir_bz2():
    # Opening a .bz2 file for writing inside a directory that does not
    # exist is expected to raise IOError.
    with xopen('this/path/does/not/exist/file.bz2', 'w'):
        pass
if lzma:
    @raises(IOError)
    def test_write_to_nonexisting_dir_xz():
        # Only defined when the lzma module could be imported. Opening an
        # .xz file for writing inside a directory that does not exist is
        # expected to raise IOError.
        #
        # The _xz suffix matters: this function used to be called
        # test_write_to_nonexisting_dir, which rebound the plain-text test
        # of that name defined earlier in this module, so that test never
        # ran whenever lzma was available.
        with xopen('this/path/does/not/exist/file.xz', 'w'):
            pass
def test_append():
    # Write the same payload twice in append mode, then read the file back
    # and check that its last line equals the payload repeated twice.
    for ext in ["", ".gz"]:  # BZ2 does NOT support append
        if ext == "":
            payload = "AB"
        else:
            # On Py3, need to send BYTES, not unicode
            payload = "AB".encode("utf-8")
        expected = payload + payload
        with temporary_path('truncated.fastq' + ext) as path:
            # Make sure no file is left at this path before appending.
            try:
                os.unlink(path)
            except OSError:
                pass
            for _ in range(2):
                with xopen(path, 'a') as handle:
                    handle.write(payload)
            with xopen(path, 'r') as handle:
                # Run through the file, keeping only its last line.
                for last_line in handle:
                    pass
                # Decode the reference when it is bytes; objects without a
                # decode() method are used as they are.
                try:
                    expected = expected.decode("utf-8")
                except AttributeError:
                    pass
                assert last_line == expected
def create_truncated_file(path):
    """Write random text to *path* through xopen, then chop off its tail.

    The text is 1024 random uppercase letters repeated 1024 times. After
    writing, the file on disk is shortened by 10 bytes.
    """
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    chunk = ''.join([random.choice(alphabet) for _ in range(1024)])
    # Repeat the chunk so the text is much larger than the pipe buffer size.
    payload = chunk * 1024  # 1MB
    with xopen(path, 'w') as out:
        out.write(payload)
    # Reopen with the builtin open() to cut the last 10 bytes off the file.
    with open(path, 'a') as raw:
        raw.truncate(os.path.getsize(path) - 10)
class TookTooLongError(Exception):
    """Raised by the ``timeout`` helper's alarm handler in this module."""
class timeout:
    """Context manager that aborts its body with TookTooLongError after a delay.

    On entry a SIGALRM handler is installed and an alarm is armed for
    ``seconds`` seconds; if the alarm fires, the handler raises
    TookTooLongError. On exit the alarm is cancelled and the SIGALRM
    handler that was active before entry is put back, so the handler does
    not stay installed after the block.

    copied from https://stackoverflow.com/a/22348885/715090
    """

    def __init__(self, seconds=1):
        self.seconds = seconds
        # Handler that was installed before __enter__; restored in __exit__.
        self._previous_handler = None

    def handle_timeout(self, signum, frame):
        raise TookTooLongError()

    def __enter__(self):
        # signal.signal() returns the handler it replaces.
        self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, type, value, traceback):
        signal.alarm(0)
        # The previous handler is None when it was not installed from
        # Python; that value cannot be passed back to signal.signal().
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
if sys.version_info[:2] != (3, 3):
    # The two truncated-file tests are not defined on Python 3.3.

    @raises(EOFError, IOError)
    def test_truncated_gz():
        # Reading a truncated .gz file in one call is expected to raise
        # EOFError or IOError. The 2-second timeout raises a different
        # exception if the read takes too long.
        with temporary_path('truncated.gz') as path:
            create_truncated_file(path)
            with timeout(seconds=2):
                handle = xopen(path, 'r')
                handle.read()
                handle.close()

    @raises(EOFError, IOError)
    def test_truncated_gz_iter():
        # Same expectation as above, but the file is read by iterating
        # over its lines.
        with temporary_path('truncated.gz') as path:
            create_truncated_file(path)
            with timeout(seconds=2):
                handle = xopen(path, 'r')
                for _ in handle:
                    pass
                handle.close()
|