File: annotation.py

package info (click to toggle)
python-seqcluster 1.2.9%2Bds-3
  • links: PTS, VCS
  • area: contrib
  • in suites: bookworm
  • size: 113,624 kB
  • sloc: python: 5,308; makefile: 184; sh: 122; javascript: 55
file content (106 lines) | stat: -rw-r--r-- 3,754 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import seqcluster.libs.logger as mylog
import os

from seqcluster.libs.classes import annotation, dbannotation

logger = mylog.getLogger("run")


def read_gtf_line(cols, field="name"):
    """parse gtf line to get class/name information"""
    field = field.lower()
    try:
        group = cols[2]
        attrs = cols[8].split(";")
        name = [attr.strip().split(" ")[1] for attr in attrs if attr.strip().split(" ")[0].lower().endswith(field)]
        if not name:
            name = [attr.strip().split(" ")[1] for attr in attrs if attr.strip().split(" ")[0].lower().endswith("gene_id")]
        if not name:
            name = ["None"]
        biotype = [attr.strip().split(" ")[1] for attr in attrs if attr.strip().split(" ")[0].lower().endswith("biotype")]
        if biotype:
            group = biotype[0]
        c = cols[0]
        s = int(cols[3])
        e = int(cols[4])
        st = cols[6]
        return [c, s, e, st, group, name[0]]
    except(Exception, e):
        logger.error(cols)
        logger.error("File is not in correct format")
        logger.error("Expect chr source feature start end . strand attributes")
        logger.error("Attributes are 'gene_name SNCA; gene_id ENSG; '")
        logger.error("The 3rd column is used as type of small RNA (like miRNA)")
        logger.error("at least should contains '; *name NAME; '")
        logger.error(e)
        raise


def _position_in_feature(pos_a, pos_b):
    """return distance to 3' and 5' end of the feature"""
    strd = "-"
    if pos_a[2] in pos_b[2]:
        strd = "+"
    if pos_a[2] in "+" and pos_b[2] in "+":
        lento5 = pos_a[0] - pos_b[1] + 1
        lento3 = pos_a[1] - pos_b[1] + 1
    if pos_a[2] in "+" and pos_b[2] in "-":
        lento5 = pos_a[1] - pos_b[0] + 1
        lento3 = pos_a[0] - pos_b[1] + 1
    if pos_a[2] in "-" and pos_b[2] in "+":
        lento5 = pos_a[0] - pos_b[1] + 1
        lento3 = pos_a[1] - pos_b[0] + 1
    if pos_a[2] in "-" and pos_b[2] in "-":
        lento3 = pos_a[0] - pos_b[0] + 1
        lento5 = pos_a[1] - pos_b[1] + 1
    else:
        lento5 = pos_a[0] - pos_b[0] + 1
        lento3 = pos_a[1] - pos_b[1] + 1
    return lento5, lento3, strd


def anncluster(c, clus_obj, db, type_ann, feature_id="name"):
    """intersect transcription position with annotation files"""
    id_sa, id_ea, id_id, id_idl, id_sta = 1, 2, 3, 4, 5
    if type_ann == "bed":
        id_sb = 7
        id_eb = 8
        id_stb = 11
        id_tag = 9
    ida = 0
    clus_id = clus_obj.clus
    loci_id = clus_obj.loci
    db = os.path.splitext(db)[0]
    logger.debug("Type:%s\n" % type_ann)
    for cols in c.features():
        if type_ann == "gtf":
            cb, sb, eb, stb, db, tag = read_gtf_line(cols[6:], feature_id)
        else:
            sb = int(cols[id_sb])
            eb = int(cols[id_eb])
            stb = cols[id_stb]
            tag = cols[id_tag]
        id = int(cols[id_id])
        idl = int(cols[id_idl])
        if (id in clus_id):
            clus = clus_id[id]
            sa = int(cols[id_sa])
            ea = int(cols[id_ea])
            ida += 1
            lento5, lento3, strd = _position_in_feature([sa, ea, cols[id_sta]], [sb, eb, stb])
            if db in loci_id[idl].db_ann:
                ann = annotation(db, tag, strd, lento5, lento3)
                tdb = loci_id[idl].db_ann[db]
                tdb.add_db_ann(ida, ann)
                loci_id[idl].add_db(db, tdb)
            else:
                ann = annotation(db, tag, strd, lento5, lento3)
                tdb = dbannotation(1)
                tdb.add_db_ann(ida, ann)
                loci_id[idl].add_db(db, tdb)
            clus_id[id] = clus
    clus_obj.clus = clus_id
    clus_obj.loci = loci_id
    return clus_obj