File: StreamFiledescriptors_test.py

package info (click to toggle)
python-pysam 0.10.0%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 14,196 kB
  • ctags: 10,087
  • sloc: ansic: 79,627; python: 8,569; sh: 282; makefile: 215; perl: 41
file content (82 lines) | stat: -rw-r--r-- 2,544 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import subprocess
import threading
import errno
import unittest

from pysam import AlignmentFile

# Absolute path of the "pysam_data" directory located next to this file.
DATADIR = os.path.join(
    os.path.abspath(os.path.dirname(__file__)), "pysam_data")


def alignmentfile_writer_thread(infile, outfile):
    """Copy every record of *infile* to *outfile* on a background thread.

    The thread is created as a daemon and is already started when this
    function returns.

    Args:
        infile: iterable yielding the records to copy.
        outfile: object providing ``write(record)`` and ``close()``.

    Returns:
        The started ``threading.Thread``; call ``join()`` on it to wait
        until the copy has finished and *outfile* has been closed.
    """

    def _writer_thread(infile, outfile):
        """Read from infile and write to outfile, then close outfile."""
        try:
            for record in infile:
                outfile.write(record)
        except IOError:
            # Best effort: the consumer at the other end may stop reading
            # early (e.g. ``head``), which shows up here as a failed
            # write.  Every IOError is ignored, not only EPIPE.
            pass
        finally:
            # Always close so the consumer sees end-of-input.
            outfile.close()

    writer = threading.Thread(target=_writer_thread, args=(infile, outfile))
    # Daemon thread: a writer that is still blocked must not keep the
    # interpreter alive at exit.
    writer.daemon = True
    writer.start()
    return writer


class StreamTest(unittest.TestCase):
    """Stream alignments through an external process over pipes.

    Each test feeds ``ex1.bam`` into a child process's stdin from a
    background writer thread and counts the records that can be parsed
    back from the child's stdout.
    """

    def _spawn(self, command):
        """Start *command* with piped stdin/stdout and schedule its reaping.

        Args:
            command: argument list passed to ``subprocess.Popen``.

        Returns:
            The running ``subprocess.Popen`` object.
        """
        proc = subprocess.Popen(command,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        self.addCleanup(self._reap, proc)
        return proc

    @staticmethod
    def _reap(proc):
        """Make sure *proc* has terminated and has been waited on."""
        # Kill first so that wait() cannot block on a child that is still
        # waiting for more input on its stdin.
        if proc.poll() is None:
            proc.kill()
        proc.wait()

    def stream_process(self, proc, in_stream, out_stream, writer):
        """Count the alignment records readable from ``proc.stdout``.

        Only *proc* is used; *in_stream*, *out_stream* and *writer* are
        accepted but not touched here.

        Returns:
            A ``(written, read)`` tuple.  ``written`` is always 0 because
            the number of records written is not tracked.
        """
        with AlignmentFile(proc.stdout) as infile:
            read = sum(1 for _ in infile)
        return 0, read

    def test_text_processing(self):
        """SAM text truncated by ``head -n200`` yields 198 records."""
        proc = self._spawn(['head', '-n200'])

        # Resolve the input through DATADIR so the test does not depend on
        # the current working directory.
        in_stream = AlignmentFile(os.path.join(DATADIR, 'ex1.bam'))
        # 'wh' writes SAM text including the header, so some of the 200
        # lines kept by head are presumably header lines.
        out_stream = AlignmentFile(proc.stdin, 'wh', header=in_stream.header)
        writer = alignmentfile_writer_thread(in_stream,
                                             out_stream)

        written, read = self.stream_process(proc,
                                            in_stream,
                                            out_stream,
                                            writer)
        self.assertEqual(read, 198)

    def test_samtools_processing(self):
        """BAM filtered by ``samtools view -b -f 4`` yields 35 records."""
        proc = self._spawn(['samtools', 'view', '-b', '-f', '4'])

        # Resolve the input through DATADIR so the test does not depend on
        # the current working directory.
        in_stream = AlignmentFile(os.path.join(DATADIR, 'ex1.bam'))
        out_stream = AlignmentFile(proc.stdin, 'wb', header=in_stream.header)
        writer = alignmentfile_writer_thread(in_stream,
                                             out_stream)

        written, read = self.stream_process(proc,
                                            in_stream,
                                            out_stream,
                                            writer)
        self.assertEqual(read, 35)


# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()