File: httpserver.py

package info (click to toggle)
python-asdf 4.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,032 kB
  • sloc: python: 24,068; makefile: 123
file content (62 lines) | stat: -rw-r--r-- 1,844 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import http.server
import os
import queue
import shutil
import socketserver
import tempfile
import threading

# Names exported by ``from <this module> import *``: only the HTTPServer
# class is listed here.
__all__ = ["HTTPServer"]


def run_server(tmp_path, handler_class, stop_event, queue):  # pragma: no cover
    """
    Run an HTTP server that serves files from ``tmp_path``.

    This function blocks until ``stop_event`` is set, so it is meant to be
    used as the target of a background thread.  Once the listening socket
    is bound, the server's base URL is put on ``queue`` so that the caller
    (the HTTP client) knows where to send its requests.

    Parameters
    ----------
    tmp_path
        Directory under which requested paths are resolved.
    handler_class
        Request handler class to specialize.  It must provide a
        ``translate_path(path)`` method, as
        ``http.server.SimpleHTTPRequestHandler`` does.
    stop_event
        Object with an ``is_set()`` method (e.g. ``threading.Event``).  It is
        checked before every request; the loop ends once it returns true.
    queue
        Object with a ``put()`` method (e.g. ``queue.Queue``).  It receives
        exactly one item: the URL string ``http://<host>:<port>/``.
    """

    class HTTPRequestHandler(handler_class):
        def translate_path(self, path):
            # Take the base handler's result relative to the current working
            # directory and re-root it under tmp_path.
            path = handler_class.translate_path(self, path)
            return os.path.join(tmp_path, os.path.relpath(path, os.getcwd()))

    # Port 0 lets the operating system pick a free port.  The ``with`` block
    # guarantees the listening socket is closed even if publishing the URL or
    # handling a request raises.
    with socketserver.TCPServer(("127.0.0.1", 0), HTTPRequestHandler) as server:
        domain, port = server.server_address
        url = f"http://{domain}:{port}/"

        # Set a reasonable timeout so that invalid requests (which may occur
        # during testing) do not cause the entire test suite to hang
        # indefinitely
        server.timeout = 0.1

        queue.put(url)

        # Using server.serve_forever does not work here since it ignores the
        # timeout value set above. Having an explicit loop also allows us to
        # kill the server from the parent thread.
        while not stop_event.is_set():
            server.handle_request()


class HTTPServer:
    """
    Serve a fresh temporary directory over HTTP from a background thread.

    Creating an instance makes a temporary directory, starts ``run_server``
    in a new thread and waits for that thread to report the server URL.
    Call `finalize` to stop the thread and delete the directory.
    """

    # Request handler class passed to ``run_server``; subclasses may override.
    handler_class = http.server.SimpleHTTPRequestHandler

    def __init__(self):
        """
        Create the temporary directory and start the server thread.

        Raises
        ------
        RuntimeError
            If the server thread exits without reporting a URL.
        """
        # Directory handed to the server thread; removed again by finalize().
        self.tmpdir = tempfile.mkdtemp()

        url_queue = queue.Queue()
        # Setting this event asks the server loop to exit.
        self.stop_event = threading.Event()

        args = (self.tmpdir, self.handler_class, self.stop_event, url_queue)
        self.thread = threading.Thread(target=run_server, args=args)
        self.thread.start()

        # URL string the server thread put on the queue.
        self.url = self._wait_for_url(url_queue)

    def _wait_for_url(self, url_queue):
        """Return the URL published by the server thread, or raise if it died."""
        # Poll with a short timeout instead of blocking forever: if the
        # thread fails before publishing a URL, a plain get() never returns.
        while True:
            try:
                return url_queue.get(timeout=0.1)
            except queue.Empty:
                if not self.thread.is_alive():
                    break

        # The thread may have published the URL right before exiting, so
        # check the queue one last time before giving up.
        try:
            return url_queue.get_nowait()
        except queue.Empty:
            shutil.rmtree(self.tmpdir, ignore_errors=True)
            raise RuntimeError(
                "HTTP server thread exited before reporting its URL"
            ) from None

    def finalize(self):
        """Stop the server thread, wait for it, and delete the directory."""
        self.stop_event.set()
        self.thread.join()
        shutil.rmtree(self.tmpdir)