File: mapReduce.py

package info (click to toggle)
python-deeptools 3.5.6%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 34,456 kB
  • sloc: python: 14,503; xml: 4,212; sh: 33; makefile: 5
file content (263 lines) | stat: -rw-r--r-- 9,786 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import multiprocessing
from deeptoolsintervals import GTF
import random

debug = 0


def mapReduce(staticArgs, func, chromSize,
              genomeChunkLength=None,
              region=None,
              bedFile=None,
              blackListFileName=None,
              numberOfProcessors=4,
              verbose=False,
              includeLabels=False,
              keepExons=False,
              transcriptID="transcriptID",
              exonID="exonID",
              transcript_id_designator="transcript_id",
              self_=None):
    """
    Split the genome into parts that are sent to workers using a defined
    number of procesors. Results are collected and returned.

    For each genomic region the given 'func' is called using
    the following parameters:

     chrom, start, end, staticArgs

    The *arg* are static, *pickable* variables that need to be sent
    to workers.

    The genome chunk length corresponds to a fraction of the genome, in bp,
    that is send to each of the workers for processing.

    Depending on the type of process a larger or shorter regions may be
    preferred

    :param chromSize: A list of duples containing the chromosome
                      name and its length
    :param region: The format is chr:start:end:tileSize (see function
                   getUserRegion)
    :param staticArgs: tuple of arguments that are sent to the given 'func'

    :param func: function to call. The function is called using the
                 following parameters (chrom, start, end, staticArgs)
    :param bedFile: Is a bed file is given, the args to the func to be
                    called are extended to include a list of bed
                    defined regions.
    :param blackListFileName: A list of regions to exclude from all computations.
                              Note that this has genomeChunkLength resolution...
    :param self_: In case mapreduce should make a call to an object
                  the self variable has to be passed.
    :param includeLabels: Pass group and transcript labels into the calling
                          function. These are added to the static args
                          (groupLabel and transcriptName).

    If "includeLabels" is true, a tuple of (results, labels) is returned
    """

    if not genomeChunkLength:
        genomeChunkLength = 1e5
    genomeChunkLength = int(genomeChunkLength)

    if verbose:
        print("genome partition size for multiprocessing: {0}".format(
            genomeChunkLength))

    region_start = 0
    region_end = None

    # if a region is set, that means that the task should only cover
    # the given genomic position

    if region:
        chromSize, region_start, region_end, genomeChunkLength = getUserRegion(chromSize, region)
        if verbose:
            print("chrom size: {0}, region start: {1}, region end: {2}, "
                  "genome chunk length sent to each procesor: {3}".format(chromSize, region_start, region_end, genomeChunkLength))

    if bedFile:
        defaultGroup = None
        if len(bedFile) == 1:
            defaultGroup = "genes"
        bed_interval_tree = GTF(bedFile, defaultGroup=defaultGroup, transcriptID=transcriptID, exonID=exonID, transcript_id_designator=transcript_id_designator, keepExons=keepExons)

    if blackListFileName:
        blackList = GTF(blackListFileName)

    TASKS = []
    # iterate over all chromosomes
    for chrom, size in chromSize:
        # the start is zero unless a specific region is defined
        start = 0 if region_start == 0 else region_start
        for startPos in range(start, size, genomeChunkLength):
            endPos = min(size, startPos + genomeChunkLength)

            # Reject a chunk if it overlaps
            if blackListFileName:
                regions = blSubtract(blackList, chrom, [startPos, endPos])
            else:
                regions = [[startPos, endPos]]

            for reg in regions:
                if self_ is not None:
                    argsList = [self_]
                else:
                    argsList = []

                argsList.extend([chrom, reg[0], reg[1]])
                # add to argument list the static list received the the function
                argsList.extend(staticArgs)

                # if a bed file is given, append to the TASK list,
                # a list of bed regions that overlap with the
                # current genomeChunk.
                if bedFile:
                    # This effectively creates batches of intervals, which is
                    # generally more performant due to the added overhead of
                    # initializing additional workers.

                    # TODO, there's no point in including the chromosome
                    if includeLabels:
                        bed_regions_list = [[chrom, x[4], x[2], x[3], x[5], x[6]] for x in bed_interval_tree.findOverlaps(chrom, reg[0], reg[1], trimOverlap=True, numericGroups=True, includeStrand=True)]
                    else:
                        bed_regions_list = [[chrom, x[4], x[5], x[6]] for x in bed_interval_tree.findOverlaps(chrom, reg[0], reg[1], trimOverlap=True, includeStrand=True)]

                    if len(bed_regions_list) == 0:
                        continue
                    # add to argument list, the position of the bed regions to use
                    argsList.append(bed_regions_list)

                TASKS.append(tuple(argsList))

    if len(TASKS) > 1 and numberOfProcessors > 1:
        if verbose:
            print(("using {} processors for {} "
                   "number of tasks".format(numberOfProcessors,
                                            len(TASKS))))
        random.shuffle(TASKS)
        pool = multiprocessing.Pool(numberOfProcessors)
        res = pool.map_async(func, TASKS).get(9999999)
        pool.close()
        pool.join()
    else:
        res = list(map(func, TASKS))

    if includeLabels:
        if bedFile:
            return res, bed_interval_tree.labels
        else:
            return res, None
    return res


