1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
import asyncio
import functools
import os
import signal
import multiprocess
def debug(data):
with open("/tmp/yeah.txt", "a+") as f:
f.write(data + "\n")
# taken from aiostatsd.tests.test_client
class ServerProto:
def __init__(self, conn):
self.conn = conn
self.transport = None
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
debug(data.decode("utf8"))
self.conn.send(data)
def disconnect(self):
if self.transport is None:
return
self.transport.close()
def error_received(self, exc):
raise Exception(exc)
def connection_lost(self, exc):
if exc is not None:
print(exc)
class UDPServer:
def __init__(self, host, port, conn):
self.host = host
self.port = port
self.incoming = asyncio.Queue()
self.conn = conn
self.running = False
def stop(self, *args, **kw):
self.running = False
async def run(self):
ctx = {}
def make_proto():
proto = ServerProto(self.conn)
ctx["proto"] = proto
return proto
debug("starting")
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
make_proto, local_addr=(self.host, self.port)
)
if self.port == 0:
self.port = transport.get_extra_info("socket").getsockname()[1]
self.conn.send(self.port)
debug(f"waiting on port {self.port}")
self.running = True
try:
while self.running:
await asyncio.sleep(1.0)
finally:
debug("disco")
ctx["proto"].disconnect()
def run_server():
parent, child = multiprocess.Pipe()
p = multiprocess.Process(target=functools.partial(_run, child))
p.start()
port = parent.recv()
print(f"Running on port {port}")
debug(f"Running on port {port}")
return p, port, parent
def stop_server(p, conn):
debug("Stopping server pipe")
debug("killing process")
os.kill(p.pid, signal.SIGINT)
p.join(timeout=1.0)
res = []
for data in conn.recv():
res.append(data)
conn.close()
return res
def _run(conn):
server = UDPServer("localhost", 0, conn)
signal.signal(signal.SIGINT, server.stop)
try:
asyncio.run(server.run())
except KeyboardInterrupt:
debug("killed")
conn.send("STOPPED")
conn.close()
if __name__ == "__main__":
try:
p, port, conn = run_server()
while True:
print(conn.recv())
finally:
print(stop_server(p, conn))
|