File: test_pbcore_io_GffIO.py

package info (click to toggle)
python-pbcore 1.2.11%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 18,612 kB
  • ctags: 5,336
  • sloc: python: 22,160; xml: 2,667; makefile: 239
file content (204 lines) | stat: -rw-r--r-- 8,246 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204

from StringIO import StringIO
import tempfile
import unittest
import os.path

from nose.tools import assert_equal, assert_raises

from pbcore.io import GffWriter, Gff3Record, GffReader
from pbcore.io.GffIO import merge_gffs, merge_gffs_sorted, sort_gff
from pbcore import data

def rm_out(fname):
    """Delete the file at ``fname`` if it exists; otherwise do nothing."""
    if not os.path.exists(fname):
        return
    os.remove(fname)

class TestGff3Record:
    """Checks string rendering, parsing and field access on Gff3Record."""

    def setup(self):
        # Per-test fixture: an insertion on chr1 carrying two attributes.
        attrs = [("cat", "1"), ("dog", "2")]
        self.record = Gff3Record("chr1", 10, 11, "insertion",
                                 attributes=attrs)

    def test_str(self):
        """str() of the fixture gives the expected tab-separated line."""
        rendered = str(self.record)
        assert_equal("chr1\t.\tinsertion\t10\t11\t.\t.\t.\tcat=1;dog=2",
                     rendered)

    def test_modification(self):
        """Fields assigned on a copy appear in the copy's rendered line."""
        edited = self.record.copy()
        # Assignment order is kept: dog, cat, mouse, start, end.
        updates = (("dog", 3), ("cat", 4), ("mouse", 5),
                   ("start", 100), ("end", 110))
        for field, value in updates:
            setattr(edited, field, value)
        rendered = str(edited)
        assert_equal("chr1\t.\tinsertion\t100\t110\t.\t.\t.\tcat=4;dog=3;mouse=5",
                     rendered)

    def test_fromString(self):
        """A record parsed back from its own string renders identically."""
        original_line = str(self.record)
        parsed = Gff3Record.fromString(original_line)
        assert_equal(original_line, str(parsed))

    def test_get(self):
        """
        Check attribute-style access, the AttributeError raised for an
        unknown name, and the get() accessor with and without a default.
        """
        rec = self.record
        rec.dog = 3
        rec.cat = 4
        rec.mouse = 5
        rec.start = 100
        rec.end = 110

        # Plain attribute access returns what was assigned above.
        assert_equal(3, rec.dog)
        assert_equal(100, rec.start)
        # A name that was never set must raise rather than return a value.
        with assert_raises(AttributeError):
            getattr(rec, "god")

        # get() mirrors attribute access but yields None for unknown names.
        assert_equal(3, rec.get("dog"))
        assert_equal(None, rec.get("god"))
        assert_equal(100, rec.get("start", 100))




class TestGffReader:
    """Checks GffReader against the GFF3 file returned by data.getGff3()."""

    def setup(self):
        # The same file is opened twice: once as plain text for comparison
        # and once through the reader under test.
        gff_path = data.getGff3()
        self.rawFile = open(gff_path)
        self.reader = GffReader(gff_path)

    def teardown(self):
        # Release the handles opened in setup so they do not leak from one
        # test to the next; the raw file is closed even if the reader's
        # close() fails.
        try:
            self.reader.close()
        finally:
            self.rawFile.close()

    def test_headers(self):
        """The reader exposes the file's header lines, in order."""
        assert_equal(["##gff-version 3",
                      "##pacbio-variant-version 2.1",
                      "##date Sat Mar 22 12:16:13 2014",
                      "##feature-ontology http://song.cvs.sourceforge.net/*checkout*/song/ontology/sofa.obo?revision=1.12",
                      "##source GenomicConsensus 0.8.0",
                      "##source-commandline /Users/dalexander/.virtualenvs/VE/bin/variantCaller.py --algorithm=plurality -q20 -x5 pbcore/data/aligned_reads_1.cmp.h5 -r /Users/dalexander/Data/lambdaNEB.fa -o /tmp/v.gff",
                      "##source-alignment-file /Users/dalexander/Dropbox/Sources/git/pbcore/pbcore/data/aligned_reads_1.cmp.h5",
                      "##source-reference-file /Users/dalexander/Data/lambdaNEB.fa",
                      "##sequence-region lambda_NEB3011 1 48502"],
                     self.reader.headers)

    def test__iter__(self):
        """Each record yielded by the reader renders back to its raw line."""
        records = list(self.reader)
        # Skip the 9 header lines checked in test_headers.
        rawLines = self.rawFile.readlines()[9:]
        # NOTE: zip stops at the shorter sequence, so this compares records
        # pairwise but does not check that the counts match.
        for record, rawLine in zip(records, rawLines):
            # No newlines or whitespace allowed in records
            assert_equal(str(record).strip(), str(record))
            # Make sure record matches line
            assert_equal(rawLine.strip(), str(record))


