File: testxopen.py

package info (click to toggle)
python-xopen 0.1.1-1~bpo8+1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 104 kB
  • sloc: python: 331; makefile: 15
file content (197 lines) | stat: -rw-r--r-- 4,250 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# coding: utf-8
from __future__ import print_function, division, absolute_import
import gzip
import os
import random
import sys
import signal
from contextlib import contextmanager
from nose.tools import raises
from xopen import xopen


# Paths of the fixture files read by the tests: one plain file plus one per
# compression suffix.
base = "tests/file.txt"
files = [base, base + '.gz', base + '.bz2']
try:
	import lzma
except ImportError:
	# No lzma module in this interpreter: leave the .xz fixture out and let
	# the xz-specific tests be skipped via ``if lzma:``.
	lzma = None
else:
	files.append(base + '.xz')


# Interpreter version split into its first two components.
major, minor = sys.version_info[:2]


@contextmanager
def temporary_path(name):
	"""Yield the path of a scratch file called *name* and delete it afterwards.

	The path points into a ``testtmp`` directory located next to this module;
	that directory is created if it does not exist yet. The file itself is
	not created here, that is left to the body of the ``with`` statement.

	The file is removed in a ``finally`` clause, so it is also cleaned up
	when the body raises. A file that was never created is silently ignored
	instead of masking the body's exception with an ``OSError``.
	"""
	directory = os.path.join(os.path.dirname(__file__), 'testtmp')
	if not os.path.isdir(directory):
		os.mkdir(directory)
	path = os.path.join(directory, name)
	try:
		yield path
	finally:
		# The body may have failed before it got around to creating the file.
		if os.path.exists(path):
			os.remove(path)


def test_xopen_text():
	"""Every fixture file opened in 'rt' mode yields the expected two text lines."""
	for path in files:
		with xopen(path, 'rt') as handle:
			content = [line for line in handle]
			assert len(content) == 2
			assert content[1] == 'The second line.\n', path


def test_xopen_binary():
	"""Every fixture file opened in 'rb' mode yields the expected two byte lines."""
	for path in files:
		with xopen(path, 'rb') as handle:
			content = [line for line in handle]
			assert len(content) == 2
			assert content[1] == b'The second line.\n', path


def test_no_context_manager_text():
	"""Text-mode reading also works when the file is opened and closed by hand."""
	for path in files:
		handle = xopen(path, 'rt')
		content = [line for line in handle]
		assert len(content) == 2
		assert content[1] == 'The second line.\n', path
		# An explicit close() must leave the object marked as closed.
		handle.close()
		assert handle.closed


def test_no_context_manager_binary():
	"""Binary-mode reading also works when the file is opened and closed by hand."""
	for path in files:
		handle = xopen(path, 'rb')
		content = [line for line in handle]
		assert len(content) == 2
		assert content[1] == b'The second line.\n', path
		# An explicit close() must leave the object marked as closed.
		handle.close()
		assert handle.closed


@raises(IOError)
def test_nonexisting_file():
	"""Opening a nonexistent path without a compression suffix raises IOError."""
	with xopen('this-file-does-not-exist'):
		pass


@raises(IOError)
def test_nonexisting_file_gz():
	"""Opening a nonexistent ``.gz`` path raises IOError."""
	with xopen('this-file-does-not-exist.gz'):
		pass


@raises(IOError)
def test_nonexisting_file_bz2():
	"""Opening a nonexistent ``.bz2`` path raises IOError."""
	with xopen('this-file-does-not-exist.bz2'):
		pass


if lzma:
	# Only defined when the lzma module could be imported.
	@raises(IOError)
	def test_nonexisting_file_xz():
		"""Opening a nonexistent ``.xz`` path raises IOError."""
		with xopen('this-file-does-not-exist.xz'):
			pass


@raises(IOError)
def test_write_to_nonexisting_dir():
	"""Opening a ``.txt`` file for writing inside a missing directory raises IOError."""
	with xopen('this/path/does/not/exist/file.txt', 'w'):
		pass


@raises(IOError)
def test_write_to_nonexisting_dir_gz():
	"""Opening a ``.gz`` file for writing inside a missing directory raises IOError."""
	with xopen('this/path/does/not/exist/file.gz', 'w'):
		pass


@raises(IOError)
def test_write_to_nonexisting_dir_bz2():
	"""Opening a ``.bz2`` file for writing inside a missing directory raises IOError."""
	with xopen('this/path/does/not/exist/file.bz2', 'w'):
		pass


