1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
|
"""Self-contained performance tests.
"""
from bisect import bisect
import gzip
from itertools import accumulate
from random import random, randint
import time
from xphyle.utils import read_lines
from xphyle.paths import TempDir
import pytest
class TimeKeeper:
    """Context manager that times its body and prints a formatted report.

    `msg` is a `str.format` template. On exit it is rendered with a
    `duration` field (the elapsed `time.perf_counter()` difference) plus
    every keyword argument that was passed to the constructor, and the
    result is printed.
    """

    def __init__(self, msg, **kwargs):
        """Store the message template and its extra format arguments."""
        self.duration = 0
        self.msg_args = kwargs
        self.msg = msg

    def __enter__(self):
        """Record the start time and return this instance."""
        self.start = time.perf_counter()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """Record the stop time, compute the duration and print the report.

        Nothing is returned, so an exception raised in the body propagates.
        """
        finished = time.perf_counter()
        self.stop = finished
        self.duration = finished - self.start
        report = self.msg.format(duration=self.duration, **self.msg_args)
        print(report)
def choices(population, weights=None, *, cum_weights=None, k=1):
    """Return a k sized list of population elements chosen with replacement.

    If neither the relative weights nor the cumulative weights are
    specified, the selections are made with equal probability.

    This function is borrowed from the python 3.6 'random' package, with two
    extra guards on the weighted path: the bisect lookup is bounded to the
    last valid index, and a non-positive total weight is rejected up front.

    Args:
        population: Indexable sequence to draw from.
        weights: Optional relative weights, one per population element.
        cum_weights: Optional cumulative weights, one per population element.
        k: Number of elements to draw.

    Returns:
        A list of `k` elements taken from `population`.

    Raises:
        TypeError: If both `weights` and `cum_weights` are given.
        ValueError: If the number of weights does not match the population
            size, or if the weights do not sum to a positive total.
        IndexError: If `population` is empty (in the unweighted case only
            when `k` is non-zero).
    """
    if cum_weights is None:
        if weights is None:
            total = len(population)
            return [population[int(random() * total)] for _ in range(k)]
        cum_weights = list(accumulate(weights))
    elif weights is not None:
        raise TypeError('Cannot specify both weights and cumulative weights')
    if len(cum_weights) != len(population):
        raise ValueError('The number of weights does not match the population')
    total = cum_weights[-1]
    if total <= 0:
        # With a zero total every draw would bisect past the last element.
        raise ValueError('Total of weights must be greater than zero')
    # Cap the search at the last index so a product that rounds up to
    # `total` can never select a position beyond the population.
    hi = len(cum_weights) - 1
    return [
        population[bisect(cum_weights, random() * total, 0, hi)]
        for _ in range(k)]
def perftest(name, text_generator, num_iter=10):
    """Time how long it takes to read back gzip-compressed text files.

    Creates `num_iter` '.gz' files under a `TempDir`, each holding one text
    produced by `text_generator`. Every file is then read into a list three
    times: once through `gzip.open` directly (reported with
    `use_system = None`), and once each through `read_lines` with
    `use_system=True` and `use_system=False`. A `TimeKeeper` prints one
    timing message per pass.

    Args:
        name: Label for the kind of text, used in the printed messages.
        text_generator: Zero-argument callable returning the text to write
            into one file.
        num_iter: Number of files to write and read.
    """
    # Same template text as before, spelled with explicit newlines.
    msg = (
        "\n"
        "Timing of {iter} {name} tests with total size {size:,d} characters and\n"
        "use_system = {use_system}: {duration:0.2f} sec")
    total_size = 0
    with TempDir() as root:
        # Generate the big texts and write one compressed file per iteration.
        paths = tuple(
            root.make_file(suffix='.gz')
            for _ in range(num_iter))
        for path in paths:
            txt = text_generator()
            total_size += len(txt)
            with gzip.open(path, 'wt') as out:
                out.write(txt)

        # Baseline pass using the gzip module directly.
        with TimeKeeper(
                msg, name=name, iter=num_iter, size=total_size,
                use_system=None):
            for path in paths:
                # Close each handle deterministically instead of leaving
                # it open until it is garbage collected.
                with gzip.open(path) as inp:
                    list(inp)

        for use_system in (True, False):
            with TimeKeeper(
                    msg, name=name, iter=num_iter, size=total_size,
                    use_system=use_system):
                for path in paths:
                    list(read_lines(path, use_system=use_system))
@pytest.mark.skip(reason="Lorem Ipsum has not been packaged, hence skipping this test")
@pytest.mark.perf
def test_lorem_ipsum():
    """Run the read benchmark on text produced by a `TextLorem` generator."""
    # Function-scope import: only evaluated if the test body actually runs.
    from lorem.text import TextLorem

    lorem_source = TextLorem(prange=(500, 1000), trange=(500, 1000))
    outcome = perftest('lorem ipsum', lorem_source.text)
    return outcome
@pytest.mark.perf
def test_fastq():
    """Run the read benchmark on randomly generated FASTQ-style records."""

    def generate_fastq(seqlen=100):
        """Build one text of between 100000 and 500000 four-line records.

        Each record is a 'read<i>' line, a random A/C/G/T string of `seqlen`
        characters, a '+' line, and a random string of `seqlen` characters
        drawn from the 60 characters starting at chr(33).
        """
        record_count = randint(100000, 500000)
        quality_chars = [chr(code) for code in range(33, 93)]
        bases = ['A', 'C', 'G', 'T']

        def random_string(alphabet):
            return "".join(choices(alphabet, k=seqlen))

        def records():
            for index in range(record_count):
                # The sequence is drawn before the quality string so the
                # order of random draws stays the same for every record.
                header = "read{}".format(index)
                sequence = random_string(bases)
                quality = random_string(quality_chars)
                yield "\n".join((header, sequence, '+', quality))

        return "\n".join(records())

    return perftest('fastq', generate_fastq)
|