File: test_SffIO.py

package info (click to toggle)
python-biopython 1.68%2Bdfsg-3~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 46,856 kB
  • sloc: python: 160,306; xml: 93,216; ansic: 9,118; sql: 1,208; makefile: 155; sh: 63
file content (373 lines) | stat: -rw-r--r-- 15,213 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# Copyright 2012 by Jeff Hussmann.  All rights reserved.
# Revisions copyright 2013-2016 by Peter Cock.  All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license.  Please see the LICENSE file that should have been included
# as part of this package.

import sys
import re
import unittest
from io import BytesIO

from Bio.SeqIO.SffIO import _sff_find_roche_index, _sff_read_roche_index
from Bio.SeqIO.SffIO import _sff_do_slow_index
from Bio.SeqIO.SffIO import SffIterator, SffWriter, ReadRocheXmlManifest
from Bio import SeqIO

# Expected per-read metadata for E3MFGYR02_random_10_reads.sff, used by the
# TestUAN test case. Each ">" line names a read and is followed by its run
# prefix (which TestUAN parses as the "time"), region number and XY location.
# The text was generated with:
#
# sffinfo E3MFGYR02_random_10_reads.sff | sed -n '/>\|Run Prefix\|Region\|XY/p'
#
# NOTE: the literal opens with a newline, so splitlines() yields an empty
# first line which matches none of the markers TestUAN looks for.
test_data = """
>E3MFGYR02JWQ7T
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  3946_2103
>E3MFGYR02JA6IL
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  3700_3115
>E3MFGYR02JHD4H
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  3771_2095
>E3MFGYR02GFKUC
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  2520_2738
>E3MFGYR02FTGED
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  2268_2739
>E3MFGYR02FR9G7
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  2255_0361
>E3MFGYR02GAZMS
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  2468_1618
>E3MFGYR02HHZ8O
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  2958_1574
>E3MFGYR02GPGB1
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  2633_0607
>E3MFGYR02F7Z7G
  Run Prefix:   R_2008_01_09_16_16_00_
  Region #:     2
  XY Location:  2434_1658"""


class TestUAN(unittest.TestCase):
    """Compare annotations parsed from an SFF file with sffinfo output.

    The expected values are taken from the module level ``test_data`` text.
    """

    def setUp(self):
        """Load the SFF records and build the expected annotation lookup.

        ``self.test_annotations`` maps each read name to a dict holding
        "time" (list of ints), "region" (int) and "coords" (tuple of two
        ints), as parsed from ``test_data``.
        """
        self.records = list(SeqIO.parse('Roche/E3MFGYR02_random_10_reads.sff', 'sff'))
        self.test_annotations = {}
        for line in test_data.splitlines():
            fields = re.split(r"\s+", line.strip())
            if '>' in line:
                # Header line such as ">E3MFGYR02JWQ7T" starts a new read
                current_name = fields[0].lstrip('>')
                self.test_annotations[current_name] = {}
            elif 'Prefix' in line:
                # "R_2008_01_09_16_16_00_" -> [2008, 1, 9, 16, 16, 0]; the
                # slice drops the leading "R" and the empty string left by
                # the trailing underscore.
                time_list = [int(v) for v in fields[2].split('_')[1:-1]]
                self.test_annotations[current_name]["time"] = time_list
            elif 'Region' in line:
                region = int(fields[-1])
                self.test_annotations[current_name]["region"] = region
            elif 'XY' in line:
                # "3946_2103" -> (3946, 2103)
                x, y = [int(v) for v in fields[-1].split('_')]
                self.test_annotations[current_name]["coords"] = (x, y)

    def check_annotation(self, key):
        """Check annotation ``key`` of every parsed record against sffinfo.

        Also checks one record was parsed per expected read, so the
        comparison loop cannot pass vacuously on an empty record list.
        """
        self.assertEqual(len(self.records), len(self.test_annotations))
        for record in self.records:
            self.assertEqual(record.annotations[key],
                             self.test_annotations[record.name][key])

    def test_time(self):
        self.check_annotation("time")

    def test_region(self):
        self.check_annotation("region")

    def test_coords(self):
        self.check_annotation("coords")


