1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
#!/usr/bin/env python
import time
import unittest
import rpyc
class MyService(rpyc.Service):
    """RPyC service used by the multithreading test below.

    It offers a slow request (``foo``) and an ``Invoker`` object that keeps
    calling a caller-supplied callback from a separate thread.
    """

    class exposed_Invoker:
        """Call ``callback`` with the current time roughly every ``interval`` seconds."""

        def __init__(self, callback, interval):
            """Store the settings and start the worker thread.

            Args:
                callback: Callable invoked with ``time.time()`` on every tick.
                    It is wrapped with ``rpyc.async_`` before use.
                interval: Number of seconds to sleep between two ticks.
            """
            self.callback = rpyc.async_(callback)
            self.interval = interval
            self.active = True
            # Spawned last: work() reads callback, interval and active.
            self.thread = rpyc.spawn(self.work)

        def exposed_stop(self):
            """Ask the worker loop to finish and wait for its thread to exit."""
            self.active = False
            self.thread.join()

        def work(self):
            """Worker loop: fire the callback, sleep, repeat until deactivated."""
            while True:
                if not self.active:
                    break
                now = time.time()
                self.callback(now)
                time.sleep(self.interval)

    def exposed_foo(self, x):
        """Sleep for two seconds, then return ``x * 17``."""
        delay_seconds = 2
        time.sleep(delay_seconds)
        result = x * 17
        return result
class Test_Multithreaded(unittest.TestCase):
    """Check that callbacks keep arriving while blocking requests are in flight."""

    def setUp(self):
        """Connect to a ``MyService`` instance and start a background serving thread."""
        self.conn = rpyc.connect_thread(remote_service=MyService)
        self.bgserver = rpyc.BgServingThread(self.conn)

    def tearDown(self):
        """Stop the background serving thread, then close the connection."""
        self.bgserver.stop()
        self.conn.close()

    def test_invoker(self):
        """The callback must fire at least five times during three ``foo`` calls."""
        # A one-element list lets the nested callback mutate the count
        # without needing a ``nonlocal`` declaration.
        counter = [0]

        def callback(x):
            counter[0] += 1
            print(f"callback {x}")

        invoker = self.conn.root.Invoker(callback, 1)
        try:
            # 3 * 2sec = 6 sec = ~6 calls to callback
            for i in range(3):
                print(f"foo{i} = {self.conn.root.foo(i)}")
        finally:
            # Always stop the invoker, even if a foo() call failed, so its
            # worker thread does not keep firing callbacks during teardown.
            invoker.stop()
        print(f"callback called {counter[0]} times")
        # assertGreaterEqual reports both values on failure, unlike
        # assertTrue(a >= b) which only says "False is not true".
        self.assertGreaterEqual(counter[0], 5)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|