def getUserRegion(chrom_sizes, region_string, max_chunk_size=1e6):
    r"""
    Verifies if a given region argument, given by the user
    is valid. The format of the region_string is chrom:start:end:tileSize
    where start, end and tileSize are optional.

    :param chrom_sizes: dictionary of chromosome/scaffold size. Key=chromosome name
    :param region_string: a string of the form chr:start:end
    :param max_chunk_size: upper limit for the chunk size
    :return: tuple chrom_size for the region start, region end, chunk size

    #>>> data = getUserRegion({'chr2': 1000}, "chr1:10:10")
    #Traceback (most recent call last):
    #    ...
    #NameError: Unknown chromosome: chr1
    #Known chromosomes are: ['chr2']

    If the region end is biger than the chromosome size, this
    value is used instead
    >>> getUserRegion({'chr2': 1000}, "chr2:10:1001")
    ([('chr2', 1000)], 10, 1000, 990)

    Test chunk and regions size reduction to match tile size
    >>> getUserRegion({'chr2': 200000}, "chr2:10:123344:3")
    ([('chr2', 123344)], 9, 123345, 123336)

    Test chromosome name mismatch
    >>> getUserRegion({'2': 200000}, "chr2:10:123344:3")
    ([('2', 123344)], 9, 123345, 123336)
    >>> getUserRegion({'chrM': 200000}, "MT:10:123344:3")
    ([('chrM', 123344)], 9, 123345, 123336)
    """
    region = region_string.split(":")
    chrom = region[0]
    chrom_sizes = dict(chrom_sizes)

    if chrom not in list(chrom_sizes.keys()):
        if chrom == "MT":
            chromUse = "chrM"
        elif chrom == "chrM":
            chromUse = "MT"
        elif chrom[0:3] == "chr":
            chromUse = chrom[3:]
        else:
            chromUse = "chr" + chrom
        if chromUse not in list(chrom_sizes.keys()):
            raise NameError("Unknown chromosome: %s\nKnown "
                            "chromosomes are: %s " % (chrom, list(chrom_sizes.keys())))
        chrom = chromUse
    try:
        region_start = int(region[1])
    except IndexError:
        region_start = 0
    try:
        region_end = int(region[2]) if int(region[2]) <= chrom_sizes[chrom] \
            else chrom_sizes[chrom]
    except IndexError:
        region_end = chrom_sizes[chrom]
    if region_start > region_end or region_start < 0:
        raise NameError("{} not valid. The format is chrom:start:end. "
                        "Without comas, dashes or dots. ".format(region_string))
    try:
        tilesize = int(region[3])
    except IndexError:
        tilesize = None

    chrom_sizes = [(chrom, region_end)]

    # if tilesize is given, make region_start and region_end
    # multiple of tileSize
    if tilesize:
        region_start -= region_start % tilesize
        region_end += tilesize - (region_end % tilesize)

    chunk_size = int(region_end - region_start)
    if chunk_size > max_chunk_size:
        chunk_size = max_chunk_size
        if tilesize and tilesize < chunk_size:
            chunk_size -= chunk_size % tilesize

    return chrom_sizes, region_start, region_end, int(chunk_size)


def blSubtract(t, chrom, chunk):
    """
    If a genomic region overlaps with a blacklisted region, then subtract that region out

    returns a list of lists
    """

    if t is None:
        return [chunk]

    overlaps = t.findOverlaps(chrom, chunk[0], chunk[1])
    if overlaps is not None and len(overlaps) > 0:
        output = []
        for o in overlaps:
            if chunk[1] <= chunk[0]:
                break
            if chunk[0] < o[0]:
                output.append([chunk[0], o[0]])
            chunk[0] = o[1]
        if chunk[0] < chunk[1]:
            output.append([chunk[0], chunk[1]])
    else:
        output = [chunk]

    return output