File: uloci.py

package info (click to toggle)
aegean 0.16.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye
  • size: 8,444 kB
  • sloc: ansic: 12,050; python: 465; sh: 315; makefile: 308; perl: 151
file content (118 lines) | stat: -rwxr-xr-x 5,210 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/python3

# Copyright (c) 2010-2016, Daniel S. Standage and CONTRIBUTORS
#
# The AEGeAn Toolkit is distributed under the ISC License. See
# the 'LICENSE' file in the AEGeAn source code distribution or
# online at https://github.com/standage/AEGeAn/blob/master/LICENSE.


import argparse
import re
import sys


def parse_gff3(fp, feattypes=['gene']):
    """
    Process entire (sorted) GFF3 file. Store lengths of sequences as reported
    by `##sequence-region` pragmas, and then discard lengths of sequences with
    annotated features. Sequence IDs and lengths yielded by the function
    correspond to unannotated sequences.
    """
    seqlens = {}
    for line in fp:
        if line.startswith('##sequence-region'):
            pattern = '##sequence-region\s+(\S+)\s+(\d+)\s+(\d+)'
            seqmatch = re.search(pattern, line)
            assert seqmatch, 'unable to parse seqreg pragma: %s' % line
            seqid = seqmatch.group(1)
            start = int(seqmatch.group(2))
            end = int(seqmatch.group(3))
            assert start == 1, 'assumption: start == 1: %s' % line
            seqlens[seqid] = end - start + 1
        elif len(line.split('\t')) == 9:
            fields = line.split('\t')
            seqid = fields[0]
            ftype = fields[2]
            if ftype in feattypes:
                seqlens.pop(seqid, None)

    for seqid in sorted(seqlens.keys()):
        yield seqid, seqlens[seqid]


def run_parse_gff3(gff3string, src='.', idformat='locus%d', counter=1):
    """Driver function for small examples (i.e. testing)."""
    output = ''
    for seqid, seqlen in parse_gff3(gff3string.splitlines()):
        locid = idformat % counter
        counter += 1
        attrs = 'ID=%s;Name=%s;fragment=true;unannot=true;' % (locid, locid)
        attrs += 'iLocus_type=iiLocus;effective_length=%d\n' % seqlen
        fields = [seqid, src, 'locus', '1', str(seqlen), '.', '.', '.', attrs]
        output += '\t'.join(fields)
    return output


def test_parse_gff3():
    """
    Unit tests for `parse_gff3` function. Throws exception in case of error, so
    no output is good output!
    """
    gff3 = ('##gff-version     3\n'
            '##sequence-region   chr1 1 1000000\n'
            '##sequence-region   chr2 1 100000\n'
            '##sequence-region   chr3 1 10000\n'
            '##sequence-region   chr4 1 1000\n'
            'chr1\t.\tgene\t100\t200\t.\t+\t.\tID=gene1\n'
            'chr2\t.\tgene\t100\t200\t.\t+\t.\tID=gene2\n'
            'chr3\t.\tgene\t100\t200\t.\t+\t.\tID=gene3\n')
    out = '\t'.join(['chr4', '.', 'locus', '1', '1000', '.', '.', '.',
                     'ID=locus1;Name=locus1;fragment=true;unannot=true;'
                     'iLocus_type=iiLocus;effective_length=1000\n'])
    testout = run_parse_gff3(gff3)
    assert out == testout, 'test 1 failed'

    gff3 = ('##gff-version\t3\n'
            '##sequence-region\tscaf00001   1   150000\n'
            '##sequence-region\tscaf00002   1   100000\n'
            '##sequence-region\tscaf00003   1   50000\n'
            '##sequence-region\tscaf00004   1   10000\n'
            'scaf00003\t.\tgene\t10000\t12000\t.\t+\t.\tID=gene1\n')
    out = '\t'.join(['scaf00001', '.', 'locus', '1', '150000', '.', '.', '.',
                     'ID=locus001;Name=locus001;fragment=true;unannot=true;'
                     'iLocus_type=iiLocus;effective_length=150000\n'])
    out += '\t'.join(['scaf00002', '.', 'locus', '1', '100000', '.', '.', '.',
                      'ID=locus002;Name=locus002;fragment=true;unannot=true;'
                      'iLocus_type=iiLocus;effective_length=100000\n'])
    out += '\t'.join(['scaf00004', '.', 'locus', '1', '10000', '.', '.', '.',
                      'ID=locus003;Name=locus003;fragment=true;unannot=true;'
                      'iLocus_type=iiLocus;effective_length=10000\n'])
    testout = run_parse_gff3(gff3, idformat='locus%03d')
    assert out == testout, 'test 2 failed'

test_parse_gff3()

if __name__ == '__main__':
    desc = 'Report iLoci for unannotated sequences'
    parser = argparse.ArgumentParser(description=desc)
    parser.add_argument('--idfmt', type=str, default='locus%d', help='An ID '
                        'with a serial number is assigned to each locus; '
                        'default format is "locus%%d"')
    parser.add_argument('--counter', type=int, default=1, help='Serial number '
                        'to assign to first iLocus; default is 1')
    parser.add_argument('--src', type=str, default='.', help='Source label to '
                        'use for GFF3 output (column 2); default is "."')
    parser.add_argument('infile', type=argparse.FileType('r'), help='Input '
                        'data; use - to read from stdin')
    args = parser.parse_args()

    print('##gff-version   3')
    for seqid, seqlen in parse_gff3(args.infile):
        locid = args.idfmt % args.counter
        args.counter += 1
        attrs = 'ID=%s;Name=%s;unannot=true;' % (locid, locid)
        attrs += 'iLocus_type=fiLocus;effective_length=%d' % seqlen
        fields = [seqid, args.src, 'locus', '1', str(seqlen), '.', '.', '.',
                  attrs]
        print(*fields, sep='\t')