File: coverage.py

"""Supporting functions for the 'antitarget' command."""
import logging
import math
import os.path
import time
from concurrent import futures
from io import StringIO

import numpy as np
import pandas as pd
import pysam
from skgenome import tabio

from . import core, samutil
from .cnary import CopyNumArray as CNA
from .parallel import rm, to_chunks
from .params import NULL_LOG2_COVERAGE


def do_coverage(
    bed_fname, bam_fname, by_count=False, min_mapq=0, processes=1, fasta=None
):
    """Calculate coverage in the given regions from BAM read depths."""
    if not samutil.ensure_bam_sorted(bam_fname, fasta=fasta):
        raise RuntimeError(f"BAM file {bam_fname} must be sorted by coordinates")
    samutil.ensure_bam_index(bam_fname)
    # ENH: count importers.TOO_MANY_NO_COVERAGE & warn
    cnarr = interval_coverages(
        bed_fname, bam_fname, by_count, min_mapq, processes, fasta
    )
    return cnarr
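
# Minimal usage sketch (hypothetical inputs; 'my_targets.bed' and
# 'sample.bam' are placeholders, not files shipped with cnvkit):
#
#     cnarr = do_coverage("my_targets.bed", "sample.bam",
#                         by_count=False, min_mapq=30, processes=4)
#     # -> CopyNumArray with per-bin 'depth' and 'log2' columns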


def interval_coverages(
    bed_fname, bam_fname, by_count, min_mapq, processes, fasta=None
):
    """Calculate log2 coverages in the BAM file at each interval."""
    meta = {"sample_id": core.fbase(bam_fname)}
    start_time = time.time()

    # Skip processing if the BED file is empty
    with open(bed_fname) as bed_handle:
        for line in bed_handle:
            if line.strip():
                break
        else:
            logging.info(
                "Skip processing %s with empty regions file %s",
                os.path.basename(bam_fname),
                bed_fname,
            )
            return CNA.from_rows([], meta_dict=meta)

    # Calculate average read depth in each bin
    if by_count:
        results = interval_coverages_count(
            bed_fname, bam_fname, min_mapq, processes, fasta
        )
        read_counts, cna_rows = zip(*results)
        read_counts = pd.Series(read_counts)
        cnarr = CNA.from_rows(
            list(cna_rows), columns=CNA._required_columns + ("depth",), meta_dict=meta
        )
    else:
        table = interval_coverages_pileup(
            bed_fname, bam_fname, min_mapq, processes, fasta
        )
        read_len = samutil.get_read_length(bam_fname, fasta=fasta)
        read_counts = table["basecount"] / read_len
        table = table.drop("basecount", axis=1)
        cnarr = CNA(table, meta)

    # Log some stats
    tot_time = time.time() - start_time
    tot_reads = read_counts.sum()
    logging.info(
        "Time: %.3f seconds (%d reads/sec, %d bins/sec)",
        tot_time,
        int(round(tot_reads / tot_time)),
        int(round(len(read_counts) / tot_time)),
    )
    logging.info(
        "Summary: #bins=%d, #reads=%d, mean=%.4f, min=%s, max=%s",
        len(read_counts),
        tot_reads,
        (tot_reads / len(read_counts)),
        read_counts.min(),
        read_counts.max(),
    )
    tot_mapped_reads = samutil.bam_total_reads(bam_fname, fasta=fasta)
    if tot_mapped_reads:
        logging.info(
            "Percent reads in regions: %.3f (of %d mapped)",
            100.0 * tot_reads / tot_mapped_reads,
            tot_mapped_reads,
        )
    else:
        logging.info("(Couldn't calculate total number of mapped reads)")

    return cnarr


def interval_coverages_count(bed_fname, bam_fname, min_mapq, procs=1, fasta=None):
    """Calculate log2 coverages in the BAM file at each interval."""
    regions = tabio.read_auto(bed_fname)
    if procs == 1:
        bamfile = pysam.AlignmentFile(bam_fname, "rb", reference_filename=fasta)
        for chrom, subregions in regions.by_chromosome():
            logging.info(
                "Processing chromosome %s of %s", chrom, os.path.basename(bam_fname)
            )
            for count, row in _rdc_chunk(bamfile, subregions, min_mapq):
                yield [count, row]
    else:
        with futures.ProcessPoolExecutor(procs) as pool:
            args_iter = (
                (bam_fname, subr, min_mapq, fasta)
                for _c, subr in regions.by_chromosome()
            )
            for chunk in pool.map(_rdc, args_iter):
                for count, row in chunk:
                    yield [count, row]


def _rdc(args):
    """Unpack arguments for parallel execution.

    ProcessPoolExecutor.map needs a picklable, module-level callable that
    takes a single argument, so the per-chromosome arguments arrive as one
    tuple and are unpacked here.
    """
    return list(_rdc_chunk(*args))


def _rdc_chunk(bamfile, regions, min_mapq, fasta=None):
    if isinstance(bamfile, str):
        bamfile = pysam.AlignmentFile(bamfile, "rb", reference_filename=fasta)
    for chrom, start, end, gene in regions.coords(["gene"]):
        yield region_depth_count(bamfile, chrom, start, end, gene, min_mapq)


def region_depth_count(bamfile, chrom, start, end, gene, min_mapq):
    """Calculate depth of a region via pysam count.

    Fetches reads overlapping the region, filters out duplicate, secondary,
    unmapped, QC-fail, and low-MAPQ alignments, then sums the aligned bases
    that fall within the region and divides by the region width to estimate
    depth.

    Coordinates are 0-based, per pysam.
    """

    def filter_read(read):
        """True if the given read should be counted towards coverage."""
        return not (
            read.is_duplicate
            or read.is_secondary
            or read.is_unmapped
            or read.is_qcfail
            or read.mapq < min_mapq
        )

    count = 0
    bases = 0
    for read in bamfile.fetch(reference=chrom, start=start, end=end):
        if filter_read(read):
            count += 1
            # Only count the bases aligned to the region
            bases += sum(
                1 for p in read.get_reference_positions() if start <= p < end
            )
    depth = bases / (end - start) if end > start else 0
    row = (
        chrom,
        start,
        end,
        gene,
        math.log(depth, 2) if depth else NULL_LOG2_COVERAGE,
        depth,
    )
    return count, row
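
# Worked example (illustrative numbers): for a 200 bp region chr1:1000-1200
# whose filtered reads contribute 1,000 aligned bases, depth is
# 1000 / 200 = 5.0 and log2(5.0) ~= 2.32.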


def interval_coverages_pileup(bed_fname, bam_fname, min_mapq, procs=1, fasta=None):
    """Calculate log2 coverages in the BAM file at each interval."""
    logging.info("Processing reads in %s", os.path.basename(bam_fname))
    if procs == 1:
        table = bedcov(bed_fname, bam_fname, min_mapq, fasta)
    else:
        chunks = []
        with futures.ProcessPoolExecutor(procs) as pool:
            args_iter = (
                (bed_chunk, bam_fname, min_mapq, fasta)
                for bed_chunk in to_chunks(bed_fname)
            )
            for bed_chunk_fname, table in pool.map(_bedcov, args_iter):
                chunks.append(table)
                rm(bed_chunk_fname)
        table = pd.concat(chunks, ignore_index=True)
    # Fill in CNA required columns
    if "gene" in table:
        table["gene"] = table["gene"].fillna("-")
    else:
        table["gene"] = "-"
    # User-supplied bins might be zero-width or reversed -- skip those
    spans = table.end - table.start
    ok_idx = spans > 0
    table = table.assign(depth=0.0, log2=NULL_LOG2_COVERAGE)
    table.loc[ok_idx, "depth"] = table.loc[ok_idx, "basecount"] / spans[ok_idx]
    ok_idx = table["depth"] > 0
    table.loc[ok_idx, "log2"] = np.log2(table.loc[ok_idx, "depth"])
    return table
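
# Illustrative shape of the resulting table (values hypothetical):
#
#     chromosome  start   end  gene  basecount  depth   log2
#     chr1         1000  1200  -          1000    5.0   2.32
#     chr1         1500  1500  -             0    0.0   <NULL_LOG2_COVERAGE>
#
# The second row is a zero-width bin, so it keeps the null defaults.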


def _bedcov(args):
    """Unpack arguments for parallel execution.

    Returns the BED chunk's filename along with its coverage table so the
    caller can remove the temporary chunk file after collecting the result.
    """
    bed_fname = args[0]
    table = bedcov(*args)
    return bed_fname, table


def bedcov(bed_fname, bam_fname, min_mapq, fasta=None):
    """Calculate depth of all regions in a BED file via samtools (pysam) bedcov.

    i.e. mean pileup depth across each region.
    """
    # Count bases in each region; exclude low-MAPQ reads
    cmd = [bed_fname, bam_fname]
    if min_mapq and min_mapq > 0:
        cmd.extend(["-Q", str(min_mapq)])
    if fasta:
        cmd.extend(["--reference", fasta])
    try:
        raw = pysam.bedcov(*cmd, split_lines=False)
    except pysam.SamtoolsError as exc:
        raise ValueError(
            f"Failed processing {bam_fname!r} coverages in {bed_fname!r} regions. "
            f"PySAM error: {exc}"
        ) from exc
    if not raw:
        raise ValueError(
            f"BED file {bed_fname!r} chromosome names don't match any in "
            f"BAM file {bam_fname!r}"
        )
    columns = detect_bedcov_columns(raw)
    table = pd.read_csv(StringIO(raw), sep="\t", names=columns, usecols=columns)
    return table
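
# The pysam.bedcov() call above is equivalent to running, e.g. (sketch with
# hypothetical file names):
#
#     samtools bedcov -Q 30 --reference ref.fa regions.bed sample.bam
#
# which appends to each BED row a column of total bases aligned within it.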


def detect_bedcov_columns(text):
    """Determine which 'bedcov' output columns to keep.

    Format is the input BED plus a final appended column with the count of
    basepairs mapped within each row's region. The input BED might have 3
    columns (regions without names), 4 (named regions), or more (arbitrary
    columns after 'gene').
    """
    firstline = text[: text.index("\n")]
    tabcount = firstline.count("\t")
    if tabcount < 3:
        raise RuntimeError(f"Bad line from bedcov:\n{firstline!r}")
    if tabcount == 3:
        return ["chromosome", "start", "end", "basecount"]
    if tabcount == 4:
        return ["chromosome", "start", "end", "gene", "basecount"]
    # Input BED has arbitrary columns after 'gene' -- ignore them
    fillers = [f"_{i}" for i in range(1, tabcount - 3)]
    return ["chromosome", "start", "end", "gene"] + fillers + ["basecount"]