File: large_array_vs_numpy.py

package info (click to toggle)
numexpr 2.14.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 784 kB
  • sloc: cpp: 4,250; python: 3,985; ansic: 369; makefile: 203
file content (154 lines) | stat: -rw-r--r-- 5,269 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#################################################################################
# Mimics a scenario where the computation is I/O bound and constrained by memory.
#
# This is a much-simplified version: each chunk is computed in a loop and the
# expressions are evaluated in sequence, which is not true in reality.
# Nevertheless, numexpr outperforms numpy.
#################################################################################
"""
Benchmarking Expression 1:
NumPy time (threaded over 32 chunks with 2 threads): 4.612313 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 0.951172 seconds
numexpr speedup: 4.85x
----------------------------------------
Benchmarking Expression 2:
NumPy time (threaded over 32 chunks with 2 threads): 23.862752 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 2.182058 seconds
numexpr speedup: 10.94x
----------------------------------------
Benchmarking Expression 3:
NumPy time (threaded over 32 chunks with 2 threads): 20.594895 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 2.927881 seconds
numexpr speedup: 7.03x
----------------------------------------
Benchmarking Expression 4:
NumPy time (threaded over 32 chunks with 2 threads): 12.834101 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 5.392480 seconds
numexpr speedup: 2.38x
----------------------------------------
"""

import os

os.environ["NUMEXPR_NUM_THREADS"] = "16"
import threading
import timeit

import numpy as np

import numexpr as ne

# Total number of elements in each operand array.
array_size = 10**8
# Number of repetitions handed to timeit for every per-chunk measurement.
num_runs = 10
num_chunks = 32  # Number of chunks
num_threads = 2  # Number of threads constrained by how many chunks memory can hold

# Three independent random operands. reshape(10**4, -1) turns each flat array
# into a 2-D array with 10**4 rows (10**4 x 10**4 for the array_size above), so
# a basic slice such as a[start:end] selects whole rows, not single elements.
a = np.random.rand(array_size).reshape(10**4, -1)
b = np.random.rand(array_size).reshape(10**4, -1)
c = np.random.rand(array_size).reshape(10**4, -1)

