File: merge.py

package info (click to toggle)
python-cooler 0.9.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 32,596 kB
  • sloc: python: 10,555; makefile: 198; sh: 31
file content (85 lines) | stat: -rw-r--r-- 2,239 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import click

from ..reduce import merge_coolers
from . import cli
from ._util import parse_field_param


@cli.command()
@click.argument(
    "out_path",
    type=click.Path(exists=False)
)
@click.argument(
    "in_paths",
    nargs=-1,
    type=click.Path(exists=False)
)
@click.option(
    "--chunksize", "-c",
    help="Size of the merge buffer in number of pixel table rows.",
    type=int,
    default=int(20e6),
    show_default=True,
)
@click.option(
    "--field",
    help="Specify the names of value columns to merge as '<name>'. "
    "Repeat the `--field` option for each one. "
    "Use '<name>,dtype=<dtype>' to specify the dtype. Include "
    "',agg=<agg>' to specify an aggregation function different from 'sum'.",
    type=str,
    multiple=True,
)
@click.option(
    "--append", "-a",
    is_flag=True,
    default=False,
    help="Pass this flag to append the output cooler to an existing file "
         "instead of overwriting the file."
)
def merge(out_path, in_paths, chunksize, field, append):
    """
    Merge multiple coolers with identical axes.

    OUT_PATH : Output file path or URI.

    IN_PATHS : Input file paths or URIs of coolers to merge.

    **Notes**

    Data columns merged:

        pixels/bin1_id, pixels/bin2_id, pixels/<value columns>

    Data columns preserved:

        chroms/name, chroms/length
        bins/chrom, bins/start, bins/end

    Additional columns in the the input files are not transferred to the output.

    """
    # logger = get_logger(__name__)

    if len(field):
        field_specifiers = [
            parse_field_param(arg, includes_colnum=False) for arg in field
        ]
        columns, _, dtypes, agg = zip(*field_specifiers)
        dtypes = {col: dt for col, dt in zip(columns, dtypes) if dt is not None}
        agg = {col: f for col, f in zip(columns, agg) if f is not None}
    else:
        # If no other fields are given, 'count' is implicitly chosen.
        # Default aggregation. Dtype will be inferred.
        columns, dtypes, agg = ["count"], None, None

    merge_coolers(
        out_path,
        in_paths,
        mergebuf=chunksize,
        columns=columns,
        dtypes=dtypes,
        agg=agg,
        mode="a" if append else "w"
    )