class TestErrors(unittest.TestCase):
    with open("Roche/E3MFGYR02_random_10_reads.sff", "rb") as handle:
        good = handle.read()

    def test_empty(self):
        fh = BytesIO()
        try:
            records = list(SeqIO.parse(fh, "sff"))
        except ValueError as err:
            self.assertEqual(str(err), "Empty file.")
        else:
            self.assertTrue(False, "Empty file did not raise exception")

    def check_bad_header(self, header, msg):
        try:
            records = list(SeqIO.parse(BytesIO(header), "sff"))
        except ValueError as err:
            if isinstance(msg, (tuple, list)):
                self.assertTrue(str(err) in msg, "Unexpected error: %s" % err)
            else:
                self.assertEqual(str(err), msg)
        else:
            self.assertTrue(False, "Test SFF header only did not raise exception")

    def test_30bytes(self):
        self.check_bad_header(b"x" * 30,
                              "File too small to hold a valid SFF header.")

    def test_31bytes(self):
        self.check_bad_header(b"x" * 31,
                              ("SFF file did not start '.sff', but 'xxxx'",
                               "SFF file did not start '.sff', but b'xxxx'"))

    def test_31bytes_index_header(self):
        self.check_bad_header(b".srt" + b"x" * 27,
                              "Handle seems to be at SFF index block, not start")

    def test_31bytes_bad_ver(self):
        self.check_bad_header(b".sff1.00" + b"x" * 23,
                              "Unsupported SFF version in header, 49.46.48.48")

    def test_31bytes_bad_flowgram(self):
        self.check_bad_header(b".sff\x00\x00\x00\x01" + b"x" * 23,
                              "Flowgram format code 120 not supported")

    def test_bad_index_offset(self):
        bad = self.good[:12] + b"\x00\x00\x00\x00" + self.good[16:]
        self.check_bad_header(bad,
                              "Index offset 0 but index length 764")

    def test_bad_index_length(self):
        bad = self.good[:16] + b"\x00\x00\x00\x00" + self.good[20:]
        self.check_bad_header(bad,
                              "Index offset 16824 but index length 0")

    def test_bad_index_eof(self):
        # Semi-random edit to the index offset value,
        bad = self.good[:13] + b"\x01" + self.good[14:]
        self.check_bad_header(bad,
                              "Gap of 65536 bytes after final record end 16824, "
                              "before 82360 where index starts?")

    def test_no_index(self):
        # Does a lot of work to create a no-index SFF file
        # (in the process checking this bit of SffWriter works)
        records = list(SeqIO.parse(BytesIO(self.good), "sff"))
        with BytesIO() as handle:
            writer = SffWriter(handle, index=False)
            count = writer.write_file(records)
            self.assertEqual(count, len(records))
            handle.seek(0)
            new = list(SeqIO.parse(handle, "sff"))
            self.assertEqual(len(records), len(new))
            for a, b in zip(records, new):
                self.assertEqual(a.id, b.id)
            handle.seek(0)
            try:
                values = _sff_find_roche_index(handle)
            except ValueError as err:
                self.assertEqual(str(err), "No index present in this SFF file")
            else:
                self.assertTrue(False, "Test _sff_find_roche_index did not raise exception")

    def test_unknown_index(self):
        # TODO - Add SFF file with no index,
        # self.assertEqual(str(err), "No index present in this SFF file")
        with open("Roche/E3MFGYR02_alt_index_in_middle.sff", "rb") as handle:
            try:
                values = _sff_find_roche_index(handle)
            except ValueError as err:
                self.assertTrue(str(err) in ("Unknown magic number '.diy' in SFF index header:\n'.diy1.00'",
                                             "Unknown magic number b'.diy' in SFF index header:\nb'.diy1.00'"))
            else:
                self.assertTrue(False, "Test _sff_find_roche_index did not raise exception")

    def check_sff_read_roche_index(self, data, msg):
        handle = BytesIO(data)
        try:
            index = list(_sff_read_roche_index(handle))
        except ValueError as err:
            self.assertEqual(str(err), msg)
        else:
            self.assertTrue(False, "_sff_read_roche_index did not raise exception")

    def test_premature_end_of_index(self):
        self.check_sff_read_roche_index(self.good[:-50],
                                        "Premature end of file!")

    def test_index_name_no_null(self):
        self.assertEqual(self.good[17502:17503], b"\x00")
        self.check_sff_read_roche_index(self.good[:17502] + b"x" + self.good[17503:],
                                        "Expected a null terminator to the read name.")

    def test_index_mft_version(self):
        self.assertEqual(self.good[16824:16832], b".mft1.00")
        self.check_sff_read_roche_index(self.good[:16828] + b"\x01\x02\x03\x04" + self.good[16832:],
                                        "Unsupported version in .mft index header, 1.2.3.4")

    def test_index_mft_data_size(self):
        self.assertEqual(self.good[16824:16832], b".mft1.00")
        self.check_sff_read_roche_index(self.good[:16836] + b"\x00\x00\x00\x00" + self.good[16840:],
                                        "Problem understanding .mft index header, 764 != 8 + 8 + 548 + 0")

    def test_index_lengths(self):
        # Reduce the number of reads from 10 to 9 so index loading fails...
        self.assertEqual(self.good[20:24], b"\x00\x00\x00\x0A")
        self.check_sff_read_roche_index(self.good[:20] + b"\x00\x00\x00\x09" + self.good[24:],
                                        "Problem with index length? 17568 vs 17588")

    def test_no_manifest_xml(self):
        with open("Roche/E3MFGYR02_no_manifest.sff", "rb") as handle:
            try:
                xml = ReadRocheXmlManifest(handle)
            except ValueError as err:
                self.assertEqual(str(err), "No XML manifest found")
            else:
                self.assertTrue(False, "ReadRocheXmlManifest did not raise exception")


