File: construction.py

package info (click to toggle)
python-bioframe 0.4.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,000 kB
  • sloc: python: 5,860; makefile: 38; sh: 13
file content (337 lines) | stat: -rw-r--r-- 11,444 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import pandas as pd
import numpy as np
from .specs import _get_default_colnames, _verify_columns, is_chrom_dtype
from .stringops import parse_region_string, to_ucsc_string, is_complete_ucsc_string
from . import checks

# Names exported by a star-import of this module. The helpers
# `from_ucsc_string_list` and `add_ucsc_name_column` defined below are
# not listed here.
__all__ = [
    "from_dict",
    "from_series",
    "from_list",
    "from_any",
    "make_viewframe",
    "sanitize_bedframe",
]

### conversions from various input formats into dataframes ###


def from_dict(regions, cols=None):
    """
    Makes a dataframe from a dictionary of {str,int} pairs, interpreted as
    chromosome names and chromosome ends.

    Every key becomes one row ``(key, 0, value)``.

    Note that {str,(int,int)} dictionaries of tuples are no longer supported!

    Parameters
    ----------
    regions : dict
        Mapping (or anything accepted by ``dict()``) from chromosome name to
        a scalar end coordinate.

    cols : (str, str, str) or None
        The names of columns containing the chromosome, start and end of the
        genomic intervals. The default values are taken from
        ``_get_default_colnames()``.

    Returns
    -------
    df : pandas.DataFrame
        One row per dictionary entry, in dictionary order, with starts set
        to 0.

    Raises
    ------
    ValueError
        If any value is not a scalar (as judged by ``numpy.isscalar``).
    """
    ck1, sk1, ek1 = _get_default_colnames() if cols is None else cols
    data = []
    for chrom, end in dict(regions).items():
        if not np.isscalar(end):
            # f-string so the offending type is actually reported.
            raise ValueError(f"Unsupported dict format: {type(end)}")
        data.append([chrom, 0, end])
    return pd.DataFrame(data, columns=[ck1, sk1, ek1])


def from_series(regions, cols=None):
    """
    Makes a dataframe from a pandas series of interval ends.

    Parameters
    ----------
    regions : pandas.Series
        The index values fill the chromosome column and the series values
        fill the end column.

    cols : (str, str, str) or None
        Names for the chromosome, start and end columns. If None, the names
        returned by ``_get_default_colnames()`` are used.

    Returns
    -------
    df : pandas.DataFrame
        One row per series entry; every start is set to 0.
    """
    chrom_col, start_col, end_col = (
        cols if cols is not None else _get_default_colnames()
    )
    return pd.DataFrame(
        {
            chrom_col: regions.index.values,
            start_col: 0,  # scalar is broadcast to every row
            end_col: regions.values,
        }
    )


def from_list(regions, name_col="name", cols=None):
    """
    Makes a dataframe from a list of 3- or 4-element records.

    Parameters
    ----------
    regions : list of tuples or lists
        Records of the form (chrom, start, end) or (chrom, start, end, name).

    name_col : str
        Label given to the fourth column when the records have four
        elements. Default "name".

    cols : (str, str, str) or None
        Names for the chromosome, start and end columns. If None, the names
        returned by ``_get_default_colnames()`` are used.

    Returns
    -------
    df : pandas.DataFrame

    Raises
    ------
    ValueError
        If the records do not produce exactly three or four columns.
    """
    chrom_col, start_col, end_col = (
        cols if cols is not None else _get_default_colnames()
    )
    frame = pd.DataFrame(regions)
    width = frame.shape[1]
    if width not in (3, 4):
        raise ValueError("wrong number of columns for list input format")

    labels = [chrom_col, start_col, end_col]
    if width == 4:
        labels.append(name_col)
    frame.columns = labels
    return frame


