1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
import os
import sys
import subprocess
import threading
import errno
import unittest
from pysam import AlignmentFile
from TestUtils import BAM_DATADIR
IS_PYTHON2 = sys.version_info[0] == 2
def alignmentfile_writer_thread(infile, outfile):
def _writer_thread(infile, outfile):
"""read from infile and write to outfile"""
try:
i = 0
for record in infile:
outfile.write(record)
i += 1
except IOError as e:
if e.errno != errno.EPIPE:
pass
finally:
outfile.close()
writer = threading.Thread(target=_writer_thread, args=(infile, outfile))
writer.daemon = True
writer.start()
return writer
class StreamTest(unittest.TestCase):
def stream_process(self, proc, in_stream, out_stream, writer):
with AlignmentFile(proc.stdout) as infile:
read = 0
for record in infile:
read += 1
return 0, read
@unittest.skipIf(IS_PYTHON2, "no context manager in py2")
def test_text_processing(self):
with subprocess.Popen('head -n200',
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
shell=True) as proc:
in_stream = AlignmentFile(os.path.join(BAM_DATADIR, 'ex1.bam'))
out_stream = AlignmentFile(
proc.stdin, 'wh', header=in_stream.header)
writer = alignmentfile_writer_thread(in_stream,
out_stream)
written, read = self.stream_process(proc,
in_stream,
out_stream,
writer)
self.assertEqual(read, 198)
@unittest.skip("test contains bug")
def test_samtools_processing(self):
# The following test causes the suite to hang
# as the stream_processor raises:
# ValueError: file has no sequences defined (mode='r') - is it SAM/BAM format?
# The whole setup then hangs during exception handling.
with subprocess.Popen('samtools view -b -f 4',
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
shell=True) as proc:
in_stream = AlignmentFile(os.path.join(BAM_DATADIR, 'ex1.bam'))
out_stream = AlignmentFile(
proc.stdin, 'wb', header=in_stream.header)
writer = alignmentfile_writer_thread(in_stream,
out_stream)
written, read = self.stream_process(proc,
in_stream,
out_stream,
writer)
self.assertEqual(read, 35)
if __name__ == "__main__":
unittest.main()
|