File: server.py

package info (click to toggle)
python-asv-runner 0.2.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 420 kB
  • sloc: python: 1,631; makefile: 13
file content (235 lines) | stat: -rw-r--r-- 7,902 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import json
import os
import struct
import sys
import tempfile
import time
import timeit

from ._aux import posix_redirect_output, update_sys_path
from .discovery import disc_benchmarks
from .run import _run

wall_timer = timeit.default_timer


def recvall(sock, size):
    """
    Receives data from a socket until the specified size of data has been received.

    #### Parameters
    **sock** (`socket`)
    : The socket from which the data will be received. This socket should
    already be connected to the other end from which data is to be received.

    **size** (`int`)
    : The total size of data to be received from the socket.

    #### Returns
    **data** (`bytes`)
    : The data received from the socket. The length of this data will be equal
    to the size specified.

    #### Raises
    **RuntimeError**
    : If the socket closed before the specified size of data could be received.

    #### Notes
    This function continuously receives data from the provided socket in a loop
    until the total length of the received data is equal to the specified size.
    If the socket closes before the specified size of data could be received, a
    `RuntimeError` is raised. The function returns the received data as a byte
    string.
    """
    data = b""
    while len(data) < size:
        s = sock.recv(size - len(data))
        data += s
        if not s:
            raise RuntimeError(
                "did not receive data from socket " f"(size {size}, got only {data !r})"
            )
    return data


def _run_server(args):
    """
    Runs a server that executes benchmarks based on the received commands.

    #### Parameters
    **args** (`tuple`)
    : A tuple containing the benchmark directory and socket name.

    - `benchmark_dir` (`str`): The directory where the benchmarks are located.
    - `socket_name` (`str`): The name of the UNIX socket to be used for
    - communication.

    #### Raises
    **RuntimeError**
    : If the received command contains unknown data.

    #### Notes
    This function creates a server that listens on a UNIX socket for commands.
    It can perform two actions based on the received command: quit or preimport
    benchmarks.

    If the command is "quit", the server stops running. If the command is
    "preimport", the function imports all the benchmarks in the specified
    directory, capturing all the I/O to a file during import. After the
    benchmarks are imported, the function sends the contents of the output file
    back through the socket.

    If the action is not "quit" or "preimport", the function assumes it is a
    command to run a specific benchmark. It then runs the benchmark and waits
    for the results. It also handles a timeout for the benchmark execution and
    sends the results back through the socket.

    The function continuously accepts new commands until it receives a "quit"
    command or a KeyboardInterrupt.

    It uses UNIX domain sockets for inter-process communication. The name of the
    socket is passed as a parameter in `args`. The socket is created, bound to
    the socket name, and set to listen for connections. When a connection is
    accepted, the command is read from the socket, parsed, and executed
    accordingly. After executing the command, the server sends back the result
    through the socket and waits for the next command.
    """
    import signal
    import socket

    (
        benchmark_dir,
        socket_name,
    ) = args

    update_sys_path(benchmark_dir)

    # Socket I/O
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.bind(socket_name)
    s.listen(1)

    # Read and act on commands from socket
    while True:
        stdout_file = None

        try:
            conn, addr = s.accept()
        except KeyboardInterrupt:
            break

        try:
            fd, stdout_file = tempfile.mkstemp()
            os.close(fd)

            # Read command
            (read_size,) = struct.unpack("<Q", recvall(conn, 8))
            command_text = recvall(conn, read_size)
            command_text = command_text.decode("utf-8")

            # Parse command
            command = json.loads(command_text)
            action = command.pop("action")

            if action == "quit":
                break
            elif action == "preimport":
                # Import benchmark suite before forking.
                # Capture I/O to a file during import.
                with posix_redirect_output(stdout_file, permanent=False):
                    for _ in disc_benchmarks(benchmark_dir, ignore_import_errors=True):
                        pass

                # Report result
                with open(stdout_file, errors="replace") as f:
                    out = f.read()
                out = json.dumps(out)
                out = out.encode("utf-8")
                conn.sendall(struct.pack("<Q", len(out)))
                conn.sendall(out)
                continue

            benchmark_id = command.pop("benchmark_id")
            params_str = command.pop("params_str")
            profile_path = command.pop("profile_path")
            result_file = command.pop("result_file")
            timeout = command.pop("timeout")
            cwd = command.pop("cwd")

            if command:
                raise RuntimeError(f"Command contained unknown data: {command_text !r}")

            # Spawn benchmark
            run_args = (
                benchmark_dir,
                benchmark_id,
                params_str,
                profile_path,
                result_file,
            )
            pid = os.fork()
            if pid == 0:
                conn.close()
                sys.stdin.close()
                exitcode = 1
                try:
                    with posix_redirect_output(stdout_file, permanent=True):
                        try:
                            os.chdir(cwd)
                            _run(run_args)
                            exitcode = 0
                        except BaseException:
                            import traceback

                            traceback.print_exc()
                finally:
                    os._exit(exitcode)

            # Wait for results
            # (Poll in a loop is simplest --- also used by subprocess.py)
            start_time = wall_timer()
            is_timeout = False
            time2sleep = 1e-15
            while True:
                res, status = os.waitpid(pid, os.WNOHANG)
                if res != 0:
                    break

                if timeout is not None and wall_timer() > start_time + timeout:
                    # Timeout
                    if is_timeout:
                        os.kill(pid, signal.SIGKILL)
                    else:
                        os.kill(pid, signal.SIGTERM)
                    is_timeout = True
                time2sleep *= 1e1
                time.sleep(min(time2sleep, 0.001))

            # Report result
            with open(stdout_file, errors="replace") as f:
                out = f.read()

            # Emulate subprocess
            if os.WIFSIGNALED(status):
                retcode = -os.WTERMSIG(status)
            elif os.WIFEXITED(status):
                retcode = os.WEXITSTATUS(status)
            elif os.WIFSTOPPED(status):
                retcode = -os.WSTOPSIG(status)
            else:
                # shouldn't happen, but fail silently
                retcode = -128

            info = {"out": out, "errcode": -256 if is_timeout else retcode}

            result_text = json.dumps(info)
            result_text = result_text.encode("utf-8")

            conn.sendall(struct.pack("<Q", len(result_text)))
            conn.sendall(result_text)
        except KeyboardInterrupt:
            break
        finally:
            conn.close()
            if stdout_file is not None:
                os.unlink(stdout_file)