1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
|
#!/usr/bin/env python3
import time
import signal
import multiprocessing as mp
import pdb # noqa
import rpyc
import traceback
def echo_once():
start = time.time()
conn = rpyc.connect("localhost", 18861, config={"sync_request_timeout": None})
cdelta = time.time() - start
addr, port = conn._channel.stream.sock.getsockname()
fileno = conn.fileno()
start = time.time()
conn.root.echo("Echo")
edelta = time.time() - start
conn.close()
return cdelta, edelta, fileno, addr, port
def echo_forever(main_event):
try:
count = 0
edelta = 0
cdelta = 0
_max = {'edelta': 0, 'cdelta': 0}
fileno = "unknown"
addr = "unknown"
port = "unknown"
cdelta = -1
edelta = -1
while main_event.is_set():
count += 1
cdelta, edelta, fileno, addr, port = echo_once()
_max['cdelta'] = cdelta
_max['edelta'] = edelta
except KeyboardInterrupt:
if main_event.is_set():
main_event.clear()
except Exception:
tb = f"EXCEPT ('{addr}', {port}) with fd {fileno} over cdelta {cdelta} and delta {edelta}\n"
tb += traceback.format_exc()
return None, tb
finally:
return _max, None
def echo_client_pool(client_limit):
try:
sigint = signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = mp.Pool(processes=client_limit)
signal.signal(signal.SIGINT, sigint)
eid_proc = {}
pool_manager = mp.Manager()
main_event = pool_manager.Event()
main_event.set()
for eid in range(client_limit):
eid_proc[eid] = pool.apply_async(func=echo_forever, args=(main_event,))
while True:
alive = len([r for r in eid_proc.values() if not r.ready()])
print('{0}/{1} alive'.format(alive, client_limit))
if alive == 1:
print('All of the client processes are dead except one. Exiting loop...')
break
else:
time.sleep(1)
res = [r.get() for r in eid_proc.values() if r.ready()]
cdelta = [_max['cdelta'] for _max, tb in res if _max]
edelta = [_max['edelta'] for _max, tb in res if _max]
if cdelta:
cdelta = max(cdelta)
else:
cdelta = "unknown"
if edelta:
edelta = max(edelta)
else:
edelta = "unknown"
time.sleep(1)
print(f"Max time to establish: {cdelta}")
print(f"Max time echo reply: {edelta}")
main_event.clear()
except KeyboardInterrupt:
main_event.clear()
for proc in eid_proc.values():
proc.terminate()
def main(client_limit):
if client_limit == 1:
echo_once()
else:
echo_client_pool(client_limit)
if __name__ == "__main__":
main(client_limit=5)
|