class TestGffWriter:
    """Checks the text GffWriter emits into an in-memory buffer."""

    def setup(self):
        # Two records that differ in position, type and attributes.
        self.record1 = Gff3Record("chr1", 10, 11, "insertion",
                                  attributes=[("cat", "1"), ("dog", "2")])
        self.record2 = Gff3Record("chr1", 200, 201, "substitution",
                                  attributes=[("mouse", "1"), ("moose", "2")])
        # The writer targets a fresh StringIO so tests can inspect its output.
        self.outfile = StringIO()
        self.gffWriter = GffWriter(self.outfile)

    def test_writeHeader(self):
        """An extra header line follows the version line in the output."""
        self.gffWriter.writeHeader("##foo bar")
        written = self.outfile.getvalue()
        assert_equal("##gff-version 3\n##foo bar\n", written)

    def test_writeRecord(self):
        """Records are written one per line, after the version line."""
        for rec in (self.record1, self.record2):
            self.gffWriter.writeRecord(rec)
        expected_lines = [
            "##gff-version 3\n",
            "chr1\t.\tinsertion\t10\t11\t.\t.\t.\tcat=1;dog=2\n",
            "chr1\t.\tsubstitution\t200\t201\t.\t.\t.\tmouse=1;moose=2\n",
        ]
        expected = "".join(expected_lines)
        assert_equal(expected, self.outfile.getvalue())


