#!/usr/bin/python
import threading
import time
import unittest
import lcm
class TestLcmThreads(unittest.TestCase):
    """Threading tests for the ``lcm.LCM`` Python binding.

    Every test creates its own LCM instance from the "memq://" provider URL
    and exercises it from the main thread plus one worker thread.
    """

    def test_handle_and_publish(self):
        """Worker thread calls LCM.handle(), while main thread calls
        LCM.publish()
        """
        lcm_obj = lcm.LCM("memq://")
        # Set by the subscription callback; an Event makes the cross-thread
        # hand-off explicit.
        handled = threading.Event()

        def on_msg(channel, data):
            handled.set()

        lcm_obj.subscribe("channel", on_msg)

        worker = threading.Thread(target=lcm_obj.handle)
        worker.start()
        # Give the worker a moment to get into handle() before publishing.
        time.sleep(0.1)
        lcm_obj.publish("channel", "")
        worker.join()

        self.assertTrue(handled.is_set())

    def test_two_handle_timeout(self):
        """Check that calling LCM.handle_timeout() from two threads at once is
        not allowed.

        The second, concurrent call made from the main thread is expected to
        raise RuntimeError.
        """
        lcm_obj = lcm.LCM("memq://")
        lcm_obj.subscribe("channel", lambda *x: None)

        worker = threading.Thread(target=lcm_obj.handle_timeout, args=(5000,))
        worker.start()
        # Give the worker a moment to get into handle_timeout() first.
        time.sleep(0.1)
        try:
            with self.assertRaises(RuntimeError):
                lcm_obj.handle_timeout(1000)
        finally:
            # Always hand the worker a message and join it, even when the
            # assertion above fails, so no thread is left running behind a
            # failed test.
            lcm_obj.publish("channel", "")
            worker.join()

    def test_two_handle(self):
        """Check that calling LCM.handle() from two threads at once is not
        allowed.

        The second, concurrent call made from the main thread is expected to
        raise RuntimeError.
        """
        lcm_obj = lcm.LCM("memq://")
        lcm_obj.subscribe("channel", lambda *x: None)

        worker = threading.Thread(target=lcm_obj.handle)
        worker.start()
        # Give the worker a moment to get into handle() first.
        time.sleep(0.1)
        try:
            with self.assertRaises(RuntimeError):
                lcm_obj.handle()
        finally:
            # Always hand the worker a message and join it, even when the
            # assertion above fails; otherwise the non-daemon worker would
            # stay parked in handle() and keep the test process from exiting.
            lcm_obj.publish("channel", "")
            worker.join()
def main():
    """Run this module's tests through the standard unittest runner."""
    unittest.main()
# Only launch the tests when this file is executed directly as a script.
if __name__ == "__main__":
    main()
|