# Rows per chunk. The operands are 2-D and the workers slice them along the
# first axis (a[start:end]), so the chunk size must be counted in rows, not in
# elements: an element-based size is far larger than the row count, which makes
# chunk 0 span the entire array and leaves every other chunk empty.
# Ceiling division keeps the trailing rows covered when the row count is not a
# multiple of num_chunks (slicing simply clamps the last, shorter chunk).
chunk_size = -(-a.shape[0] // num_chunks)

# Each entry pairs a NumPy callable with the numexpr string that spells out the
# same formula, so the two benchmark lists below cannot drift out of step.
_EXPRESSION_PAIRS = [
    (
        lambda a, b, c: a + b * c,
        "a + b * c",
    ),
    (
        lambda a, b, c: a**2 + b**2 - 2 * a * b * np.cos(c),
        "a**2 + b**2 - 2 * a * b * cos(c)",
    ),
    (
        lambda a, b, c: np.sin(a) + np.log(b) * np.sqrt(c),
        "sin(a) + log(b) * sqrt(c)",
    ),
    (
        lambda a, b, c: np.exp(a) + np.tan(b) - np.sinh(c),
        "exp(a) + tan(b) - sinh(c)",
    ),
]

# Callables taking the three operands positionally, evaluated with NumPy.
expressions_numpy = [numpy_form for numpy_form, _ in _EXPRESSION_PAIRS]

# The same formulas, in the same order, as numexpr expression strings.
expressions_numexpr = [numexpr_form for _, numexpr_form in _EXPRESSION_PAIRS]


def benchmark_numpy_chunk(func, a, b, c, results, indices):
    """Time a NumPy expression on each of the given chunks.

    Args:
        func: Callable taking the three operand slices positionally.
        a, b, c: Operand arrays; they are sliced along their first axis.
        results: List that receives one timing (seconds for ``num_runs``
            repetitions) per processed chunk.
        indices: Chunk numbers this worker is responsible for.
    """
    for chunk_no in indices:
        lo = chunk_no * chunk_size
        hi = lo + chunk_size

        def evaluate_chunk():
            # Slicing stays inside the timed callable, as part of the measurement.
            return func(a[lo:hi], b[lo:hi], c[lo:hi])

        results.append(timeit.timeit(evaluate_chunk, number=num_runs))


def benchmark_numexpr_re_evaluate(expr, a, b, c, results, indices):
    """Time a numexpr expression on each of the given chunks.

    The first chunk handled by this worker goes through ``ne.evaluate`` so the
    expression is set up by the calling thread itself; every later chunk uses
    ``ne.re_evaluate``, which re-runs the previously evaluated expression with
    new operands.

    Deciding on ``index == 0`` instead would let only the worker that owns
    chunk 0 call ``evaluate``. Any other worker would start with
    ``re_evaluate`` and depend on state left behind by a different thread or by
    the previously benchmarked expression.

    Args:
        expr: numexpr expression string using the names ``a``, ``b``, ``c``.
        a, b, c: Operand arrays; they are sliced along their first axis.
        results: List that receives one timing (seconds for ``num_runs``
            repetitions) per processed chunk.
        indices: Chunk numbers this worker is responsible for.
    """
    first_chunk = True
    for index in indices:
        start = index * chunk_size
        end = (index + 1) * chunk_size
        if first_chunk:
            # Evaluate this worker's first chunk with evaluate
            time_taken = timeit.timeit(
                lambda: ne.evaluate(
                    expr,
                    local_dict={
                        "a": a[start:end],
                        "b": b[start:end],
                        "c": c[start:end],
                    },
                ),
                number=num_runs,
            )
            first_chunk = False
        else:
            # Re-evaluate subsequent chunks with re_evaluate
            time_taken = timeit.timeit(
                lambda: ne.re_evaluate(
                    local_dict={"a": a[start:end], "b": b[start:end], "c": c[start:end]}
                ),
                number=num_runs,
            )
        results.append(time_taken)


def run_benchmark_threaded():
    """Benchmark every expression with NumPy and with numexpr, then report.

    For each expression the chunks are spread over ``num_threads`` worker
    threads, first for the NumPy callable and then for the numexpr string.
    The reported time for each is the sum of all per-chunk timings collected
    by the workers, and the speedup is the ratio of the two sums.
    """
    all_chunks = list(range(num_chunks))

    def timed_total(worker, expression):
        """Run ``worker`` on all chunks across the threads; return summed timings."""
        timings = []
        pool = []
        for offset in range(num_threads):
            # Worker k takes chunks k, k + num_threads, k + 2 * num_threads, ...
            assigned = all_chunks[offset::num_threads]
            runner = threading.Thread(
                target=worker,
                args=(expression, a, b, c, timings, assigned),
            )
            pool.append(runner)
            runner.start()

        for runner in pool:
            runner.join()

        return sum(timings)

    for i, (numpy_func, numexpr_expr) in enumerate(
        zip(expressions_numpy, expressions_numexpr)
    ):
        print(f"Benchmarking Expression {i+1}:")

        numpy_time = timed_total(benchmark_numpy_chunk, numpy_func)
        print(
            f"NumPy time (threaded over {num_chunks} chunks with {num_threads} threads): {numpy_time:.6f} seconds"
        )

        numexpr_time = timed_total(benchmark_numexpr_re_evaluate, numexpr_expr)
        print(
            f"numexpr time (threaded with re_evaluate over {num_chunks} chunks with {num_threads} threads): {numexpr_time:.6f} seconds"
        )
        print(f"numexpr speedup: {numpy_time / numexpr_time:.2f}x")
        print("-" * 40)


# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    run_benchmark_threaded()