if lzma:
	# Only defined when the lzma module could be imported.
	#
	# This test needs its own ``_xz`` name: reusing the name
	# ``test_write_to_nonexisting_dir`` would rebind the module-level name of
	# the plain-text test defined earlier, so that test would never be run.
	@raises(IOError)
	def test_write_to_nonexisting_dir_xz():
		"""Opening a ``.xz`` file for writing inside a missing directory raises IOError."""
		with xopen('this/path/does/not/exist/file.xz', 'w') as f:
			pass


def test_append():
	"""Writing the same text twice in append mode produces both copies back to back.

	Checked for an uncompressed and a gzip-compressed file; the last line
	read back must equal the doubled text.
	"""
	for ext in ("", ".gz"):  # BZ2 does NOT support append
		payload = "AB"
		if ext:
			# On Py3, the compressed writer needs BYTES, not unicode
			payload = payload.encode("utf-8")
		expected = payload + payload
		with temporary_path('truncated.fastq' + ext) as path:
			# Start from a clean slate; the file usually does not exist yet.
			try:
				os.unlink(path)
			except OSError:
				pass
			for _ in range(2):
				with xopen(path, 'a') as out:
					out.write(payload)
			with xopen(path, 'r') as f:
				# Walk to the end; ``appended`` ends up holding the last line.
				for appended in f:
					pass
				# The comparison is done on decoded text, so decode the
				# expectation whenever it is a byte string.
				if isinstance(expected, bytes):
					expected = expected.decode("utf-8")
				assert appended == expected


def create_truncated_file(path):
	"""Write repeated random letters to *path* with xopen, then chop off the last 10 bytes.

	The written text is a 1024-letter random chunk repeated 1024 times.
	"""
	alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
	chunk = ''.join([random.choice(alphabet) for _ in range(1024)])
	# Blow the text up to 1MB so that it is larger than the pipe buffer size.
	payload = chunk * 1024
	with xopen(path, 'w') as writer:
		writer.write(payload)
	# Size on disk of whatever xopen wrote, minus the bytes to cut off.
	shortened_size = os.stat(path).st_size - 10
	with open(path, 'a') as raw:
		raw.truncate(shortened_size)


class TookTooLongError(Exception):
	"""Raised by the ``timeout`` helper's alarm handler when the time limit is hit."""


class timeout(object):
	"""Context manager that raises TookTooLongError if its body runs too long.

	It arms a SIGALRM alarm on entry and disarms it on exit. The SIGALRM
	handler that was installed before entry is put back on exit, so the
	helper does not leave its own handler behind.

	seconds -- alarm delay passed to ``signal.alarm`` (default 1)
	"""
	# copied from https://stackoverflow.com/a/22348885/715090
	def __init__(self, seconds=1):
		self.seconds = seconds
		# SIGALRM handler that was active before __enter__; set on entry.
		self._previous_handler = None

	def handle_timeout(self, signum, frame):
		"""Signal handler: turn the alarm into a TookTooLongError."""
		raise TookTooLongError()

	def __enter__(self):
		# signal.signal() returns the handler it replaces; remember it so
		# that __exit__ can restore it.
		self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
		signal.alarm(self.seconds)
		return self

	def __exit__(self, type, value, traceback):
		# Cancel a still-pending alarm before touching the handler.
		signal.alarm(0)
		previous = self._previous_handler
		if previous is None:
			# None means the old handler was not installed from Python and
			# cannot be re-installed through signal.signal(); fall back to
			# the default action.
			previous = signal.SIG_DFL
		signal.signal(signal.SIGALRM, previous)

# These two tests are not defined on Python 3.3.
if sys.version_info[:2] != (3, 3):
	@raises(EOFError, IOError)
	def test_truncated_gz():
		"""Reading a truncated .gz file in one go raises EOFError or IOError."""
		with temporary_path('truncated.gz') as truncated:
			create_truncated_file(truncated)
			# The 2 second alarm turns a hang into a TookTooLongError.
			with timeout(seconds=2):
				reader = xopen(truncated, 'r')
				reader.read()
				reader.close()


	@raises(EOFError, IOError)
	def test_truncated_gz_iter():
		"""Iterating over a truncated .gz file raises EOFError or IOError."""
		with temporary_path('truncated.gz') as truncated:
			create_truncated_file(truncated)
			# The 2 second alarm turns a hang into a TookTooLongError.
			with timeout(seconds=2):
				reader = xopen(truncated, 'r')
				for _ in reader:
					pass
				reader.close()