File: free_threading.py

package info (click to toggle)
numexpr 2.14.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 784 kB
  • sloc: cpp: 4,250; python: 3,985; ansic: 369; makefile: 203
file content (171 lines) | stat: -rw-r--r-- 6,008 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#################################################################################
# To compare the performance of numexpr when free-threading CPython is used.
#
# This example makes use of Python threads, as opposed to C native ones
# in order to highlight the improvement introduced by free-threading CPython,
# which now disables the GIL altogether.
#################################################################################
"""
Results with GIL-enabled CPython:

Benchmarking Expression 1:
NumPy time (threaded over 32 chunks with 16 threads): 1.173090 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 0.951071 seconds
numexpr speedup: 1.23x
----------------------------------------
Benchmarking Expression 2:
NumPy time (threaded over 32 chunks with 16 threads): 10.410874 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 8.248753 seconds
numexpr speedup: 1.26x
----------------------------------------
Benchmarking Expression 3:
NumPy time (threaded over 32 chunks with 16 threads): 9.605909 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 11.087108 seconds
numexpr speedup: 0.87x
----------------------------------------
Benchmarking Expression 4:
NumPy time (threaded over 32 chunks with 16 threads): 3.836962 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 18.054531 seconds
numexpr speedup: 0.21x
----------------------------------------

Results with free-threading CPython:

Benchmarking Expression 1:
NumPy time (threaded over 32 chunks with 16 threads): 3.415349 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 2.618876 seconds
numexpr speedup: 1.30x
----------------------------------------
Benchmarking Expression 2:
NumPy time (threaded over 32 chunks with 16 threads): 19.005238 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 12.611407 seconds
numexpr speedup: 1.51x
----------------------------------------
Benchmarking Expression 3:
NumPy time (threaded over 32 chunks with 16 threads): 20.555149 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 17.690749 seconds
numexpr speedup: 1.16x
----------------------------------------
Benchmarking Expression 4:
NumPy time (threaded over 32 chunks with 16 threads): 38.338372 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 35.074684 seconds
numexpr speedup: 1.09x
----------------------------------------
"""

import os

os.environ["NUMEXPR_NUM_THREADS"] = "2"
import threading
import timeit

import numpy as np

import numexpr as ne

array_size = 10**8
num_runs = 10
num_chunks = 32  # Number of chunks
num_threads = 16  # Number of threads constrained by how many chunks memory can hold

a = np.random.rand(array_size).reshape(10**4, -1)
b = np.random.rand(array_size).reshape(10**4, -1)
c = np.random.rand(array_size).reshape(10**4, -1)

chunk_size = array_size // num_chunks

expressions_numpy = [
    lambda a, b, c: a + b * c,
    lambda a, b, c: a**2 + b**2 - 2 * a * b * np.cos(c),
    lambda a, b, c: np.sin(a) + np.log(b) * np.sqrt(c),
    lambda a, b, c: np.exp(a) + np.tan(b) - np.sinh(c),
]

expressions_numexpr = [
    "a + b * c",
    "a**2 + b**2 - 2 * a * b * cos(c)",
    "sin(a) + log(b) * sqrt(c)",
    "exp(a) + tan(b) - sinh(c)",
]


def benchmark_numpy_chunk(func, a, b, c, results, indices):
    for index in indices:
        start = index * chunk_size
        end = (index + 1) * chunk_size
        time_taken = timeit.timeit(
            lambda: func(a[start:end], b[start:end], c[start:end]), number=num_runs
        )
        results.append(time_taken)


def benchmark_numexpr_re_evaluate(expr, a, b, c, results, indices):
    for index in indices:
        start = index * chunk_size
        end = (index + 1) * chunk_size
        # if index == 0:
        # Evaluate the first chunk with evaluate
        time_taken = timeit.timeit(
            lambda: ne.evaluate(
                expr,
                local_dict={
                    "a": a[start:end],
                    "b": b[start:end],
                    "c": c[start:end],
                },
            ),
            number=num_runs,
        )
        results.append(time_taken)


def run_benchmark_threaded():
    chunk_indices = list(range(num_chunks))

    for i in range(len(expressions_numpy)):
        print(f"Benchmarking Expression {i+1}:")

        results_numpy = []
        results_numexpr = []

        threads_numpy = []
        for j in range(num_threads):
            indices = chunk_indices[j::num_threads]  # Distribute chunks across threads
            thread = threading.Thread(
                target=benchmark_numpy_chunk,
                args=(expressions_numpy[i], a, b, c, results_numpy, indices),
            )
            threads_numpy.append(thread)
            thread.start()

        for thread in threads_numpy:
            thread.join()

        numpy_time = sum(results_numpy)
        print(
            f"NumPy time (threaded over {num_chunks} chunks with {num_threads} threads): {numpy_time:.6f} seconds"
        )

        threads_numexpr = []
        for j in range(num_threads):
            indices = chunk_indices[j::num_threads]  # Distribute chunks across threads
            thread = threading.Thread(
                target=benchmark_numexpr_re_evaluate,
                args=(expressions_numexpr[i], a, b, c, results_numexpr, indices),
            )
            threads_numexpr.append(thread)
            thread.start()

        for thread in threads_numexpr:
            thread.join()

        numexpr_time = sum(results_numexpr)
        print(
            f"numexpr time (threaded with re_evaluate over {num_chunks} chunks with {num_threads} threads): {numexpr_time:.6f} seconds"
        )
        print(f"numexpr speedup: {numpy_time / numexpr_time:.2f}x")
        print("-" * 40)


if __name__ == "__main__":
    run_benchmark_threaded()