class TestIndex(unittest.TestCase):
    """Exercise the Roche index readers on the example SFF file."""

    def test_manifest(self):
        # Only checks the XML manifest can be read without an exception
        sff_path = "Roche/E3MFGYR02_random_10_reads.sff"
        with open(sff_path, "rb") as stream:
            manifest = ReadRocheXmlManifest(stream)

    def test_both_ways(self):
        sff_path = "Roche/E3MFGYR02_random_10_reads.sff"

        # Reading the Roche index and the slow scan of the file must agree
        with open(sff_path, "rb") as stream:
            from_index = sorted(_sff_read_roche_index(stream))
        with open(sff_path, "rb") as stream:
            from_scan = sorted(_sff_do_slow_index(stream))
        self.assertEqual(from_index, from_scan)

        # ...and both must agree with the number of records parsed, from
        # either a real file handle or an in-memory copy.
        expected = len(from_index)
        with open(sff_path, "rb") as stream:
            self.assertEqual(expected, len(list(SffIterator(stream))))
        with open(sff_path, "rb") as stream:
            in_memory = BytesIO(stream.read())
            self.assertEqual(expected, len(list(SffIterator(in_memory))))

        if sys.platform == "win32" or sys.version_info[0] >= 3:
            return

        # Python 2 away from Windows only:
        # can be lazy and treat a text mode handle as binary...
        with open(sff_path, "r") as stream:
            self.assertEqual(expected, len(list(SffIterator(stream))))
        with open(sff_path) as stream:
            text_mode_index = sorted(_sff_read_roche_index(stream))
        self.assertEqual(from_index, text_mode_index)
        with open(sff_path, "r") as stream:
            text_mode_index = sorted(_sff_do_slow_index(stream))
        self.assertEqual(from_index, text_mode_index)
        with open(sff_path, "r") as stream:
            self.assertEqual(expected, len(list(SffIterator(stream))))
        with open(sff_path, "r") as stream:
            in_memory = BytesIO(stream.read())
            self.assertEqual(expected, len(list(SffIterator(in_memory))))


