1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
|
import os
import subprocess
import threading
import errno
import unittest
from pysam import AlignmentFile
DATADIR = os.path.abspath(os.path.join(
os.path.dirname(__file__),
"pysam_data"))
def alignmentfile_writer_thread(infile, outfile):
def _writer_thread(infile, outfile):
"""read from infile and write to outfile"""
try:
i = 0
for record in infile:
outfile.write(record)
i += 1
except IOError as e:
if e.errno != errno.EPIPE:
pass
finally:
outfile.close()
writer = threading.Thread(target=_writer_thread, args=(infile, outfile))
writer.daemon = True
writer.start()
return writer
class StreamTest(unittest.TestCase):
def stream_process(self, proc, in_stream, out_stream, writer):
with AlignmentFile(proc.stdout) as infile:
read = 0
for record in infile:
read += 1
return 0, read
def test_text_processing(self):
proc = subprocess.Popen('head -n200',
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
shell=True)
in_stream = AlignmentFile('pysam_data/ex1.bam')
out_stream = AlignmentFile(proc.stdin, 'wh', header=in_stream.header)
writer = alignmentfile_writer_thread(in_stream,
out_stream)
written, read = self.stream_process(proc,
in_stream,
out_stream,
writer)
self.assertEqual(read, 198)
def test_samtools_processing(self):
proc = subprocess.Popen('samtools view -b -f 4',
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
shell=True)
in_stream = AlignmentFile('pysam_data/ex1.bam')
out_stream = AlignmentFile(proc.stdin, 'wb', header=in_stream.header)
writer = alignmentfile_writer_thread(in_stream,
out_stream)
written, read = self.stream_process(proc,
in_stream,
out_stream,
writer)
self.assertEqual(read, 35)
if __name__ == "__main__":
unittest.main()
|