File: read_gtf.py

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from os.path import exists

from sys import intern
import numpy as np
import pandas as pd

from .attribute_parsing import expand_attribute_strings
from .parsing_error import ParsingError
from .required_columns import REQUIRED_COLUMNS

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def parse_gtf(
        filepath_or_buffer,
        chunksize=1024 * 1024,
        features=None,
        intern_columns=["seqname", "source", "strand", "frame"],
        fix_quotes_columns=["attribute"]):
    """
    Parameters
    ----------

    filepath_or_buffer : str or buffer object

    chunksize : int
        Number of rows to read from the file per chunk.

    features : set or None
        Drop entries which aren't one of these features

    intern_columns : list
        These columns are short strings which should be interned

    fix_quotes_columns : list
        Most commonly the 'attribute' column, which has had broken quotes
        in some Ensembl GTF releases.
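
    Returns
    -------
    pandas.DataFrame
        One row per GTF record, with REQUIRED_COLUMNS as the column names
        and the 'attribute' column left as unparsed key-value strings.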
    """
    if features is not None:
        features = set(features)

    def parse_frame(s):
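        # GTF uses "." for records without a reading frame (e.g. gene and
        # transcript rows); map it to 0 so the column stays integer-typed.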
        if s == ".":
            return 0
        else:
            return int(s)

    # GTF columns:
    # 1) seqname: str ("1", "X", "chrX", etc...)
    # 2) source : str
    #      Different versions of GTF use the second column as one of:
    #      (a) gene biotype
    #      (b) transcript biotype
    #      (c) the annotation source
    #      See: https://www.biostars.org/p/120306/#120321
    # 3) feature : str ("gene", "transcript", &c)
    # 4) start : int
    # 5) end : int
    # 6) score : float or "."
    # 7) strand : "+", "-", or "."
    # 8) frame : 0, 1, 2 or "."
    # 9) attribute : key-value pairs separated by semicolons
    # (see more complete description in docstring at top of file)
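    #
    # An illustrative record (hypothetical example; tab-separated on disk,
    # and the attribute keys present vary by annotation source):
    #   1  havana  gene  11869  14409  .  +  .  gene_id "ENSG00000223972"; gene_name "DDX11L1";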

    chunk_iterator = pd.read_csv(
        filepath_or_buffer,
        sep="\t",
        comment="#",
        names=REQUIRED_COLUMNS,
        skipinitialspace=True,
        skip_blank_lines=True,
        on_bad_lines="error",
        chunksize=chunksize,
        engine="c",
        dtype={
            "start": np.int64,
            "end": np.int64,
            "score": np.float32,
            "seqname": str,
        },
        na_values=".",
        converters={"frame": parse_frame})
    dataframes = []
    try:
        for df in chunk_iterator:
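            # These columns hold a small set of short values repeated across
            # many rows; intern() stores each distinct string only once.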
            for intern_column in intern_columns:
                df[intern_column] = [intern(str(s)) for s in df[intern_column]]

            # compare feature strings after interning
            if features is not None:
                df = df[df["feature"].isin(features)]

            for fix_quotes_column in fix_quotes_columns:
                # Catch mistaken semicolons by replacing "xyz;" with "xyz"
                # Required to do this since the Ensembl GTF for Ensembl
                # release 78 has mistakes such as:
                #   gene_name = "PRAMEF6;" transcript_name = "PRAMEF6;-201"
                df[fix_quotes_column] = [
                    s.replace(';\"', '\"').replace(";-", "-")
                    for s in df[fix_quotes_column]
                ]
            dataframes.append(df)
    except Exception as e:
        raise ParsingError(str(e)) from e
    df = pd.concat(dataframes)
    return df


def parse_gtf_and_expand_attributes(
        filepath_or_buffer,
        chunksize=1024 * 1024,
        restrict_attribute_columns=None,
        features=None):
    """
    Parse the GTF into a DataFrame and then expand the 'attribute' column
    into multiple columns. This expansion replaces each string of
    semicolon-separated key-value pairs in the 'attribute' column with one
    column per distinct key, holding a list of values for each row (using
    None for rows where the key didn't occur).

    Parameters
    ----------
    filepath_or_buffer : str or buffer object

    chunksize : int
        Number of rows to read from the file per chunk.

    restrict_attribute_columns : list/set of str or None
        If given, then only expand these attribute columns.

    features : set or None
        Ignore entries which don't correspond to one of the supplied features
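
    Examples
    --------
    A minimal sketch ("annotations.gtf" is a hypothetical path; which
    attribute columns appear depends on the annotation source)::

        df = parse_gtf_and_expand_attributes("annotations.gtf")
        gene_ids = df["gene_id"]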
    """
    result = parse_gtf(
        filepath_or_buffer,
        chunksize=chunksize,
        features=features)
    attribute_values = result["attribute"]
    del result["attribute"]
    for column_name, values in expand_attribute_strings(
            attribute_values, usecols=restrict_attribute_columns).items():
        result[column_name] = values
    return result


def read_gtf(
        filepath_or_buffer,
        expand_attribute_column=True,
        infer_biotype_column=False,
        column_converters={},
        usecols=None,
        features=None,
        chunksize=1024 * 1024):
    """
    Parse a GTF file into a DataFrame mapping column names to sequences
    of values.

    Parameters
    ----------
    filepath_or_buffer : str or buffer object
        Path to GTF file (may be gzip compressed) or buffer object
        such as StringIO

    expand_attribute_column : bool
        Replace the strings of semicolon-separated key-value pairs in the
        'attribute' column with one column per distinct key, holding a list
        of values for each row (using None for rows where the key didn't
        occur).

    infer_biotype_column : bool
        Due to the annoying ambiguity of the second GTF column across multiple
        Ensembl releases, figure out if an older GTF's source column is actually
        the gene_biotype or transcript_biotype.

    column_converters : dict, optional
        Dictionary mapping column names to conversion functions. Empty
        strings are replaced with None; all other values are passed to the
        given conversion function.

    usecols : list of str or None
        Restrict which columns are loaded to the given set. If None, then
        load all columns.

    features : set of str or None
        Drop rows which aren't one of the features in the supplied set

    chunksize : int
        Number of rows to read from the file per chunk.
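
    Examples
    --------
    A minimal sketch ("annotations.gtf" is a hypothetical path and the
    'gene_name' attribute is only present in some annotation sources)::

        df = read_gtf(
            "annotations.gtf",
            features={"gene"},
            usecols=["gene_id", "gene_name"])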
    """
    if isinstance(filepath_or_buffer, str) and not exists(filepath_or_buffer):
        raise ValueError("GTF file does not exist: %s" % filepath_or_buffer)

    if expand_attribute_column:
        result_df = parse_gtf_and_expand_attributes(
            filepath_or_buffer,
            chunksize=chunksize,
            restrict_attribute_columns=usecols,
            features=features)
    else:
        result_df = parse_gtf(
            filepath_or_buffer,
            chunksize=chunksize,
            features=features)

    for column_name, column_type in list(column_converters.items()):
        result_df[column_name] = [
            column_type(string_value) if len(string_value) > 0 else None
            for string_value
            in result_df[column_name]
        ]

    # Hackishly infer whether the values in the 'source' column of this GTF
    # are actually representing a biotype by checking for the most common
    # gene_biotype and transcript_biotype value 'protein_coding'
    if infer_biotype_column:
        unique_source_values = set(result_df["source"])
        if "protein_coding" in unique_source_values:
            column_names = set(result_df.columns)
            # Disambiguate between the two biotypes by checking if
            # gene_biotype is already present in another column. If it is,
            # the 2nd column is the transcript_biotype (otherwise, it's the
            # gene_biotype)
            if "gene_biotype" not in column_names:
                logger.info("Using column 'source' to replace missing 'gene_biotype'")
                result_df["gene_biotype"] = result_df["source"]
            if "transcript_biotype" not in column_names:
                logger.info("Using column 'source' to replace missing 'transcript_biotype'")
                result_df["transcript_biotype"] = result_df["source"]

    if usecols is not None:
        column_names = set(result_df.columns)
        valid_columns = [c for c in usecols if c in column_names]
        result_df = result_df[valid_columns]

    return result_df