def from_ucsc_string_list(region_list, cols=None):
    """
    Makes a dataframe from an iterable of region strings.

    Each entry is passed to ``parse_region_string`` and the parsed results
    become the rows of a three-column dataframe.

    Parameters
    ----------
    region_list : iterable of str
        Region strings, presumably UCSC-style 'chrom:start-end' (the exact
        accepted grammar is defined by ``parse_region_string``).

    cols : (str, str, str) or None
        Names for the chromosome, start and end columns. If None, the names
        returned by ``_get_default_colnames()`` are used.

    Returns
    -------
    df : pandas.DataFrame
    """
    chrom_col, start_col, end_col = (
        cols if cols is not None else _get_default_colnames()
    )
    rows = []
    for region in region_list:
        rows.append(parse_region_string(region))
    return pd.DataFrame(rows, columns=[chrom_col, start_col, end_col])


def from_any(regions, fill_null=False, name_col="name", cols=None):
    """
    Attempts to make a genomic interval dataframe with columns [chr, start, end, name_col] from a variety of input types.

    Parameters
    ----------
    regions : supported input
        Currently supported inputs:

            - dataframe containing the three interval columns (returned as a copy)
            - dataframe whose `name_col` column holds complete UCSC strings
            - dictionary of {str:int} key value pairs
            - pandas series where the index is interpreted as chromosomes and values are interpreted as end
            - list or tuple of UCSC strings
            - list of tuples or lists, either [(chrom,start,end)] or [(chrom,start,end,name)]
            - tuple of tuples or lists, either [(chrom,start,end)] or [(chrom,start,end,name)]
            - a single (chrom,start,end) list or tuple

        Note that any flat list or tuple of exactly three elements is
        treated as one (chrom,start,end) record, even if all three elements
        are strings.

    fill_null : False or dictionary
        Accepts a dictionary of {str:int} pairs, interpreted as chromosome sizes.
        When provided, null starts are replaced by 0 and null ends (None,
        NaN or pd.NA) are replaced by the size looked up for the row's
        chromosome.
        Kept for backwards compatibility. Default False.

    name_col : str
        Column name. Only used if 4 column list is provided, or to locate
        UCSC strings in a dataframe input. Default "name".

    cols : (str,str,str)
        Names for dataframe columns.
        Default None sets them with get_default_colnames().

    Returns
    -------
    out_df:dataframe

    Raises
    ------
    ValueError
        If the input type is not supported, if a dataframe input has neither
        the interval columns nor a usable `name_col` column, or if null ends
        cannot be filled from `fill_null`.

    """
    ck1, sk1, ek1 = _get_default_colnames() if cols is None else cols

    if isinstance(regions, pd.DataFrame):
        if {ck1, sk1, ek1}.issubset(regions.columns):
            out_df = regions.copy()
        elif (
            # Guard the lookups so a missing column or an empty frame ends
            # up in the descriptive ValueError below rather than in an
            # incidental KeyError/IndexError.
            name_col in regions.columns
            and len(regions) > 0
            and len(regions[name_col].values.shape) == 1
            and is_complete_ucsc_string(regions[name_col].values[0])
        ):
            out_df = from_ucsc_string_list(
                regions[name_col].values, cols=[ck1, sk1, ek1]
            )
        else:
            raise ValueError("Unknown dataFrame format: check column names")

    elif isinstance(regions, dict):
        out_df = from_dict(regions, cols=[ck1, sk1, ek1])

    elif isinstance(regions, pd.Series):
        out_df = from_series(regions, cols=[ck1, sk1, ek1])

    elif isinstance(regions, (tuple, list)):
        shape = np.shape(regions)
        if shape == (3,):
            # A single flat (chrom, start, end) record.
            out_df = from_list([regions], name_col=name_col, cols=[ck1, sk1, ek1])
        elif len(shape) == 1 and len(regions) > 0 and type(regions[0]) is str:
            out_df = from_ucsc_string_list(regions, cols=[ck1, sk1, ek1])
        else:
            out_df = from_list(list(regions), name_col=name_col, cols=[ck1, sk1, ek1])
    else:
        raise ValueError(f"Unknown input format: {type(regions)}")

    if fill_null:
        try:
            # Assign back instead of a chained in-place fillna, which does
            # not reliably update the parent frame.
            out_df[sk1] = out_df[sk1].fillna(0)
            # pd.isnull covers None, NaN and pd.NA; mixed int/None input is
            # stored by pandas as NaN, which an `is None` test would miss.
            out_df[ek1] = [
                fill_null[chrom] if pd.isnull(end) else end
                for chrom, end in zip(out_df[ck1].values, out_df[ek1].values)
            ]
        except Exception as err:
            raise ValueError("could not fill ends with provided chromsizes") from err

    return out_df


