File: server.py

package info (click to toggle)
python-pytest-retry 1.6.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 208 kB
  • sloc: python: 658; makefile: 3
file content (63 lines) | stat: -rw-r--r-- 1,926 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import socket
import threading
from io import StringIO
from _pytest.terminal import TerminalReporter


class ReportHandler:
    """Base reporter that owns an in-memory text buffer.

    Both hooks are no-ops at this level; the subclasses in this module
    override ``record_attempt`` and write into ``self.stream``.
    """

    def __init__(self) -> None:
        # Text buffer shared by every reporter flavour.
        self.stream: StringIO = StringIO()

    def build_retry_report(self, terminalreporter: TerminalReporter) -> None:
        """Accept the terminal reporter and do nothing with it."""
        return None

    def record_attempt(self, lines: list[str]) -> None:
        """Accept *lines* and discard them; subclasses supply the behavior."""
        return None


class OfflineReporter(ReportHandler):
    """Reporter that keeps every recorded attempt in its own stream."""

    def __init__(self) -> None:
        super().__init__()

    def record_attempt(self, lines: list[str]) -> None:
        """Append each entry of *lines* to ``self.stream`` in order."""
        buffer = self.stream
        for text in lines:
            buffer.write(text)


class ReportServer(ReportHandler):
    """Reporter that collects UTF-8 text sent to a local TCP socket.

    Everything received is decoded and appended to ``self.stream``.
    """

    def __init__(self) -> None:
        super().__init__()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(True)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def initialize_server(self) -> int:
        """Bind to an OS-chosen localhost port and start the receive thread.

        Returns:
            The port number the socket was bound to.
        """
        self.sock.bind(("localhost", 0))
        # Start listening before the port is handed out, so a client that
        # connects immediately is queued instead of being refused while the
        # background thread is still starting up.
        self.sock.listen()
        t = threading.Thread(target=self.run_server, daemon=True)
        t.start()
        return self.sock.getsockname()[-1]

    def run_server(self) -> None:
        """Accept connections forever, draining each one into ``self.stream``.

        Connections are served one at a time: the next client is not accepted
        until the current one has closed its end.
        NOTE(review): if several clients stay connected at once, only the
        first is read until it disconnects -- confirm that is intended.
        """
        # Harmless if initialize_server() already put the socket in listening
        # mode; kept so run_server() also works when called on its own.
        self.sock.listen()
        while True:
            conn, _ = self.sock.accept()
            # Close the accepted socket once the peer is done so descriptors
            # do not accumulate over the life of the server.
            with conn:
                self._receive_report(conn)

    def _receive_report(self, conn: socket.socket) -> None:
        """Read *conn* until end of stream and append the text to the stream."""
        import codecs

        # recv() may split a multi-byte UTF-8 character across two chunks;
        # the incremental decoder holds the partial bytes back until the rest
        # arrives instead of raising UnicodeDecodeError in the server thread.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        while True:
            try:
                chunk = conn.recv(128)
            except ConnectionError:
                # Peer went away abruptly; treat it as end of stream so the
                # server thread survives to serve later connections.
                break
            if not chunk:
                break
            self.stream.write(decoder.decode(chunk))
        # Flush whatever the decoder is still holding (a truncated trailing
        # sequence becomes U+FFFD rather than being silently dropped).
        self.stream.write(decoder.decode(b"", final=True))


class ClientReporter(ReportHandler):
    """Reporter that forwards grouped attempt reports over a TCP socket.

    Attempts are buffered in ``self.stream`` while a retry is still pending
    and sent as one UTF-8 payload once a non-retry entry is recorded.
    """

    def __init__(self, port: int) -> None:
        """Open a blocking TCP connection to ``localhost`` on *port*.

        Presumably *port* is the value returned by
        ``ReportServer.initialize_server`` -- verify against the caller.

        Raises:
            OSError: If the connection cannot be established. The socket is
                closed before the error propagates.
        """
        super().__init__()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(True)
        try:
            self.sock.connect(("localhost", port))
        except OSError:
            # Do not leak the descriptor when the connection attempt fails.
            self.sock.close()
            raise

    def record_attempt(self, lines: list[str]) -> None:
        """Buffer *lines* and send the buffer unless a retry is pending.

        The second entry of *lines* decides: if it ends with
        ``"Retrying!\\n\\t"`` the text stays buffered so the following
        attempts are sent together with it. Input with fewer than two
        entries is treated as final and flushed.
        """
        self.stream.writelines(lines)
        # Group reports for each item together before sending and resetting stream
        retry_pending = len(lines) > 1 and lines[1].endswith("Retrying!\n\t")
        if not retry_pending:
            self.sock.sendall(self.stream.getvalue().encode("utf-8"))
            self.stream = StringIO()