class TestAlternativeIndexes(unittest.TestCase):
    """Check variant SFF files parse to the same reads as the reference file.

    The variants differ in their file names by index kind and position
    (at the start, in the middle, or at the end of the file).
    """

    # Reference records, parsed once when the class is defined
    filename = "Roche/E3MFGYR02_random_10_reads.sff"
    with open(filename, "rb") as handle:
        sff = list(SffIterator(handle))

    def _parse_all(self, path, **options):
        """Return all records from SFF file ``path`` as a list.

        Any keyword arguments are passed on to SffIterator unchanged.
        """
        with open(path, "rb") as stream:
            return list(SffIterator(stream, **options))

    def check_same(self, new_sff):
        """Check ``new_sff`` matches the reference records.

        The record count must agree, and each pair of records must have
        the same identifier and the same sequence.
        """
        self.assertEqual(len(self.sff), len(new_sff))
        for expected, observed in zip(self.sff, new_sff):
            self.assertEqual(expected.id, observed.id)
            self.assertEqual(str(expected.seq), str(observed.seq))

    def test_alt_index_at_end(self):
        self.check_same(self._parse_all("Roche/E3MFGYR02_alt_index_at_end.sff"))

    def test_alt_index_at_start(self):
        self.check_same(self._parse_all("Roche/E3MFGYR02_alt_index_at_start.sff"))

    def test_alt_index_in_middle(self):
        self.check_same(self._parse_all("Roche/E3MFGYR02_alt_index_in_middle.sff"))

    def test_index_at_start(self):
        self.check_same(self._parse_all("Roche/E3MFGYR02_index_at_start.sff"))

    def test_index_in_middle(self):
        self.check_same(self._parse_all("Roche/E3MFGYR02_index_in_middle.sff"))

    def test_trim(self):
        # With trimming only the record count and identifiers are compared,
        # the sequences are not.
        trimmed = self._parse_all(self.filename, trim=True)
        self.assertEqual(len(self.sff), len(trimmed))
        for expected, observed in zip(self.sff, trimmed):
            self.assertEqual(expected.id, observed.id)


class TestConcatenated(unittest.TestCase):
    """Check a gzipped SFF parses, and concatenated SFF files are rejected."""

    # Substrings expected within the ValueError message for each bad file
    greek_msg = ("Additional data at end of SFF file, perhaps "
                 "multiple SFF files concatenated? "
                 "See offset 65296")
    paired_msg = ("Your SFF file is invalid, post index 5 byte "
                  "null padding region ended '.sff' which could "
                  "be the start of a concatenated SFF file? "
                  "See offset 54371")

    def check_parse(self, filename, expected_count, expected_msg):
        """Check parsing ``filename`` fails after ``expected_count`` records.

        Iterating over the file must raise a ValueError whose message
        contains ``expected_msg``, having first yielded exactly
        ``expected_count`` records.
        """
        count = 0
        try:
            for record in SeqIO.parse(filename, "sff"):
                count += 1
        except ValueError as err:
            self.assertTrue(expected_msg in str(err), err)
        else:
            self.fail("Didn't spot concatenation")
        self.assertEqual(count, expected_count)

    def check_index(self, filename, expected_msg):
        """Check indexing ``filename`` raises ValueError with ``expected_msg``.

        If no exception is raised this is reported as a test failure
        (rather than as an error from raising an exception ourselves).
        """
        try:
            SeqIO.index(filename, "sff")
        except ValueError as err:
            self.assertTrue(expected_msg in str(err), err)
        else:
            self.fail("Indexing %s should fail" % filename)

    def test_parses_gzipped_stream(self):
        import gzip
        count = 0
        fh = gzip.open("Roche/E3MFGYR02_random_10_reads.sff.gz", 'rb')
        # Explicit close so the gzip handle is not leaked, even if parsing
        # raises. try/finally does not rely on the handle being a context
        # manager.
        try:
            for record in SeqIO.parse(fh, 'sff'):
                count += 1
        finally:
            fh.close()
        self.assertEqual(10, count)

    def test_parse1(self):
        self.check_parse("Roche/invalid_greek_E3MFGYR02.sff", 24, self.greek_msg)

    def test_index1(self):
        self.check_index("Roche/invalid_greek_E3MFGYR02.sff", self.greek_msg)

    def test_parse2(self):
        self.check_parse("Roche/invalid_paired_E3MFGYR02.sff", 20, self.paired_msg)

    def test_index2(self):
        self.check_index("Roche/invalid_paired_E3MFGYR02.sff", self.paired_msg)


if __name__ == "__main__":
    # Run every test case in this module, reporting one line per test
    unittest.main(testRunner=unittest.TextTestRunner(verbosity=2))