def add_ucsc_name_column(reg_df, name_col="name", cols=None):
    """
    Returns a copy of reg_df with a name column built by passing each row's
    (chrom, start, end) triple to ``to_ucsc_string``.

    Replaces name_col if it exists. The input dataframe is not modified.

    Parameters
    ----------
    reg_df : pandas.DataFrame
        Dataframe containing the chromosome, start and end columns.

    name_col : str
        Column that receives the generated names. Default "name".

    cols : (str, str, str) or None
        Names of the chromosome, start and end columns. If None, the names
        returned by ``_get_default_colnames()`` are used.

    Returns
    -------
    df : pandas.DataFrame
    """
    chrom_col, start_col, end_col = (
        cols if cols is not None else _get_default_colnames()
    )
    named_df = reg_df.copy()
    _verify_columns(named_df, [chrom_col, start_col, end_col])

    names = []
    for triple in zip(named_df[chrom_col], named_df[start_col], named_df[end_col]):
        names.append(to_ucsc_string(triple))
    named_df[name_col] = names
    return named_df


def make_viewframe(
    regions,
    check_bounds=None,
    name_style=None,
    view_name_col="name",
    cols=None,
):
    """
    Builds a dataframe `view_df` from regions and validates it as a view.

    The input is first converted with ``from_any``; optional bounds are then
    checked, a name column is added when missing, and the result is
    validated with ``checks.is_viewframe``.

    Parameters
    ----------
    regions : supported input type
        Anything accepted by ``from_any``, for example:

            - a dictionary where keys are strings and values are integers {str:int}, specifying regions (chrom, 0, end, chrom)
            - a pandas series of chromosomes lengths with index specifying region names
            - a list of tuples [(chrom,start,end), ...] or [(chrom,start,end,name), ...]
            - a pandas DataFrame, skips to validation step

    check_bounds : None, or chromosome sizes provided as any of valid formats above
        Optional. If provided, it is converted with ``from_any`` and the view
        regions must be contained by it according to ``checks.is_contained``.
        Typically provided as a series of chromosome sizes. Default None.

    name_style : None or "ucsc"
        Only consulted when the converted dataframe has no column
        view_name_col. If None, names are copied from the chromosome column
        (cols[0]). If "ucsc" (case-insensitive), UCSC style names are
        created with ``add_ucsc_name_column``.

    view_name_col : str
        Specifies column name of the view regions. Default 'name'.

    cols : (str, str, str) or None
        The names of columns containing the chromosome, start and end of the
        genomic intervals. The default values are taken from
        ``_get_default_colnames()``.

    Returns
    -------
    view_df:dataframe satisfying properties of a view

    Raises
    ------
    ValueError
        If the regions are not contained by `check_bounds`, if `name_style`
        is not recognised, or if the result is not a valid viewframe.

    """
    chrom_col, start_col, end_col = (
        cols if cols is not None else _get_default_colnames()
    )

    view_df = from_any(regions, name_col=view_name_col, cols=cols)

    if check_bounds is not None:
        bounds_df = from_any(check_bounds, name_col="bounds", cols=cols)
        within_bounds = checks.is_contained(
            view_df,
            bounds_df,
            df_view_col=None,
            view_name_col="bounds",
            cols=cols,
        )
        if not within_bounds:
            raise ValueError(
                "Invalid input to make a viewFrame, regions not contained by bounds"
            )

    if view_name_col not in view_df.columns:
        if name_style is None:
            # Default naming: reuse the chromosome names as region names.
            view_df[view_name_col] = view_df[chrom_col].values
        elif name_style.lower() == "ucsc":
            view_df = add_ucsc_name_column(view_df, name_col=view_name_col, cols=cols)
        else:
            raise ValueError("unknown value for name_style")

    is_valid = checks.is_viewframe(
        view_df, view_name_col=view_name_col, cols=cols, raise_errors=True
    )
    if not is_valid:
        raise ValueError("could not make valid viewFrame, retry with new input")
    return view_df


