File: hammer_logic.py

Package: spades 3.13.1+dfsg-2

#!/usr/bin/python3

############################################################################
# Copyright (c) 2015 Saint Petersburg State University
# Copyright (c) 2011-2014 Saint Petersburg Academic University
# All Rights Reserved
# See file LICENSE for details.
############################################################################
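
"""Helper logic for the SPAdes read error correction ("hammer") stage:
preparing BayesHammer/IonHammer configs, launching the corrector binary,
and post-processing the corrected dataset."""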


import os
import sys
import glob
import shutil
import support
import options_storage
import process_cfg
from site import addsitedir
from distutils import dir_util
from os.path import isfile


def compress_dataset_files(dataset_data, ext_python_modules_home, max_threads, log):
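    """Gzip-compress all corrected read files referenced by dataset_data,
    updating each library entry to point at the .gz filenames. Uses pigz
    (parallel gzip) when available, otherwise falls back to running gzip
    in parallel via joblib."""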
    pigz_path = support.which('pigz')
    if pigz_path:
        compressor = 'pigz'
    else:
        compressor = 'gzip'
    log.info("\n== Compressing corrected reads (with " + compressor + ")")
    to_compress = []
    for reads_library in dataset_data:
        for key, value in reads_library.items():
            if key.endswith('reads'):
                compressed_reads_filenames = []
                for reads_file in value:
                    compressed_reads_filenames.append(reads_file + ".gz")
                    if not isfile(reads_file):
                        if isfile(compressed_reads_filenames[-1]):
                            continue  # already compressed (--continue/--restart-from case)
                        support.error('something went wrong: corrected reads file (' + reads_file + ') is missing!', log)
                    to_compress.append(reads_file)
                reads_library[key] = compressed_reads_filenames
    if to_compress:
        if pigz_path:
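            # pigz parallelizes compression of a single file internally,
            # so files are handed to it one at a time with all threads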
            for reads_file in to_compress:
                support.sys_call([pigz_path, '-f', '-7', '-p', str(max_threads), reads_file], log)
        else:
            addsitedir(ext_python_modules_home)
            # joblib2/joblib3 are the joblib copies bundled with SPAdes in
            # ext_python_modules_home; fall back to a system-wide joblib.
            try:
                if sys.version.startswith('2.'):
                    from joblib2 import Parallel, delayed
                elif sys.version.startswith('3.'):
                    from joblib3 import Parallel, delayed
            except ImportError:
                from joblib import Parallel, delayed
            n_jobs = min(len(to_compress), max_threads)
            outputs = Parallel(n_jobs=n_jobs)(delayed(support.sys_call)(['gzip', '-f', '-7', reads_file]) for reads_file in to_compress)
            for output in outputs:
                if output:
                    log.info(output)


def remove_not_corrected_reads(output_dir):
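    """Remove leftover *.bad.fastq files (reads that could not be corrected)."""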
    for not_corrected in glob.glob(os.path.join(output_dir, "*.bad.fastq")):
        os.remove(not_corrected)


def prepare_config_bh(filename, cfg, log):
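    """Fill the BayesHammer config template with paths, thread counts and
    memory limits taken from cfg, via process_cfg parameter substitution."""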
    subst_dict = dict()

    subst_dict["dataset"] = process_cfg.process_spaces(cfg.dataset_yaml_filename)
    subst_dict["input_working_dir"] = process_cfg.process_spaces(cfg.tmp_dir)
    subst_dict["output_dir"] = process_cfg.process_spaces(cfg.output_dir)
    subst_dict["general_max_iterations"] = cfg.max_iterations
    subst_dict["general_max_nthreads"] = cfg.max_threads
    subst_dict["count_merge_nthreads"] = cfg.max_threads
    subst_dict["bayes_nthreads"] = cfg.max_threads
    subst_dict["expand_nthreads"] = cfg.max_threads
    subst_dict["correct_nthreads"] = cfg.max_threads
    subst_dict["general_hard_memory_limit"] = cfg.max_memory
    if "qvoffset" in cfg.__dict__:
        subst_dict["input_qvoffset"] = cfg.qvoffset
    if "count_filter_singletons" in cfg.__dict__:
        subst_dict["count_filter_singletons"] = cfg.count_filter_singletons
    if "read_buffer_size" in cfg.__dict__:
        subst_dict["count_split_buffer"] = cfg.read_buffer_size
    process_cfg.substitute_params(filename, subst_dict, log)


def prepare_config_ih(filename, cfg, ext_python_modules_home):
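    """Rewrite the IonHammer YAML config in place with dataset paths and
    resource limits taken from cfg."""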
    addsitedir(ext_python_modules_home)
    if sys.version.startswith('2.'):
        import pyyaml2 as pyyaml
    elif sys.version.startswith('3.'):
        import pyyaml3 as pyyaml

    with open(filename) as f:
        data = pyyaml.load(f, Loader=pyyaml.FullLoader)
    data["dataset"] = cfg.dataset_yaml_filename
    data["working_dir"] = cfg.tmp_dir
    data["output_dir"] = cfg.output_dir
    data["hard_memory_limit"] = cfg.max_memory
    data["max_nthreads"] = cfg.max_threads
    with open(filename, 'w') as f:
        pyyaml.dump(data, f,
                    default_flow_style=False, default_style='"', width=float("inf"))


def run_hammer(corrected_dataset_yaml_filename, configs_dir, execution_home, cfg,
               dataset_data, ext_python_modules_home, only_compressing_is_needed, log):
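    """Run the read error correction stage (BayesHammer, or IonHammer for
    IonTorrent data) and post-process its output: drop uncorrected reads,
    optionally gzip the corrected files, and merge back any long-read
    libraries that were excluded from correction."""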
    addsitedir(ext_python_modules_home)
    if sys.version.startswith('2.'):
        import pyyaml2 as pyyaml
    elif sys.version.startswith('3.'):
        import pyyaml3 as pyyaml

    # long-read libraries are not error-corrected, so split them out and correct only the rest
    if support.get_lib_ids_by_type(dataset_data, options_storage.LONG_READS_TYPES):
        not_used_dataset_data = support.get_libs_by_type(dataset_data, options_storage.LONG_READS_TYPES)
        to_correct_dataset_data = support.rm_libs_by_type(dataset_data, options_storage.LONG_READS_TYPES)
        to_correct_dataset_yaml_filename = os.path.join(cfg.output_dir, "to_correct.yaml")
        with open(to_correct_dataset_yaml_filename, 'w') as f:
            pyyaml.dump(to_correct_dataset_data, f,
                        default_flow_style=False, default_style='"', width=float("inf"))
        cfg.dataset_yaml_filename = to_correct_dataset_yaml_filename
    else:
        not_used_dataset_data = None

    if not only_compressing_is_needed:
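        # copy a fresh config template directory (hammer or ionhammer) into the output dir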
        dst_configs = os.path.join(cfg.output_dir, "configs")
        if os.path.exists(dst_configs):
            shutil.rmtree(dst_configs)
        if cfg.iontorrent:
            dir_util.copy_tree(os.path.join(configs_dir, "ionhammer"), dst_configs, preserve_times=False)
            cfg_file_name = os.path.join(dst_configs, "ionhammer.cfg")
        else:
            dir_util.copy_tree(os.path.join(configs_dir, "hammer"), dst_configs, preserve_times=False)
            cfg_file_name = os.path.join(dst_configs, "config.info")

        cfg.tmp_dir = support.get_tmp_dir(prefix="hammer_")
        if cfg.iontorrent:
            prepare_config_ih(cfg_file_name, cfg, ext_python_modules_home)
            binary_name = "spades-ionhammer"
        else:
            prepare_config_bh(cfg_file_name, cfg, log)
            binary_name = "spades-hammer"

        command = [os.path.join(execution_home, binary_name),
                   os.path.abspath(cfg_file_name)]

        log.info("\n== Running read error correction tool: " + ' '.join(command) + "\n")
        support.sys_call(command, log)
        if not os.path.isfile(corrected_dataset_yaml_filename):
            support.error("read error correction finished abnormally: " + corrected_dataset_yaml_filename + " not found!")
    else:
        log.info("\n===== Skipping %s (already processed). \n" % "read error correction tool")
        support.continue_from_here(log)

    with open(corrected_dataset_yaml_filename) as f:
        corrected_dataset_data = pyyaml.load(f, Loader=pyyaml.FullLoader)
    remove_not_corrected_reads(cfg.output_dir)
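    # rewrite the dataset YAML only if compression or merged-back libraries changed it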
    is_changed = False
    if cfg.gzip_output:
        is_changed = True
        compress_dataset_files(corrected_dataset_data, ext_python_modules_home, cfg.max_threads, log)
    if not_used_dataset_data:
        is_changed = True
        corrected_dataset_data += not_used_dataset_data
    if is_changed:
        with open(corrected_dataset_yaml_filename, 'w') as f:
            pyyaml.dump(corrected_dataset_data, f,
                        default_flow_style=False, default_style='"', width=float("inf"))
    log.info("\n== Dataset description file was created: " + corrected_dataset_yaml_filename + "\n")

    if os.path.isdir(cfg.tmp_dir):
        shutil.rmtree(cfg.tmp_dir)
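
# Typical call site (a sketch, not part of this module): the main SPAdes
# pipeline (spades.py) invokes run_hammer() with the "error_correction"
# section of its parsed config as cfg, e.g.:
#
#   run_hammer(corrected_dataset_yaml_filename, configs_dir, execution_home,
#              cfg, dataset_data, ext_python_modules_home,
#              only_compressing_is_needed=False, log=log)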