class TestGffSorting(unittest.TestCase):
    """
    Tests for merge_gffs, merge_gffs_sorted and sort_gff.

    setUpClass writes each entry of ``gff_data`` to its own file in the
    current working directory, plus one combined file holding the header
    lines of the first entry followed by the records of every entry.
    ``sorted_start`` lists the (seqid, start) pairs of all records in the
    order the sorting functions are expected to produce.
    """
    gff_data = ["""\
##gff-version 3
##source ipdSummary
##source-commandline ipdSummary etc.
##sequence-region lambda_NEB3011 1 48502
chr1\tkinModCall\tmodified_base\t32580\t32580\t32\t-\t.\tcoverage=94;context=AATGGCATCGTTCCGGTGGTGGGCGTTGATGGCTGGTCCCG;IPDRatio=1.75
chr1\tkinModCall\tmodified_base\t32766\t32766\t42\t-\t.\tcoverage=170;context=GCTGGGAAGCTGGCTGAACGTGTCGGCATGGATTCTGTCGA;IPDRatio=1.70
chr1\tkinModCall\tmodified_base\t32773\t32773\t54\t-\t.\tcoverage=154;context=AACGCTGGCTGGGAAGCTGGCTGAACGTGTCGGCATGGATT;IPDRatio=2.65""", """\
##gff-version 3
##source ipdSummary
##source-commandline ipdSummary etc.
##sequence-region lambda_NEB3011 1 48502
chr2\tkinModCall\tmodified_base\t1200\t1200\t47\t-\t.\tcoverage=109;context=ACTTTTCACGGTAGTTTTTTGCCGCTTTACCGCCCAGGCAC;IPDRatio=1.89
chr2\tkinModCall\tmodified_base\t1786\t1786\t36\t-\t.\tcoverage=153;context=TCCCACGTCTCACCGAGCGTGGTGTTTACGAAGGTTTTACG;IPDRatio=1.67
chr2\tkinModCall\tmodified_base\t1953\t1953\t39\t+\t.\tcoverage=148;context=AATGCGCGTATGGGGATGGGGGCCGGGTGAGGAAAGCTGGC;IPDRatio=1.86""", """\
chr1\tkinModCall\tmodified_base\t16204\t16204\t31\t-\t.\tcoverage=119;context=CCCGCGCAGATGATAATTACGGCTCACCTGCTGGCTGCCGA;IPDRatio=1.80
chr1\tkinModCall\tmodified_base\t16302\t16302\t33\t+\t.\tcoverage=108;context=TGGGACGGAACGTTTAAACCGGCATACAGCAACAACATGGC;IPDRatio=1.81
chr1\tkinModCall\tmodified_base\t16348\t16348\t42\t-\t.\tcoverage=115;context=CCCCATGCCGTAGCGCGGATGGGTCAGCATATCCCACAGAC;IPDRatio=1.82""",]
    sorted_start = [
        ('chr1', 16204), ('chr1', 16302), ('chr1', 16348),
        ('chr1', 32580), ('chr1', 32766), ('chr1', 32773),
        ('chr2', 1200), ('chr2', 1786), ('chr2', 1953),
    ]

    @classmethod
    def setUpClass(cls):
        """Write the per-entry input files and the combined input file."""
        cls.files = []
        cls.combined = "tmp_pbcore_all.gff"
        with open(cls.combined, "w") as f_all:
            for i, gff_text in enumerate(cls.gff_data):
                file_name = "tmp_pbcore_%d.gff" % i
                with open(file_name, "w") as f:
                    f.write(gff_text)
                cls.files.append(file_name)
                for line in gff_text.splitlines():
                    # Only the first entry contributes header lines to the
                    # combined file; records are copied from every entry.
                    if line.startswith("#") and i != 0:
                        continue
                    f_all.write(line + "\n")

    @classmethod
    def tearDownClass(cls):
        """Remove every input file written by setUpClass."""
        for file_name in cls.files + [cls.combined]:
            rm_out(file_name)

    def test_merge_gffs(self):
        """Merging keeps every record and the expected set of headers."""
        gff_out = "tmp_pbcore_merge.gff"
        # Registered before the merge so the output is removed even when an
        # assertion below fails.
        self.addCleanup(rm_out, gff_out)
        merge_gffs(self.files, gff_out)
        n_rec = 0
        for fn in self.files:
            with GffReader(fn) as f:
                n_rec += sum(1 for _ in f)
        with GffReader(gff_out) as f:
            # The inputs' "##source-commandline" header is not expected in
            # the merged output.
            self.assertEqual(f.headers, [
                "##gff-version 3",
                "##source ipdSummary",
                "##sequence-region lambda_NEB3011 1 48502",
            ])
            n_rec_merged = sum(1 for _ in f)
            self.assertEqual(n_rec, n_rec_merged)

    def test_merge_gffs_sorted(self):
        """The sorted merge yields records in ``sorted_start`` order."""
        gff_out = "tmp_pbcore_merged_sorted.gff"
        self.addCleanup(rm_out, gff_out)
        merge_gffs_sorted(self.files, gff_out)
        with GffReader(gff_out) as f:
            start = [(rec.seqid, rec.start) for rec in f]
            self.assertEqual(start, self.sorted_start)

    def test_sort_gff(self):
        """Sorting the combined file yields ``sorted_start`` order."""
        # sort_gff returns the path of the file it wrote.
        gff_out = sort_gff(self.combined)
        self.addCleanup(rm_out, gff_out)
        with GffReader(gff_out) as f:
            start = [(rec.seqid, rec.start) for rec in f]
            self.assertEqual(start, self.sorted_start)

    def test_empty_file(self):
        """Merging still runs when one input has headers but no records."""
        # A private directory gives unique paths without depending on a
        # discarded NamedTemporaryFile's name. Cleanups run in reverse
        # order of registration, so the files go before the directory.
        tmp_dir = tempfile.mkdtemp()
        self.addCleanup(os.rmdir, tmp_dir)
        gff_tmp = os.path.join(tmp_dir, "empty.gff")
        gff_out = os.path.join(tmp_dir, "merged.gff")
        self.addCleanup(rm_out, gff_tmp)
        self.addCleanup(rm_out, gff_out)
        with open(gff_tmp, "w") as f:
            f.write("##gff-version 3\n")
            f.write("##source ipdSummary\n")
            f.write("##sequence-region lambda_NEB3011 1 48502")
        # No assertion on the output: the test only requires that the merge
        # completes without raising.
        merge_gffs(self.files + [gff_tmp], gff_out)