File: compress_fast5.py

ont-fast5-api 4.1.1+dfsg-2 (Debian bookworm, main)

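"""Command-line tool for changing the compression of fast5 files.

Re-encodes the raw signal of every fast5 file found under an input folder,
writing the results either to a separate output folder or in place.
"""
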
import logging
import os
import shutil
from argparse import ArgumentParser, ArgumentError
from multiprocessing.pool import Pool

from ont_fast5_api import __version__
from ont_fast5_api.compression_settings import COMPRESSION_MAP
from ont_fast5_api.conversion_tools.conversion_utils import get_fast5_file_list, get_progress_bar
from ont_fast5_api.fast5_file import Fast5File, EmptyFast5
from ont_fast5_api.fast5_read import copy_attributes
from ont_fast5_api.fast5_interface import is_multi_read
from ont_fast5_api.multi_fast5 import MultiFast5File
from ont_fast5_api.static_data import OPTIONAL_READ_GROUPS


def compress_batch(input_folder, output_folder, target_compression, recursive=True, threads=1, follow_symlinks=True,
                   in_place=False, sanitize=False):
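    """Compress every fast5 file found under input_folder using a pool of worker processes.

    With in_place=True each file is first written next to the original with a
    '.tmp.compressed' suffix and then moved over the original in the pool callback.
    """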
    # We require an absolute input path so we can replicate the data structure relative to it later on
    input_folder = os.path.abspath(input_folder)

    file_list = get_fast5_file_list(input_folder, recursive, follow_symlinks=follow_symlinks)
    if len(file_list) == 0:
        raise ValueError("No input fast5 files found in '{}'. Recursive={}".format(input_folder, recursive))

    # Set up the process pool and the progressbar
    pool = Pool(min(threads, len(file_list)))
    pbar = get_progress_bar(len(file_list))

    def update(result):
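        # apply_async callbacks run in the parent process; for in-place mode,
        # replace the original file with the freshly compressed temporary copy.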
        if in_place and result is not None:
            input_path, output_path = result
            shutil.move(output_path, input_path)
        pbar.update(pbar.currval + 1)

    for input_file in file_list:
        input_path = os.path.join(input_folder, input_file)
        if in_place:
            output_path = input_path + ".tmp.compressed"
        else:
            output_path = os.path.join(output_folder, os.path.relpath(input_path, input_folder))

        pool.apply_async(func=compress_file,
                         args=(input_path, output_path, target_compression, sanitize),
                         callback=update)

    # Tear down the process pool and pbar. We can't use contextmanagers since we need to close() then join()
    pool.close()
    pool.join()
    pbar.finish()


def compress_file(input_file, output_file, target_compression, sanitize=False):
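    """Compress a single fast5 file, handling both multi-read and single-read formats.

    Returns (input_file, output_file) so the pool callback can finalise in-place moves.
    """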
    try:
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if is_multi_read(input_file):
            with MultiFast5File(input_file, 'r') as input_f5, MultiFast5File(output_file, 'a') as output_f5:
                for read in input_f5.get_reads():
                    output_f5.add_existing_read(read, target_compression, sanitize=sanitize)
        else:
            with Fast5File(input_file, 'r') as input_f5, \
                    EmptyFast5(output_file, 'a') as output_f5:
                compress_single_read(output_f5, input_f5, target_compression, sanitize=sanitize)
    except Exception as e:
        # Errors raised in Pool.apply_async would otherwise be lost, so we log them explicitly before re-raising.
        logging.exception(e)
        raise
    return (input_file, output_file)


def compress_single_read(output_f5, read_to_copy, target_compression, sanitize=False):
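    """Copy the contents of a single-read file into output_f5, re-encoding the raw signal.

    Everything except the raw dataset is copied verbatim; the raw data is
    re-added with target_compression.
    """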
    read_id = read_to_copy.get_read_id()
    raw_dataset_name = read_to_copy.raw_dataset_name
    raw_group_name = read_to_copy.raw_dataset_group_name
    read_name = "read_" + read_id
    # Recreating the status object is painful, but it doesn't actually interact with the file, so we can just reference it.
    output_f5.status = read_to_copy.status

    if str(target_compression) in read_to_copy.raw_compression_filters:
        # If the read already has the target compression there is no need to do anything fancy
        output_f5.handle.copy(read_to_copy.handle, read_name)
    else:
        copy_attributes(read_to_copy.handle.attrs, output_f5.handle)
        for subgroup in read_to_copy.handle:
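            # raw_dataset_name is a path (e.g. 'Raw/Reads/Read_<n>/Signal'), so this
            # substring check skips the raw group; it is rewritten below with the new compression.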
            if subgroup not in raw_dataset_name:
                if sanitize and subgroup in OPTIONAL_READ_GROUPS:
                    # skip optional groups when sanitizing
                    continue
                output_f5.handle.copy(read_to_copy.handle[subgroup], subgroup)
            else:
                raw_attrs = read_to_copy.handle[raw_group_name].attrs
                raw_data = read_to_copy.handle[raw_dataset_name]
                output_f5.add_raw_data(raw_data, raw_attrs, compression=target_compression)


def main():
    parser = ArgumentParser(description="Tool for changing the compression of Fast5 files")
    parser.add_argument('-i', '--input_path', required=True,
                        help='Folder containing fast5 files')

    output_group = parser.add_mutually_exclusive_group(required=True)
    save_arg = output_group.add_argument('-s', '--save_path', default=None,
                                         help="Folder to output fast5 read files to")
    output_group.add_argument('--in_place', action='store_true',
                              help='Replace the old files with new files in place')

    parser.add_argument('-c', '--compression', required=True, choices=list(COMPRESSION_MAP.keys()),
                        help="Target output compression type")
    parser.add_argument('--sanitize', action='store_true',
                        help="Clean output files of optional groups and datasets (e.g. 'Analyses')")
    parser.add_argument('-t', '--threads', type=int, default=1, required=False,
                        help="Maximum number of threads to use")
    parser.add_argument('--recursive', action='store_true',
                        help="Search recursively through folders for single_read fast5 files")
    parser.add_argument('--ignore_symlinks', action='store_true',
                        help="Ignore symlinks when searching recursively for fast5 files")
    parser.add_argument('-v', '--version', action='version', version=__version__)
    args = parser.parse_args()

    if args.input_path == args.save_path:
        raise ArgumentError(save_arg, "--input_path and --save_path must be different locations, or use --in_place")
    if args.sanitize and args.save_path is None:
        raise ArgumentError(save_arg, "--save_path must be given if using --sanitize")

    compress_batch(input_folder=args.input_path,
                   output_folder=args.save_path,
                   target_compression=COMPRESSION_MAP[args.compression],
                   threads=args.threads,
                   recursive=args.recursive,
                   follow_symlinks=not args.ignore_symlinks,
                   in_place=args.in_place,
                   sanitize=args.sanitize)


if __name__ == '__main__':
    main()
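
# Example invocations, assuming the `compress_fast5` console script that
# ont-fast5-api installs for this module (equivalently: `python compress_fast5.py ...`):
#
#   compress_fast5 --input_path reads/ --save_path compressed/ -c vbz -t 8
#   compress_fast5 --input_path reads/ --in_place -c gzip --recursive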