1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
|
import json
import os
import struct
import sys
import tempfile
import time
import timeit
from ._aux import posix_redirect_output, update_sys_path
from .discovery import disc_benchmarks
from .run import _run
wall_timer = timeit.default_timer
def recvall(sock, size):
"""
Receives data from a socket until the specified size of data has been received.
#### Parameters
**sock** (`socket`)
: The socket from which the data will be received. This socket should
already be connected to the other end from which data is to be received.
**size** (`int`)
: The total size of data to be received from the socket.
#### Returns
**data** (`bytes`)
: The data received from the socket. The length of this data will be equal
to the size specified.
#### Raises
**RuntimeError**
: If the socket closed before the specified size of data could be received.
#### Notes
This function continuously receives data from the provided socket in a loop
until the total length of the received data is equal to the specified size.
If the socket closes before the specified size of data could be received, a
`RuntimeError` is raised. The function returns the received data as a byte
string.
"""
data = b""
while len(data) < size:
s = sock.recv(size - len(data))
data += s
if not s:
raise RuntimeError(
"did not receive data from socket " f"(size {size}, got only {data !r})"
)
return data
def _run_server(args):
"""
Runs a server that executes benchmarks based on the received commands.
#### Parameters
**args** (`tuple`)
: A tuple containing the benchmark directory and socket name.
- `benchmark_dir` (`str`): The directory where the benchmarks are located.
- `socket_name` (`str`): The name of the UNIX socket to be used for
- communication.
#### Raises
**RuntimeError**
: If the received command contains unknown data.
#### Notes
This function creates a server that listens on a UNIX socket for commands.
It can perform two actions based on the received command: quit or preimport
benchmarks.
If the command is "quit", the server stops running. If the command is
"preimport", the function imports all the benchmarks in the specified
directory, capturing all the I/O to a file during import. After the
benchmarks are imported, the function sends the contents of the output file
back through the socket.
If the action is not "quit" or "preimport", the function assumes it is a
command to run a specific benchmark. It then runs the benchmark and waits
for the results. It also handles a timeout for the benchmark execution and
sends the results back through the socket.
The function continuously accepts new commands until it receives a "quit"
command or a KeyboardInterrupt.
It uses UNIX domain sockets for inter-process communication. The name of the
socket is passed as a parameter in `args`. The socket is created, bound to
the socket name, and set to listen for connections. When a connection is
accepted, the command is read from the socket, parsed, and executed
accordingly. After executing the command, the server sends back the result
through the socket and waits for the next command.
"""
import signal
import socket
(
benchmark_dir,
socket_name,
) = args
update_sys_path(benchmark_dir)
# Socket I/O
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind(socket_name)
s.listen(1)
# Read and act on commands from socket
while True:
stdout_file = None
try:
conn, addr = s.accept()
except KeyboardInterrupt:
break
try:
fd, stdout_file = tempfile.mkstemp()
os.close(fd)
# Read command
(read_size,) = struct.unpack("<Q", recvall(conn, 8))
command_text = recvall(conn, read_size)
command_text = command_text.decode("utf-8")
# Parse command
command = json.loads(command_text)
action = command.pop("action")
if action == "quit":
break
elif action == "preimport":
# Import benchmark suite before forking.
# Capture I/O to a file during import.
with posix_redirect_output(stdout_file, permanent=False):
for _ in disc_benchmarks(benchmark_dir, ignore_import_errors=True):
pass
# Report result
with open(stdout_file, errors="replace") as f:
out = f.read()
out = json.dumps(out)
out = out.encode("utf-8")
conn.sendall(struct.pack("<Q", len(out)))
conn.sendall(out)
continue
benchmark_id = command.pop("benchmark_id")
params_str = command.pop("params_str")
profile_path = command.pop("profile_path")
result_file = command.pop("result_file")
timeout = command.pop("timeout")
cwd = command.pop("cwd")
if command:
raise RuntimeError(f"Command contained unknown data: {command_text !r}")
# Spawn benchmark
run_args = (
benchmark_dir,
benchmark_id,
params_str,
profile_path,
result_file,
)
pid = os.fork()
if pid == 0:
conn.close()
sys.stdin.close()
exitcode = 1
try:
with posix_redirect_output(stdout_file, permanent=True):
try:
os.chdir(cwd)
_run(run_args)
exitcode = 0
except BaseException:
import traceback
traceback.print_exc()
finally:
os._exit(exitcode)
# Wait for results
# (Poll in a loop is simplest --- also used by subprocess.py)
start_time = wall_timer()
is_timeout = False
time2sleep = 1e-15
while True:
res, status = os.waitpid(pid, os.WNOHANG)
if res != 0:
break
if timeout is not None and wall_timer() > start_time + timeout:
# Timeout
if is_timeout:
os.kill(pid, signal.SIGKILL)
else:
os.kill(pid, signal.SIGTERM)
is_timeout = True
time2sleep *= 1e1
time.sleep(min(time2sleep, 0.001))
# Report result
with open(stdout_file, errors="replace") as f:
out = f.read()
# Emulate subprocess
if os.WIFSIGNALED(status):
retcode = -os.WTERMSIG(status)
elif os.WIFEXITED(status):
retcode = os.WEXITSTATUS(status)
elif os.WIFSTOPPED(status):
retcode = -os.WSTOPSIG(status)
else:
# shouldn't happen, but fail silently
retcode = -128
info = {"out": out, "errcode": -256 if is_timeout else retcode}
result_text = json.dumps(info)
result_text = result_text.encode("utf-8")
conn.sendall(struct.pack("<Q", len(result_text)))
conn.sendall(result_text)
except KeyboardInterrupt:
break
finally:
conn.close()
if stdout_file is not None:
os.unlink(stdout_file)
|