File: miloci.py

package info (click to toggle)
aegean 0.16.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye
  • size: 8,444 kB
  • sloc: ansic: 12,050; python: 465; sh: 315; makefile: 308; perl: 151
file content (127 lines) | stat: -rwxr-xr-x 3,569 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/python3

# Copyright (c) 2010-2016, Daniel S. Standage and CONTRIBUTORS
#
# The AEGeAn Toolkit is distributed under the ISC License. See
# the 'LICENSE' file in the AEGeAn source code distribution or
# online at https://github.com/standage/AEGeAn/blob/master/LICENSE.


import re
import sys


class Locus(object):
    def __init__(self, line):
        self._rawdata = line
        self.fields = line.strip().split('\t')
        assert len(self.fields) == 9

    @property
    def seqid(self):
        return self.fields[0]

    @property
    def start(self):
        return int(self.fields[3])

    @property
    def end(self):
        return int(self.fields[4])

    @property
    def ilocus_class(self):
        typematch = re.search('iLocus_type=([^;\n]+)', self.fields[8])
        assert typematch, 'could not determine iLocus type: ' + self._rawdata
        return typematch.group(1)

    @property
    def mergeable(self):
        if self.ilocus_class not in ['siLocus', 'niLocus']:
            return False
        if 'iiLocus_exception=intron-gene' in self.fields[8]:
            return False
        return True

    def __len__(self):
        return self.end - self.start + 1

    def __str__(self):
        return '\t'.join(self.fields)

    def strip(self):
        self.fields[8] = re.sub('ID=[^;\n]+;*', '', self.fields[8])
        self.fields[8] = re.sub('Name=[^;\n]+;*', '', self.fields[8])


def merge_iloci(loci):
    """Merge ajacent or overlapping gene-containing iLoci."""
    assert len(loci) > 0
    if len(loci) == 1:
        loci[0].strip()
        return loci[0]

    seqid = None
    start, end = -1, -1
    attrs = {}
    for locus in loci:
        if seqid:
            assert locus.seqid == seqid
        seqid = locus.seqid
        if start == -1 or locus.start < start:
            start = locus.start
        end = max(end, locus.end)
        numeric_attrs = re.findall('([^;=]+=\d+)', locus.fields[8])
        for key_value_pair in numeric_attrs:
            assert '=' in key_value_pair, \
                'malformed key/value pair %s' % key_value_pair
            key, value = key_value_pair.split('=')
            if key in ['left_overlap', 'right_overlap']:
                continue
            value = int(value)
            if key not in attrs:
                attrs[key] = 0
            attrs[key] += value

    attrstring = 'iLocus_type=miLocus'
    for key in sorted(attrs):
        attrstring += ';%s=%d' % (key, attrs[key])
    gff3 = [seqid, 'AEGeAn::miloci.py', 'locus', str(start), str(end),
            str(len(loci)), '.', '.', attrstring]
    line = '\t'.join(gff3)
    return Locus(line)


def parse_iloci(fp):
    """
    Input: a GFF3 file containing iLoci (LocusPocus output)
    Output: merged iLoci; gene-containing iLoci that are adjacent or
            overlapping are combined
    """
    locus_buffer = []
    for line in fp:
        if '\tlocus\t' not in line:
            continue
        locus = Locus(line)

        if len(locus_buffer) > 0 and locus.seqid != locus_buffer[0].seqid:
            yield merge_iloci(locus_buffer)
            locus_buffer = []

        if locus.mergeable:
            locus_buffer.append(locus)
            continue
        else:
            if len(locus_buffer) > 0:
                yield merge_iloci(locus_buffer)
                locus_buffer = []
            locus.strip()
            yield locus

    if len(locus_buffer) > 0:
        yield merge_iloci(locus_buffer)


if __name__ == '__main__':
    for locus in parse_iloci(sys.stdin):
        print(locus)