File: gffutils-flybase-convert.py

package info (click to toggle)
python-gffutils 0.13-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,164 kB
  • sloc: python: 5,557; makefile: 57; sh: 13
file content (201 lines) | stat: -rwxr-xr-x 6,948 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#!/usr/bin/env python3

import os
import logging
from collections import Counter
import gffutils
from gffutils.create import logger

logger.setLevel(logging.INFO)

usage = """

    Converts a GFF file downloaded from FlyBase into a GTF file suitable for
    use with Cufflinks.  Support for ignoring chromosomes, featuretypes, and
    fixing various issues with the original GFF file.
"""


def read_ignore_list(fn):
    """
    Converts a file into a list
    """
    return [i.strip() for i in open(fn) if not i.startswith("#")]


def clean_gff(
    gff, cleaned, add_chr=False, chroms_to_ignore=None, featuretypes_to_ignore=None
):
    """
    Cleans a GFF file by removing features on unwanted chromosomes and of
    unwanted featuretypes.  Optionally adds "chr" to chrom names.
    """
    logger.info("Cleaning GFF")
    chroms_to_ignore = chroms_to_ignore or []
    featuretypes_to_ignore = featuretypes_to_ignore or []
    with open(cleaned, "w") as fout:
        for i in gffutils.iterators.DataIterator(gff):
            if add_chr:
                i.chrom = "chr" + i.chrom

            if i.chrom in chroms_to_ignore:
                continue

            if i.featuretype in featuretypes_to_ignore:
                continue
            fout.write(str(i) + "\n")
    return cleaned


if __name__ == "__main__":

    import argparse

    ap = argparse.ArgumentParser(usage=usage)
    ap.add_argument(
        "--gff",
        help="""GFF file downloaded from FlyBase.  Not
                    required if --db already exists""",
    )
    ap.add_argument(
        "--db",
        required=True,
        help="""Database that will be
                    created from FlyBase GFF""",
    )
    ap.add_argument(
        "--gtf",
        required=True,
        help="""GTF file that will
                    be created""",
    )
    ap.add_argument(
        "--featuretype-ignore",
        help="""File containing the
                    featuretypes to ignore in the GFF file, one featuretype on
                    each line.  This can speed up processing dramatically""",
    )
    ap.add_argument(
        "--chrom-ignore",
        help='''File containing chromosomes to
                    ignore in the GFF file.  If --add-chr, then chromosomes in
                    this list should also have "chr"''',
    )
    ap.add_argument(
        "--add-chr",
        action="store_true",
        help="""Prepend a "chr"
                    to chromosome names""",
    )
    ap.add_argument(
        "--fix-strand",
        action="store_true",
        help="""When exporting
                    the GTF file, check to ensure that all exons of a gene have
                    the same strand.  If not, then issue a warning and fix them
                    so that they all have the most common strand across all the
                    gene's exons.""",
    )
    ap.add_argument(
        "--fix-extent",
        choices=["gene", "exon"],
        help="""When
                    exporting the GTF file, check that all exons fall within
                    the gene's annotated extent.  If not, then issue a warning,
                    and, if --fix-extent=gene, then assume the gene annotation
                    is correct; if --fix-extent=exon then assume the exon
                    annotations are correct and make no changes to the exported
                    exons""",
    )
    args = ap.parse_args()

    if args.gff:
        chroms_to_ignore = []
        if args.chrom_ignore:
            chroms_to_ignore = read_ignore_list(args.chrom_ignore)

        featuretypes_to_ignore = []
        if args.featuretype_ignore:
            featuretypes_to_ignore = read_ignore_list(args.featuretype_ignore)

        cleaned = clean_gff(
            args.gff,
            args.gff + ".cleaned",
            add_chr=args.add_chr,
            chroms_to_ignore=chroms_to_ignore,
            featuretypes_to_ignore=featuretypes_to_ignore,
        )

        db = gffutils.create_db(
            args.gff + ".cleaned", args.db, verbose=True, id_spec=["ID"], force=True
        )

    db = gffutils.FeatureDB(args.db)

    logger.info("Creating GTF file")
    with open(args.gtf, "w") as fout:
        for gene in db.features_of_type("gene"):
            gene_id = gene.id
            transcripts = []
            exons = []
            for child in db.children(gene, level=1):
                transcript_id = child.id
                transcripts.append(child)
                for grandchild in db.children(child, level=1):
                    if grandchild.featuretype not in ["exon", "CDS"]:
                        continue
                    exons.append(grandchild)

            if args.fix_strand:
                c = Counter([i.strand for i in exons])

                if len(c) > 1:
                    # Exons have inconsistent strands.  So assume the most
                    # common strand is the "true" strand to use.
                    most_common = c.most_common()
                    new_strand = most_common[0][0]
                    logger.warning(
                        "Gene %s has inconsistent strands: %s.  "
                        'Changing all exons to "%s" strand'
                        % (gene_id, most_common, new_strand)
                    )
                    new_exons = []
                    for exon in exons:
                        exon.strand = new_strand
                        new_exons.append(exon)
                    exons = new_exons

            if args.fix_extent:
                if len(exons) > 0:
                    exon_extent = [
                        min(i.start for i in exons),
                        max(i.stop for i in exons),
                    ]
                    gene_extent = [gene.start, gene.stop]
                    if exon_extent != gene_extent:
                        logger.warning(
                            "Exons of gene %s do not match gene annotation. "
                            "Gene extent: %s.  Exon extent: %s"
                            % (gene_id, exon_extent, gene_extent)
                        )

                    if args.fix_extent == "gene":
                        new_exons = sorted(exons, key=lambda x: x.start)
                        new_exons[0].start = gene.start
                        new_exons[-1].stop = gene.stop
                        exons = new_exons

            for exon in exons:
                fields = str(grandchild).split("\t")[:-1]
                attributes = (
                    'gene_id "{0}"; transcript_id "{1}"; '
                    'gene_name "{2}" transcript_type="{3}"'
                ).format(
                    gene_id,
                    transcript_id,
                    gene.attributes["Name"][0],
                    child.featuretype,
                )
                fields.append(attributes)
                fout.write("\t".join(fields) + "\n")
    logger.info("Wrote %s" % args.gtf)