File: benchmarks.py

bowtie2 2.5.4-1
#!/usr/bin/python3
"""
Helpers for loading, running and reporting on sets of benchmarks.
"""

import os
import re
import csv
import glob
import json
import logging
import subprocess
import samreader as Sr
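
# Illustrative usage (a sketch only -- the directory values below are
# assumptions, not the package's actual layout; the class and method names
# are the ones defined in this module):
#
#   benchmarks = Benchmarks(benchmarks_dir="tests",
#                           data_dir="data",
#                           output_dir="results",
#                           bin_dir="bin",
#                           benchmark_id="run-001")
#   for benchmark_set in benchmarks:
#       benchmark_set.load()
#       benchmark_set.run()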


class Benchmarks(object):
    """ 
    Iterable for all benchmarks found in our test directory. 
    """

    def __init__(self,
                 benchmarks_dir=None,
                 benchmark_test=None,
                 data_dir=None,
                 output_dir=None,
                 bin_dir=None,
                 benchmark_id=None):
        self.set_idx = 0
        self.values = list()
        self.benchmark_id = benchmark_id
        self.data_dir = data_dir
        self.bin_dir = bin_dir
        self.benchmarks_dir = benchmarks_dir
        self.output_dir = os.path.join(output_dir, self.benchmark_id)
        # 
        if os.path.exists(self.output_dir):
            logging.error("A benchmark with the same name already exists (%s)" % self.output_dir)
            raise OSError("Directory already exists: %s" % self.output_dir)
        #
        logging.debug("Creating test output directory: %s" % self.output_dir)
        os.mkdir(self.output_dir)

        if benchmark_test is not None:
            self.values.append(self._load_benchmark(benchmark_test))

        if self.benchmarks_dir is not None and os.path.isdir(self.benchmarks_dir):
            logging.debug("Parse all json files from: %s" % self.benchmarks_dir)
            all_tests = glob.glob(os.path.join(self.benchmarks_dir, "*.json"))
            if all_tests:
                for b_set in all_tests:
                    logging.debug("Adding test: %s" % b_set)
                    self.values.append(self._load_benchmark(b_set))
            else:
                logging.warning("No JSON files found in %s" % self.benchmarks_dir)

    def __iter__(self):
        self.set_idx = 0
        return self

    def __next__(self):
        if self.set_idx == len(self.values):
            raise StopIteration

        value = self.values[self.set_idx]
        self.set_idx += 1
        return value

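    # The JSON descriptions parsed below are expected to look roughly like
    # this sketch (inferred from the keys this module reads; the values are
    # purely illustrative):
    #
    #   {
    #     "name": "...",
    #     "description": "...",
    #     "tests": [
    #       {"name": "...",
    #        "metric": "TestTime",                  # or "TestAccuracy"
    #        "input_data": {"files": ["..."], "loading": ["cmd ..."]},
    #        "runable": {"program": "...", "options": ["..."],
    #                    "parameters": ["..."], "outfiles": ["..."]}}
    #     ]
    #   }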
    def _load_benchmark(self, bench_fname):
        logging.debug("Loading test: %s" % bench_fname)
        with open(bench_fname) as fp:
            set_data = json.load(fp)
            test_list = set_data["tests"]

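            # Replace the ##BT2DIR##, ##DATADIR## and ##OUTDIR## placeholders
            # used in the JSON files with the directories handed to this
            # Benchmarks instance.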
            for test_desc in test_list:
                data_load_list = test_desc["input_data"]["loading"]
                for i, load_cmd in enumerate(data_load_list):
                    load_cmd = re.sub(r'##BT2DIR##', self.bin_dir, load_cmd)
                    load_cmd = re.sub(r'##DATADIR##', self.data_dir, load_cmd)
                    data_load_list[i] = load_cmd

                runable_opt = test_desc["runable"]
                for key in ["options", "parameters", "outfiles"]:
                    try:
                        runable_prop = runable_opt[key]
                    except KeyError:
                        continue

                    for i, item in enumerate(runable_prop):
                        item = re.sub(r'##BT2DIR##', self.bin_dir, item)
                        item = re.sub(r'##DATADIR##', self.data_dir, item)
                        item = re.sub(r'##OUTDIR##', self.output_dir, item)
                        runable_prop[i] = item

            return BenchmarkSet(set_data, self.data_dir, self.output_dir, self.bin_dir)


class BenchmarkSet(object):
    """ A Benchmark item
    """

    def __init__(self, data, data_dir, out_dir, bin_dir):
        self.data = data
        self.data_dir = data_dir
        self.out_dir = out_dir
        self.bin_dir = bin_dir
        self.input_data_loaded = False

    def run(self):
        logging.info("Running benchmark set: %s" % self.data["description"])
        all_tests = self.data["tests"]
        for test in all_tests:
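            # "metric" names one of the Runable subclasses defined below
            # (TestTime or TestAccuracy); look it up in the module globals
            # and run it for this test.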
            bench_cmd = globals()[test["metric"]](self, test)
            bench_cmd.launch()

    def load(self):
        if self.input_data_loaded:
            return
        logging.info("Loading data for %s" % self.data["name"])
        # load data
        test_list = self.data["tests"]

        for test in test_list:
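            # Only run the "loading" commands when at least one of the
            # declared input files is missing from the data directory.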
            data_is_here = True
            data_set = test["input_data"]["files"]
            for data_file in data_set:
                if not os.path.isfile(os.path.join(self.data_dir, data_file)):
                    data_is_here = False

            if not data_is_here:
                logging.info("Generate data for %s" % test["name"])
                for cmd in test["input_data"]["loading"]:
                    logging.info("running: %s" % cmd)
                    subprocess.check_call(cmd, shell=True)

        self.input_data_loaded = True


class Runable(object):
    """ cmd line and run helper """

    def __init__(self, main_set, test):
        """ start """
        self.benchmark_set = main_set
        self.test = test
        self.prologue = ''
        self.err_log = os.path.join(self.benchmark_set.out_dir, test["name"]) + ".metric"

    def launch(self):
        """ builds cmd and launch"""
        logging.debug("Building command.")
        cmd = self._build_cmd()
        logging.debug("Running command: %s" % cmd)
        self._run_cmd(cmd)
        logging.debug("Calling report formating.")
        self._format_report()

    def _build_cmd(self):
        """ build command line"""
        space = " "
        test = self.test
        prg = os.path.join(self.benchmark_set.bin_dir, test["runable"]["program"])
        cmd = self.prologue + space + prg

        for opt in test["runable"]["options"]:
            cmd = cmd + space + opt

        for parm in test["runable"]["parameters"]:
            cmd = cmd + space + parm

        return cmd

    def _run_cmd(self, cmd):
        """ running """
        logging.info("Start Benchmark %s" % self.test["name"])
        logging.info("Running: %s" % cmd)
        with open(self.err_log, 'w') as errlog:
            subprocess.check_call(cmd, shell=True, stderr=errlog)

    def _format_report(self):
        """ Hook for subclasses; the base class produces no report. """
        pass


class TestTime(Runable):
    """ Time benchmarks """

    def __init__(self, main_set, test):
        super(TestTime, self).__init__(main_set, test)
        self.prologue = "/usr/bin/time -f %U,%S,%E "
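        # GNU time prints "user,system,elapsed" on stderr; launch() redirects
        # stderr to self.err_log, and _format_report() below reads the last
        # line of that file back as the CSV fields.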

    def _format_report(self):
        """ formats data in csv format"""
        csv_file = os.path.join(self.benchmark_set.out_dir, self.test["name"]) + ".csv"
        p1 = subprocess.Popen("tail -1 %s" % self.err_log, shell=True, stdout=subprocess.PIPE)
        line = p1.communicate()[0].decode().rstrip()

        with open(csv_file, 'w') as csvf:
            writer = csv.writer(csvf)
            writer.writerow(["Test", "User Time", "System Time", "Wall Time"])
            row = [self.test["name"]]
            row.extend(line.split(","))
            writer.writerow(row)


class TestAccuracy(Runable):
    """ accuracy """

    def __init__(self, main_set, test):
        super(TestAccuracy, self).__init__(main_set, test)

    def _format_report(self):
        """ collect data in csv format"""
        in_sam_file = self._get_first_sam_input_file()
        in_sam_file = os.path.join(self.benchmark_set.data_dir, in_sam_file)
        initial_data = dict()
        mapq_summary = dict()
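        # initial_data maps each read name (suffixed with _1/_2 for mate 1/2)
        # to the (reference name, position) recorded in the original input
        # SAM file, which serves as ground truth for the alignments below.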
        with open(in_sam_file, "r") as fh_sin:
            in_reader = Sr.SamReader(fh_sin)
            for rec in in_reader:
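                # SAM FLAG bits: 0x1 = read paired, 0x40 = first in pair,
                # 0x80 = second in pair.  Only the low byte is kept; with the
                # paired bit set, a value above 128 marks mate 2 and a value
                # above 64 marks mate 1.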
                fl = rec.flag & 255
                if fl % 2 == 0:
                    logging.error("Initial data does not look like a paired search.")
                if fl > 128:
                    q_name = rec.qname + "_2"
                elif fl > 64:
                    q_name = rec.qname + "_1"
                else:
                    logging.error("Again, initial data does not look like a paired search.")
                    q_name = rec.qname
                initial_data[q_name] = (rec.rname, rec.pos)

        out_sam_file = self.test["runable"]["outfiles"][0]
        with open(out_sam_file, "r") as fh_out:
            in_reader = Sr.SamReader(fh_out)
            for rec in in_reader:
                fl = rec.flag & 255
                if fl % 2 == 0:
                    logging.error("This does not look like a paired search.")
                if fl > 128:
                    q_name = rec.qname + "_2"
                elif fl > 64:
                    q_name = rec.qname + "_1"
                else:
                    logging.error("Again, initial data does not look like a paired search.")
                    q_name = rec.qname
                orig = initial_data[q_name]
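                # Count the alignment as correct when it lands on the original
                # reference within a small positional tolerance; mapq_summary
                # keeps [correct, missed] tallies per MAPQ value.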
                if orig[0] == rec.rname and abs(orig[1] - rec.pos) < 3:
                    delta = [1, 0]
                else:
                    delta = [0, 1]
                    logging.debug("%s: missed (pos:%d vs %d)" % (q_name, orig[1], rec.pos))
                try:
                    mapq_summary[rec.mapq] = [a + b for a, b in zip(delta, mapq_summary[rec.mapq])]
                except KeyError:
                    mapq_summary[rec.mapq] = delta

        csv_file = os.path.join(self.benchmark_set.out_dir, self.test["name"]) + ".csv"
        with open(csv_file, 'w') as csvf:
            writer = csv.writer(csvf)
            writer.writerow(["Test name", "MAPQ", "No. Correct", "No. Misses"])
            for k in mapq_summary:
                row = [self.test["name"], k]
                row.extend(mapq_summary[k])
                writer.writerow(row)

    def _get_first_sam_input_file(self):
        all_input_files = self.test["input_data"]["files"]
        for fname in all_input_files:
            if fname.endswith(".sam"):
                logging.debug("Compare with origin SAM file: %s" % fname)
                return fname

        raise LookupError("No SAM data input file defined for this test!")