File: statsd.py

package info (click to toggle)
python-molotov 2.7-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 8,264 kB
  • sloc: python: 4,121; makefile: 60
file content (118 lines) | stat: -rw-r--r-- 2,731 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import asyncio
import functools
import os
import signal

import multiprocess


def debug(data):
    with open("/tmp/yeah.txt", "a+") as f:
        f.write(data + "\n")


# taken from aiostatsd.tests.test_client
class ServerProto:
    def __init__(self, conn):
        self.conn = conn
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        debug(data.decode("utf8"))
        self.conn.send(data)

    def disconnect(self):
        if self.transport is None:
            return
        self.transport.close()

    def error_received(self, exc):
        raise Exception(exc)

    def connection_lost(self, exc):
        if exc is not None:
            print(exc)


class UDPServer:
    def __init__(self, host, port, conn):
        self.host = host
        self.port = port
        self.incoming = asyncio.Queue()
        self.conn = conn
        self.running = False

    def stop(self, *args, **kw):
        self.running = False

    async def run(self):
        ctx = {}

        def make_proto():
            proto = ServerProto(self.conn)
            ctx["proto"] = proto
            return proto

        debug("starting")
        loop = asyncio.get_running_loop()
        transport, protocol = await loop.create_datagram_endpoint(
            make_proto, local_addr=(self.host, self.port)
        )

        if self.port == 0:
            self.port = transport.get_extra_info("socket").getsockname()[1]
        self.conn.send(self.port)

        debug(f"waiting on port {self.port}")
        self.running = True
        try:
            while self.running:
                await asyncio.sleep(1.0)
        finally:
            debug("disco")
            ctx["proto"].disconnect()


def run_server():
    parent, child = multiprocess.Pipe()
    p = multiprocess.Process(target=functools.partial(_run, child))
    p.start()
    port = parent.recv()
    print(f"Running on port {port}")
    debug(f"Running on port {port}")
    return p, port, parent


def stop_server(p, conn):
    debug("Stopping server pipe")
    debug("killing process")
    os.kill(p.pid, signal.SIGINT)
    p.join(timeout=1.0)
    res = []
    for data in conn.recv():
        res.append(data)
    conn.close()
    return res


def _run(conn):
    server = UDPServer("localhost", 0, conn)
    signal.signal(signal.SIGINT, server.stop)
    try:
        asyncio.run(server.run())
    except KeyboardInterrupt:
        debug("killed")
    conn.send("STOPPED")
    conn.close()


if __name__ == "__main__":
    try:
        p, port, conn = run_server()
        while True:
            print(conn.recv())
    finally:
        print(stop_server(p, conn))