File: holdhandler.py

package info (click to toggle)
rust-condure 1.10.0-8
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 2,384 kB
  • sloc: python: 345; makefile: 10
file content (157 lines) | stat: -rw-r--r-- 4,064 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# this handler holds all connections open

import os
import time
import datetime
import calendar
import tnetstring
import zmq

CONN_TTL = 60000
EXPIRE_INTERVAL = 60000

instance_id = 'holdhandler.{}'.format(os.getpid()).encode('utf-8')

ctx = zmq.Context()
in_sock = ctx.socket(zmq.PULL)
in_sock.connect('ipc://client-out')
in_stream_sock = ctx.socket(zmq.ROUTER)
in_stream_sock.identity = instance_id
in_stream_sock.connect('ipc://client-out-stream')
out_sock = ctx.socket(zmq.PUB)
out_sock.connect('ipc://client-in')

poller = zmq.Poller()
poller.register(in_sock, zmq.POLLIN)
poller.register(in_stream_sock, zmq.POLLIN)

class Connection(object):
    def __init__(self, rid):
        self.rid = rid
        self.seq = 0
        self.exp_time = None

    def send_msg(self, msg):
        msg[b'from'] = instance_id
        msg[b'id'] = self.rid[1]
        msg[b'seq'] = self.seq
        self.seq += 1

        print('OUT {} {}'.format(self.rid[0], msg))
        out_sock.send(self.rid[0] + b' T' + tnetstring.dumps(msg))

    def send_header(self):
        msg = {}
        msg[b'code'] = 200
        msg[b'reason'] = b'OK'
        msg[b'headers'] = [[b'Content-Type', b'text/plain']]
        msg[b'more'] = True

        self.send_msg(msg)

    def send_body(self, data):
        msg = {}
        msg[b'body'] = data
        msg[b'more'] = True

        self.send_msg(msg)

def send_body(to_addr, conns, data):
    ids = []
    for c in conns:
        ids.append({b'id': c.rid[1], b'seq': c.seq})
        c.seq += 1

    msg = {}
    msg[b'from'] = instance_id
    msg[b'id'] = ids
    msg[b'body'] = data
    msg[b'more'] = True

    print('OUT {} {}'.format(to_addr, msg))
    out_sock.send(to_addr + b' T' + tnetstring.dumps(msg))

conns = {}
last_exp_time = int(time.time())

while True:
    socks = dict(poller.poll(1000))

    if socks.get(in_sock) == zmq.POLLIN:
        m_raw = in_sock.recv()
    elif socks.get(in_stream_sock) == zmq.POLLIN:
        m_list = in_stream_sock.recv_multipart()
        m_raw = m_list[2]
    else:
        m_raw = None

    now = int(time.time() * 1000)

    if m_raw is not None:
        req = tnetstring.loads(m_raw[1:])
        print('IN {}'.format(req))

        m_from = req[b'from']
        m_id = req[b'id']
        m_type = req.get(b'type', b'')

        ids = []
        if isinstance(m_id, list):
            for id_seq in m_id:
                ids.append(id_seq[b'id'])
        else:
            ids.append(m_id)

        new_ids = []
        known_conns = []
        for i in ids:
            rid = (m_from, i)

            c = conns.get(rid)
            if c:
                c.exp_time = now + CONN_TTL
                known_conns.append(c)
            else:
                new_ids.append(rid)

        # data
        if not m_type:
            for rid in new_ids:
                c = Connection(rid)
                conns[rid] = c
                c.exp_time = now + CONN_TTL
                c.send_header()
        elif c:
            if m_type == b'keep-alive':
                dt = datetime.datetime.utcnow()
                ts = calendar.timegm(dt.timetuple())

                body = (
                    'id: TCPKaliMsgTS-{:016x}.\n'
                    'event: message\n'
                    'data: {:04}-{:02}-{:02}T{:02}:{:02}:{:02}\n\n'
                ).format(
                    (ts * 1000000) + dt.microsecond,
                    dt.year,
                    dt.month,
                    dt.day,
                    dt.hour,
                    dt.minute,
                    dt.second
                ).encode()

                send_body(m_from, known_conns, body)
            elif m_type == b'cancel':
                for c in known_conns:
                    del conns[c.rid]

    if now >= last_exp_time + EXPIRE_INTERVAL:
        last_exp_time = now

        to_remove = []
        for rid, c in conns.items():
            if now >= c.exp_time:
                to_remove.append(rid)
        for rid in to_remove:
            print('expired {}'.format(rid))
            del conns[rid]