File: StreamFiledescriptors_test.py

package info (click to toggle)
python-pysam 0.15.4+ds-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 27,992 kB
  • sloc: ansic: 140,738; python: 7,881; sh: 265; makefile: 223; perl: 41
file content (89 lines) | stat: -rw-r--r-- 3,024 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import os
import sys
import subprocess
import threading
import errno
import unittest
from pysam import AlignmentFile
from TestUtils import BAM_DATADIR

IS_PYTHON2 = sys.version_info[0] == 2


def alignmentfile_writer_thread(infile, outfile):
    def _writer_thread(infile, outfile):
        """read from infile and write to outfile"""
        try:
            i = 0
            for record in infile:
                outfile.write(record)
                i += 1
        except IOError as e:
            if e.errno != errno.EPIPE:
                pass
        finally:
            outfile.close()

    writer = threading.Thread(target=_writer_thread, args=(infile, outfile))
    writer.daemon = True
    writer.start()
    return writer


class StreamTest(unittest.TestCase):

    def stream_process(self, proc, in_stream, out_stream, writer):

        with AlignmentFile(proc.stdout) as infile:
            read = 0
            for record in infile:
                read += 1
        return 0, read

    @unittest.skipIf(IS_PYTHON2, "no context manager in py2")
    def test_text_processing(self):

        with subprocess.Popen('head -n200',
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              shell=True) as proc:

            in_stream = AlignmentFile(os.path.join(BAM_DATADIR, 'ex1.bam'))
            out_stream = AlignmentFile(
                proc.stdin, 'wh', header=in_stream.header)
            writer = alignmentfile_writer_thread(in_stream,
                                                 out_stream)

            written, read = self.stream_process(proc,
                                                in_stream,
                                                out_stream,
                                                writer)
            self.assertEqual(read, 198)

    @unittest.skip("test contains bug")
    def test_samtools_processing(self):

        # The following test causes the suite to hang
        # as the stream_processor raises:
        # ValueError: file has no sequences defined (mode='r') - is it SAM/BAM format?
        # The whole setup then hangs during exception handling.
        with subprocess.Popen('samtools view -b -f 4',
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              shell=True) as proc:

            in_stream = AlignmentFile(os.path.join(BAM_DATADIR, 'ex1.bam'))
            out_stream = AlignmentFile(
                proc.stdin, 'wb', header=in_stream.header)
            writer = alignmentfile_writer_thread(in_stream,
                                                 out_stream)

            written, read = self.stream_process(proc,
                                                in_stream,
                                                out_stream,
                                                writer)
            self.assertEqual(read, 35)


if __name__ == "__main__":
    unittest.main()