File: dask.py

Package: python-cooler 0.9.1-1
from itertools import product
from math import ceil

import h5py
import numpy as np
import pandas as pd

try:
    from sparse import COO
except ImportError:
    raise ImportError("The 'sparse' package is required to use dask") from None

import dask.array as da
import dask.dataframe as dd
from dask.base import tokenize

from ..core import CSRReader, query_rect
from ..util import parse_cooler_uri, partition


def _get_group_info(path, grouppath, keys):
    # Gather table metadata in one pass: row count, dataset keys, enum
    # (categorical) mappings, and an empty "meta" frame encoding the dtypes.
    with h5py.File(path, "r") as f:
        grp = f[grouppath]

        if keys is None:
            keys = list(grp.keys())

        nrows = len(grp[keys[0]])

        # Datasets with an HDF5 enum dtype become ordered pandas categoricals;
        # sort the category labels by their integer codes.
        categoricals = {}
        for key in keys:
            dt = h5py.check_dtype(enum=grp[key].dtype)
            if dt is not None:
                categoricals[key] = sorted(dt, key=dt.__getitem__)

        # Meta is an empty dataframe that serves as a compound "dtype"
        meta = pd.DataFrame(
            {key: np.array([], dtype=grp[key].dtype) for key in keys}, columns=keys
        )

        for key in categoricals:
            meta[key] = pd.Categorical([], categories=categoricals[key], ordered=True)

    return nrows, keys, meta, categoricals
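
# Note: ``meta`` follows the dask.dataframe convention of describing each
# partition with an empty, correctly typed frame. For a hypothetical pixel
# table it would print as:
#
#   Empty DataFrame
#   Columns: [bin1_id, bin2_id, count]
#   Index: []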


def _slice_dataset(filepath, grouppath, key, slc, lock=None):
    # Read a slice of a single 1D dataset, serializing HDF5 access through
    # the optional lock.
    try:
        if lock is not None:
            lock.acquire()
        with h5py.File(filepath, "r") as f:
            return f[grouppath][key][slc]
    finally:
        if lock is not None:
            lock.release()


def _slice_group(filepath, grouppath, keys, slc, lock=None):
    # Read the same slice from several datasets of a group, returning a dict
    # of column name -> numpy array.
    try:
        if lock is not None:
            lock.acquire()
        with h5py.File(filepath, "r") as f:
            return {key: f[grouppath][key][slc] for key in keys}
    finally:
        if lock is not None:
            lock.release()


def _restore_categories(data, categorical_columns):
    # Convert raw integer codes read from HDF5 back into ordered pandas
    # categoricals using the enum mappings discovered earlier.
    for key, category_dict in categorical_columns.items():
        data[key] = pd.Categorical.from_codes(data[key], category_dict, ordered=True)
    return data
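
# For example, codes ``[0, 0, 1]`` with categories ``["chr1", "chr2"]``
# (labels sorted by their enum codes) restore to the ordered categorical
# ``["chr1", "chr1", "chr2"]`` via ``pd.Categorical.from_codes``.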


def read_table(group_uri, keys=None, chunksize=10_000_000, index=None, lock=None):
    """
    Create a dask dataframe around a column-oriented table in HDF5.

    A table is a group containing equal-length 1D datasets.

    Parameters
    ----------
    group_uri : str
        URI to the HDF5 group storing the table.
    keys : list, optional
        List of HDF5 dataset keys. By default, all keys in the group are
        used.
    chunksize : int, optional
        Number of rows per partition.
    index : str, optional
        Sorted column to use as the dataframe index.
    lock : multiprocessing.Lock, optional
        Lock to serialize HDF5 read/write access. Default is no lock.

    Returns
    -------
    :class:`dask.dataframe.DataFrame`

    Notes
    -----
    Learn more about the `dask <https://docs.dask.org/en/latest/>`_ project.

    """
    filepath, grouppath = parse_cooler_uri(group_uri)
    nrows, keys, meta, categoricals = _get_group_info(filepath, grouppath, keys)

    # Make a unique task name
    token = tokenize(filepath, grouppath, chunksize, keys)
    task_name = "daskify-h5py-table-" + token

    # Partition the table. Divisions bound the index labels of each
    # partition; there is one more division than there are partitions.
    divisions = (0,) + tuple(range(-1, nrows, chunksize))[1:]
    if divisions[-1] != nrows - 1:
        divisions = divisions + (nrows - 1,)

    # Build the task graph: each task reads one slice of every column,
    # optionally restores categoricals (the tuples are deferred dask tasks),
    # and wraps the resulting column dict in a DataFrame.
    dsk = {}
    for i in range(0, int(ceil(nrows / chunksize))):
        slc = slice(i * chunksize, (i + 1) * chunksize)
        data_dict = (_slice_group, filepath, grouppath, keys, slc, lock)
        if categoricals:
            data_dict = (_restore_categories, data_dict, categoricals)
        dsk[task_name, i] = (pd.DataFrame, data_dict, None, meta.columns)

    # Generate ddf from dask graph
    df = dd.DataFrame(dsk, task_name, meta, divisions)
    if index is not None:
        df = df.set_index(index, sorted=True, drop=False)
    return df
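
# A minimal usage sketch for ``read_table`` (kept as a comment so importing
# this module stays side-effect free). The file name is hypothetical; any
# .cool file exposes a "pixels" table that works the same way:
#
#   pixels = read_table("example.cool::/pixels", chunksize=1_000_000)
#   total = pixels["count"].sum().compute()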


def _array_select(clr, i0, i1, j0, j1, field, sparse_array):
    # Query a rectangular region of the pixel table and assemble it into a
    # 2D sparse COO array (optionally densified). Symmetric-upper coolers
    # store only the upper triangle, so the query is mirrored on the fly.
    is_upper = clr._is_symm_upper
    with clr.open("r") as h5:
        dtype = h5["pixels"][field].dtype
        reader = CSRReader(h5, field, max_chunk=500000000)
        if is_upper:
            i, j, v = query_rect(reader.query, i0, i1, j0, j1, duplex=True)
        else:
            i, j, v = reader.query(i0, i1, j0, j1)
        if not len(v):
            # An empty selection can come back with the wrong dtype;
            # coerce it to match the on-disk value column.
            v = v.astype(dtype)
    arr = COO((i - i0, j - j0), v, shape=(i1 - i0, j1 - j0))
    if not sparse_array:
        arr = arr.todense()
    return arr


def load_dask_array(
    clr, i0, i1, j0, j1, field="count", sparse_array=False, chunksize=256
):
    """
    Create a parallel Dask array around the matrix representation of a cooler.

    Parameters
    ----------
    clr : :class:`cooler.Cooler`
        Cooler object
    i0, i1 : int
        Row query range
    j0, j1 : int
        Column query range
    field : str
        Value column to query
    sparse_array : bool, optional
        If True, back the dask array with :class:`sparse.COO` sparse arrays
        instead of dense numpy arrays. Default is False.
    chunksize : int, optional
        Number of rows in each rowwise chunk of the underlying data.

    Returns
    -------
    :class:`dask.array.Array`

    """
    token = tokenize(clr.uri, i0, i1, j0, j1, field, chunksize)
    task_name = "cooler-array-slice-" + token

    shape = (i1 - i0, j1 - j0)
    # Zero-size selection, used only to communicate the output array type
    # and dtype to dask.
    meta = _array_select(clr, 0, 0, 0, 0, field, sparse_array)
    # Partition the row range into absolute (lo, hi) bin spans; every chunk
    # covers the full column range.
    slices = [(lo, hi, j0, j1) for lo, hi in partition(i0, i1, chunksize)]
    chunks = (tuple(s[1] - s[0] for s in slices), (shape[1],))
    keys = list(product([task_name], *[range(len(dim)) for dim in chunks]))
    values = [(_array_select, clr, *slc, field, sparse_array) for slc in slices]
    dsk = dict(zip(keys, values))

    return da.Array(dsk, task_name, chunks, meta=meta, shape=shape)
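
# A minimal usage sketch for ``load_dask_array`` (the file name is
# hypothetical; any single-resolution .cool file would do):
#
#   import cooler
#   clr = cooler.Cooler("example.cool")
#   n = clr.info["nbins"]
#   arr = load_dask_array(clr, 0, n, 0, n, field="count", chunksize=512)
#   row_sums = arr.sum(axis=1).compute()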