File: aggregate_scores_in_intervals.py

package info (click to toggle)
python-bx 0.13.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,000 kB
  • sloc: python: 17,136; ansic: 2,326; makefile: 24; sh: 8
file content (140 lines) | stat: -rwxr-xr-x 4,084 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/python3

"""
Given a list of intervals in BED format (`interval_file`) and a set of scores
(`score_file`) print each interval plus the average, minimum, and maximum of
the scores that fall in that interval. Scores can either be wiggle format
data or a directory containing binned array files (named according to the
sequence source / chromosome of the intervals).

usage: %prog score_file interval_file [out_file] [options]
    -b, --binned: 'score_file' is actually a directory of binned array files
    -m, --mask=FILE: bed file containing regions not to consider valid
"""

import os
import os.path
import sys
from collections.abc import Mapping

import bx.wiggle
from bx import misc
from bx.binned_array import (
    BinnedArray,
    FileBinnedArray,
)
from bx.bitset_builders import binned_bitsets_from_file
from bx.cookbook import doc_optparse
from bx_extras.fpconst import isNaN


class FileBinnedArrayDir(Mapping):
    """
    Adapter that makes a directory of FileBinnedArray files look like
    a regular dict of BinnedArray objects.
    """

    def __init__(self, dir):
        self.dir = dir
        self.cache = {}

    def __getitem__(self, key):
        value = None
        if key in self.cache:
            value = self.cache[key]
        else:
            fname = os.path.join(self.dir, f"{key}.ba")
            if os.path.exists(fname):
                value = FileBinnedArray(open(fname))
                self.cache[key] = value
        if value is None:
            raise KeyError("File does not exist: " + fname)
        return value

    def __iter__(self):
        raise NotImplementedError()

    def __len__(self):
        raise NotImplementedError()


def load_scores_wiggle(fname):
    """
    Read a wiggle file and return a dict of BinnedArray objects keyed
    by chromosome.
    """
    scores_by_chrom = {}
    for chrom, pos, val in bx.wiggle.Reader(misc.open_compressed(fname)):
        if chrom not in scores_by_chrom:
            scores_by_chrom[chrom] = BinnedArray()
        scores_by_chrom[chrom][pos] = val
    return scores_by_chrom


def load_scores_ba_dir(dir):
    """
    Return a dict-like object (keyed by chromosome) that returns
    FileBinnedArray objects created from "key.ba" files in `dir`
    """
    return FileBinnedArrayDir(dir)


def main():
    # Parse command line
    options, args = doc_optparse.parse(__doc__)
    try:
        score_fname = args[0]
        interval_fname = args[1]
        if len(args) > 2:
            out_file = open(args[2], "w")
        else:
            out_file = sys.stdout
        binned = bool(options.binned)
        mask_fname = options.mask
    except Exception:
        doc_optparse.exit()

    if binned:
        scores_by_chrom = load_scores_ba_dir(score_fname)
    else:
        scores_by_chrom = load_scores_wiggle(score_fname)

    if mask_fname:
        masks = binned_bitsets_from_file(open(mask_fname))
    else:
        masks = None

    for line in open(interval_fname):
        fields = line.split()
        chrom, start, stop = fields[0], int(fields[1]), int(fields[2])
        total = 0
        count = 0
        min_score = 100000000
        max_score = -100000000
        for i in range(start, stop):
            if chrom in scores_by_chrom and scores_by_chrom[chrom][i]:
                # Skip if base is masked
                if masks and chrom in masks:
                    if masks[chrom][i]:
                        continue
                # Get the score, only count if not 'nan'
                score = scores_by_chrom[chrom][i]
                if not isNaN(score):
                    total += score
                    count += 1
                    max_score = max(score, max_score)
                    min_score = min(score, min_score)
        if count > 0:
            avg = total / count
        else:
            avg = "nan"
            min_score = "nan"
            max_score = "nan"

        print("\t".join(map(str, [chrom, start, stop, avg, min_score, max_score])), file=out_file)

    out_file.close()


if __name__ == "__main__":
    main()