File: nanoget.py

package info (click to toggle)
python-nanoget 1.19.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 25,876 kB
  • sloc: python: 643; sh: 25; makefile: 9
file content (164 lines) | stat: -rw-r--r-- 6,334 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
This module provides functions to extract useful metrics
from Oxford Nanopore sequencing reads and alignments.


Data can be presented in the following formats, using the following functions:
- A sorted bam file
  process_bam(bamfile, threads)
- A standard fastq file
  process_fastq_plain(fastqfile, 'threads')
- A fastq file with metadata from MinKNOW or Albacore
  process_fastq_rich(fastqfile)
- A sequencing_summary file generated by Albacore
  process_summary(sequencing_summary.txt, 'readtype')

Fastq files can be compressed using gzip, bzip2 or bgzip.
The data is returned as a pandas DataFrame with standardized headernames for convenient extraction.
The functions perform logging while being called and extracting data.
"""

import sys
import logging
import pandas as pd
from functools import partial
import concurrent.futures as cfutures
import nanoget.extraction_functions as ex


def get_input(
    source,
    files,
    threads=4,
    readtype="1D",
    combine="simple",
    names=None,
    barcoded=False,
    huge=False,
    keep_supp=True,
):
    """Get input and process accordingly.

    Data can be:
    - a uncompressed, bgzip, bzip2 or gzip compressed fastq file
    - a uncompressed, bgzip, bzip2 or gzip compressed fasta file
    - a rich fastq containing additional key=value information in the description,
      as produced by MinKNOW and albacore with the same compression options as above
    - a sorted bam file
    - a sorted cram file
    - a (compressed) sequencing_summary.txt file generated by albacore/guppy

    If data is huge=True then no parallelization (using concurrent.futures) is used,
     as pickling these dataframes leads to crashes

    Handle is passed to the proper functions to get DataFrame with metrics
    Multiple files of the same type can be used to extract info from, which is done in parallel
    Arguments:
    - source: defines the input data type and the function that needs to be called
    - files: is a list of one or more files to operate on, from the type of <source>
    - threads: is the amount of workers which can be used
    - readtype: (only relevant for summary input) and specifies which columns have to be extracted
    - combine: is either 'simple' or 'track', with the difference that  with 'track' an additional
      field is created with the name of the dataset
    - names: if combine="track", the names to be used for the datasets. Needs to have same length as
      files, or None
    """
    proc_functions = {
        "fastq": ex.process_fastq_plain,
        "fasta": ex.process_fasta,
        "bam": ex.process_bam,
        "summary": ex.process_summary,
        "fastq_rich": ex.process_fastq_rich,
        "fastq_minimal": ex.process_fastq_minimal,
        "cram": ex.process_cram,
        "ubam": ex.process_ubam,
    }

    if source not in proc_functions.keys():
        logging.error("nanoget: Unsupported data source: {}".format(source))
        sys.exit("Unsupported data source: {}".format(source))
    filethreads = min(len(files), threads)
    threadsleft = threads - filethreads or 1
    if huge:
        logging.info("nanoget: Running with a single huge input file.")
        if not len(files) == 1:
            logging.error("nanoget: Using multiple huge input files is currently not supported.")
            sys.exit(
                "Using multiple huge input files is currently not supported.\n"
                "Please let me know on GitHub if that's of interest for your application.\n"
            )

        datadf = proc_functions[source](
            files[0],
            threads=threadsleft,
            readtype=readtype,
            barcoded=barcoded,
            keep_supp=keep_supp,
            huge=True,
        )
    else:
        with cfutures.ProcessPoolExecutor(max_workers=filethreads) as executor:
            extraction_function = partial(
                proc_functions[source],
                threads=threadsleft,
                readtype=readtype,
                barcoded=barcoded,
                keep_supp=keep_supp,
                huge=False,
            )
            datadf = combine_dfs(
                dfs=[out for out in executor.map(extraction_function, files)],
                names=names or files,
                method=combine,
            )
    if "readIDs" in datadf.columns and pd.isna(datadf["readIDs"]).any():
        datadf.drop("readIDs", axis="columns", inplace=True)
    datadf = calculate_start_time(datadf)
    logging.info("Nanoget: Gathered all metrics of {} reads".format(len(datadf)))
    if len(datadf) == 0:
        logging.critical("Nanoget: no reads retrieved.")
        sys.exit("Fatal: No reads found in input.")
    else:
        return datadf


def combine_dfs(dfs, names=None, method="simple"):
    """Combine dataframes.

    Combination is either done simple by just concatenating the DataFrames
    or performs tracking by adding the name of the dataset as a column."""
    if method == "track":
        return pd.concat([df.assign(dataset=n) for df, n in zip(dfs, names)], ignore_index=True)
    elif method == "simple":
        return pd.concat(dfs, ignore_index=True)


def calculate_start_time(df):
    """Calculate the start_time per read.

    Time data is either
    a "time" (in seconds, derived from summary files) or
    a "timestamp" (in UTC, derived from fastq_rich format)
    and has to be converted appropriately in a datetime format time_arr

    For both the time_zero is the minimal value of the time_arr,
    which is then used to subtract from all other times

    In the case of method=track (and dataset is a column in the df) then this
    subtraction is done per dataset
    """
    if "time" in df.columns:
        df["time_arr"] = pd.Series(df["time"], dtype="datetime64[s]")
    elif "timestamp" in df.columns:
        df["time_arr"] = df["timestamp"]
    else:
        return df
    if "dataset" in df.columns:
        for dset in df["dataset"].unique():
            time_zero = df.loc[df["dataset"] == dset, "time_arr"].min()
            df.loc[df["dataset"] == dset, "start_time"] = (
                df.loc[df["dataset"] == dset, "time_arr"] - time_zero
            )
    else:
        df["start_time"] = df["time_arr"] - df["time_arr"].min()
    return df.drop(["time", "timestamp", "time_arr"], axis=1, errors="ignore")