File: multirun.py

package info (click to toggle)
spades 3.13.1+dfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 22,172 kB
  • sloc: cpp: 136,213; ansic: 48,218; python: 16,809; perl: 4,252; sh: 2,115; java: 890; makefile: 507; pascal: 348; xml: 303
file content (103 lines) | stat: -rwxr-xr-x 4,162 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/python3

import argparse
import copy
import os
import os.path
import subprocess
import sys

import yaml

parser = argparse.ArgumentParser(description="MTS Multi Runner")

# Default pipeline matrix: every assembler is paired with every binner,
# giving pipeline names of the form "<assembler>_<binner>".
all_assemblers = ["main", "spades", "megahit"]
all_binners = ["canopy", "concoct", "gattaca", "maxbin", "metabat"]
# Combinations that are always skipped (with a warning) even if selected.
unsupported = {"main_maxbin"}

parser.add_argument("--threads", "-t", type=int, default=8, help="Number of threads for each run")
parser.add_argument("dir", type=str, help="Output directory")
parser.add_argument("--config", "-c", type=str, default=None, help="Base config")
parser.add_argument("--pipelines", "-p", type=str, nargs="+", default=[], help="Pipeline configurations to run")
parser.add_argument("--assemblers", "-a", type=str, nargs="+", default=all_assemblers, help="Assemblers to use")
parser.add_argument("--binners", "-b", type=str, nargs="+", default=all_binners, help="Binners to use")
parser.add_argument("--exclude", "-e", type=str, nargs="+", default=[], help="Excluded (skipped) configurations")
parser.add_argument("--no-stats", "-S", action="store_true", help="Skip the stats section (overrides the config value)")
# NOTE(review): --verbose is accepted but never read anywhere in this script.
parser.add_argument("--verbose", "-v", action="store_true", help="Increase verbosity level")
parser.add_argument("--reuse-from", type=str, default=None, help="Reuse assemblies from another multirun")
parser.add_argument("--ignore-errors", action="store_true",
                    help="Continue with the remaining pipelines when a run fails")

args = parser.parse_args()

# Resolve the base config: an explicit --config wins; otherwise fall back to
# <dir>/config.yaml, and abort if that does not exist either.
if not args.config:
    path = os.path.join(args.dir, "config.yaml")
    if not os.path.isfile(path):
        print("\033[31mError: no config provided with -c and no config found in the multirun directory\033[0m")
        sys.exit(1)
    args.config = path

# safe_load: the config is plain data. A bare yaml.load() without a Loader is
# deprecated since PyYAML 5.1 and raises TypeError since PyYAML 6.0.
with open(args.config) as config_in:
    config_template = yaml.safe_load(config_in)

# Every pipeline copies and extends this mapping, so anything else (e.g. an
# empty file, which loads as None) would only fail later with an obscure error.
if not isinstance(config_template, dict):
    print("\033[31mError: base config", args.config, "must be a YAML mapping\033[0m")
    sys.exit(1)

def pipelines():
    """Yield the names of all pipelines to run, in order.

    First comes the cross product of ``args.assemblers`` and ``args.binners``
    as ``"<assembler>_<binner>"`` (assembler-major order), followed by every
    name passed explicitly via ``--pipelines``.
    """
    yield from ("_".join((asm, bnr)) for asm in args.assemblers for bnr in args.binners)
    yield from args.pipelines

def _build_config(assembler, binner):
    """Build the config for one "<assembler>_<binner>" pipeline.

    Starts from a *deep* copy of ``config_template`` so the nested
    ``assembly``/``profile``/``binning`` mappings edited here are never shared
    with the template. With a shallow ``dict.copy()``, the
    ``assembly.assembler``/``assembly.groups`` keys set for e.g. a "spades_*"
    pipeline would leak into every later "main_*" pipeline.

    Args:
        assembler: first component of the pipeline name. For "main" only the
            profiler is set ("mts") and the template's assembly settings are
            kept; any other value is written to ``assembly.assembler`` and the
            propagation/reassembly stages are disabled.
        binner: second component of the pipeline name, written to
            ``binning.binner``.

    Returns:
        A new config dict, ready to be dumped as the run's config.yaml.
    """
    config = copy.deepcopy(config_template)
    for stage in ["assembly", "profile", "binning"]:
        config.setdefault(stage, dict())
    if assembler == "main":
        config["profile"]["profiler"] = "mts"
    else:
        config["assembly"]["assembler"] = assembler
        config["assembly"]["groups"] = ["*"]
        config["profile"]["profiler"] = "jgi"
        config["propagation"] = {"enabled": False}
        config["reassembly"] = {"enabled": False}
    config["binning"]["binner"] = binner
    return config


def _find_reusable_assembly(assembler):
    """Return the ``assembly`` dir of a ``--reuse-from`` run built with *assembler*.

    Looks for a subdirectory of ``args.reuse_from`` named "<assembler>_..."
    that contains an ``assembly`` directory. Entries are scanned in sorted
    order so the choice is deterministic. Returns None if nothing matches.
    """
    for run in sorted(os.listdir(args.reuse_from)):
        if run.startswith(assembler + "_"):
            path = os.path.join(args.reuse_from, run, "assembly")
            if os.path.isdir(path):
                return path
    return None


# Run every selected pipeline in turn: write its config.yaml into
# <dir>/<pipeline>/ and invoke ./mts.py on that directory.
# prev_runs maps an assembler name to the directory of its first successful
# run; later pipelines with the same assembler pass it as --reuse-from.
prev_runs = dict()

excluded = unsupported.union(args.exclude)
for pipeline in pipelines():
    if pipeline in excluded:
        if pipeline in unsupported:
            print("\033[33mWarning:", pipeline, "is not currently supported; skipping\033[0m\n")
        continue
    params = pipeline.split("_")
    # Names passed via --pipelines are free-form; without this check a name
    # lacking "_" would crash with an IndexError.
    if len(params) < 2 or not params[0] or not params[1]:
        print("\033[31mError: pipeline", pipeline, "is not of the form <assembler>_<binner>\033[0m")
        if not args.ignore_errors:
            sys.exit(1)
        print()
        continue
    assembly_name = params[0]
    print("Running", pipeline)
    cur_dir = os.path.join(args.dir, pipeline)
    os.makedirs(cur_dir, exist_ok=True)
    call_params = ["./mts.py", "-t", str(args.threads), cur_dir]
    if args.no_stats:
        call_params.append("--no-stats")
    config = _build_config(assembly_name, params[1])
    with open(os.path.join(cur_dir, "config.yaml"), "w") as config_out:
        yaml.dump(config, config_out)

    # Try to reuse assemblies: first from an earlier successful run with the
    # same assembler in this multirun, otherwise from --reuse-from (if given).
    prev_run = prev_runs.get(assembly_name)
    if prev_run:
        print("Reusing same data from", prev_run)
        call_params.extend(["--reuse-from", prev_run])
    elif args.reuse_from:
        path = _find_reusable_assembly(assembly_name)
        if path:
            print("Reusing assembly from", path)
            call_params.extend(["--reuse-assemblies", path])

    # TODO: rewrite using Snakemake API
    errcode = subprocess.call(call_params)
    if errcode:
        print(" ".join(call_params), "returned with error:", errcode)
        if not args.ignore_errors:
            sys.exit(errcode)
    elif not prev_run:
        # Only a successful run becomes a reuse source for later pipelines.
        prev_runs[assembly_name] = cur_dir
    print()

#TODO: compare stats