File: test_performance.py

package info (click to toggle)
xphyle 4.4.4-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,904 kB
  • sloc: python: 6,308; makefile: 205; ruby: 105
file content (110 lines) | stat: -rw-r--r-- 3,565 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""Self-contained performance tests.
"""
from bisect import bisect
import gzip
from itertools import accumulate
from random import random, randint
import time
from xphyle.utils import read_lines
from xphyle.paths import TempDir
import pytest


class TimeKeeper:
    def __init__(self, msg, **kwargs):
        self.msg = msg
        self.msg_args = kwargs
        self.duration = 0

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.stop = time.perf_counter()
        self.duration = self.stop - self.start
        print(self.msg.format(
            duration=self.duration,
            **self.msg_args))


def choices(population, weights=None, *, cum_weights=None, k=1):
    """Return a k sized list of population elements chosen with replacement.
    If the relative weights or cumulative weights are not specified,
    the selections are made with equal probability.

    This function is borrowed from the python 3.6 'random' package.
    """
    if cum_weights is None:
        if weights is None:
            _int = int
            total = len(population)
            return [population[_int(random() * total)] for _ in range(k)]
        cum_weights = list(accumulate(weights))
    elif weights is not None:
        raise TypeError('Cannot specify both weights and cumulative weights')
    if len(cum_weights) != len(population):
        raise ValueError('The number of weights does not match the population')
    total = cum_weights[-1]
    return [population[bisect(cum_weights, random() * total)] for _ in range(k)]


def perftest(name, text_generator, num_iter=10):
    # generate a big text
    msg = """
    Timing of {iter} {name} tests with total size {size:,d} characters and 
    use_system = {use_system}: {duration:0.2f} sec"""
    total_size = 0

    with TempDir() as root:
        paths = tuple(
            root.make_file(suffix='.gz')
            for _ in range(num_iter))
        for path in paths:
            txt = text_generator()
            total_size += len(txt)
            with gzip.open(path, 'wt') as out:
                out.write(txt)

        with TimeKeeper(
                msg, name=name, iter=num_iter, size=total_size,
                use_system=None):
            for path in paths:
                list(gzip.open(path))

        for use_system in (True, False):
            with TimeKeeper(
                    msg, name=name, iter=num_iter, size=total_size,
                    use_system=use_system):
                for path in paths:
                    list(read_lines(path, use_system=use_system))


@pytest.mark.skip(reason="Lorem Ipsum has not been packaged, hence skipping this test")
@pytest.mark.perf
def test_lorem_ipsum():
    from lorem.text import TextLorem
    generate_lorem = TextLorem(prange=(500, 1000), trange=(500, 1000))
    return perftest('lorem ipsum', generate_lorem.text)


@pytest.mark.perf
def test_fastq():
    def generate_fastq(seqlen=100):
        num_records = randint(100000, 500000)
        qualspace = list(chr(i + 33) for i in range(60))

        def rand_seq():
            return "".join(choices(['A', 'C', 'G', 'T'], k=seqlen))

        def rand_qual():
            return "".join(choices(qualspace, k=seqlen))

        return "\n".join(
            "\n".join((
                "read{}".format(i),
                rand_seq(),
                '+',
                rand_qual()))
            for i in range(num_records))
    return perftest('fastq', generate_fastq)