def sanitize_bedframe(
    df1,
    recast_dtypes=True,
    drop_null=False,
    start_exceed_end_action=None,
    cols=None,
):
    """
    Attempts to clean a genomic interval dataframe to be a valid bedframe.

    Works on a copy; the input dataframe is not modified.

    Parameters
    ----------
    df1 : pandas.DataFrame

    recast_dtypes : bool
        Whether to attempt to recast column dtypes to pandas nullable dtypes.
        The chromosome column is cast to str unless ``is_chrom_dtype``
        accepts its dtype; start and end are cast to ``pd.Int64Dtype()``
        unless both already have that dtype.

    drop_null : bool
        Drops rows with pd.NA. Note that rows with a null in *any* column,
        not only the interval columns, are dropped. Default False.

    start_exceed_end_action : str or None
        Options: 'flip' or 'drop' or None (case-insensitive). Default None.

            - If 'flip', attempts to sanitize by flipping intervals with start>end.
            - If 'drop' attempts to sanitize dropping intervals with start>end.
            - If None, does not alter these intervals if present.

        The value is only validated when at least one interval with
        start>end is present.

    cols : (str, str, str) or None
        The names of columns containing the chromosome, start and end of the
        genomic intervals. The default values are taken from
        ``_get_default_colnames()``.

    Returns
    -------
    out_df : pandas.DataFrame
        Sanitized dataframe satisfying the properties of a bedframe.

    Raises
    ------
    ValueError
        If `start_exceed_end_action` is not recognised while start>end
        intervals are present, or if the result does not pass
        ``checks.is_bedframe``.

    Notes
    ------
    The option ``start_exceed_end_action='flip'`` may be useful for gff files with strand information but starts > ends.

    """
    ck1, sk1, ek1 = _get_default_colnames() if cols is None else cols

    out_df = df1.copy()

    _verify_columns(out_df, [ck1, sk1, ek1])

    if recast_dtypes:
        chrom_dtype, start_dtype, end_dtype = out_df.dtypes[[ck1, sk1, ek1]]
        if not is_chrom_dtype(chrom_dtype):
            out_df[ck1] = out_df[ck1].astype(str)
        # Check the dtype by type rather than by identity: every call to
        # pd.Int64Dtype() builds a new object, so an `is` comparison against
        # a fresh instance can never succeed.
        already_int64 = isinstance(start_dtype, pd.Int64Dtype) and isinstance(
            end_dtype, pd.Int64Dtype
        )
        if not already_int64:
            out_df[sk1] = out_df[sk1].astype(pd.Int64Dtype())
            out_df[ek1] = out_df[ek1].astype(pd.Int64Dtype())

    # An interval with any null coordinate is nulled out entirely.
    nan_intervals = pd.isnull(out_df[[ck1, sk1, ek1]]).any(axis=1)
    out_df.loc[nan_intervals, [ck1, sk1, ek1]] = pd.NA
    if drop_null:
        out_df.dropna(axis=0, inplace=True)
        out_df.reset_index(drop=True, inplace=True)

    if start_exceed_end_action is not None:
        start_exceed_end_action = start_exceed_end_action.lower()
        # Computed once and reused for both the test and the row mask.
        start_exceeds_end = (out_df[ek1] - out_df[sk1]) < 0
        if start_exceeds_end.any():
            inds = start_exceeds_end.values
            if start_exceed_end_action == "drop":
                # `inds == 0` selects the rows that were not flagged.
                out_df = out_df.loc[inds == 0]
            elif start_exceed_end_action == "flip":
                # Swap start and end on the flagged rows only.
                out_df.loc[inds, [sk1, ek1]] = out_df.loc[inds, [ek1, sk1]].values
            else:
                raise ValueError("unknown action for intervals with start>end")
            out_df.reset_index(drop=True, inplace=True)

    if checks.is_bedframe(out_df, cols=cols):
        return out_df
    else:
        raise ValueError("could not sanitize")