1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
# Standard library imports
import os
import unittest
from time import sleep
from threading import Thread
# ETS imports
from enthought.traits.api import Int, Str
# Local imports
from enthought.plugins.remote_editor.communication.client import Client
from enthought.plugins.remote_editor.communication.server import Server
from enthought.plugins.remote_editor.communication.util import \
get_server_port, LOCK_PATH
class TestClient(Client):
command = Str
arguments = Str
error_count = Int(0)
def handle_command(self, command, arguments):
self.command = command
self.arguments = arguments
def _error_changed(self, error):
if error:
self.error_count += 1
class TestThread(Thread):
def run(self):
self.server = Server()
self.server.init()
self.server.main()
class CommunicationTestCase(unittest.TestCase):
def setUp(self):
""" Make sure no old lock files exist prior to run.
"""
if os.path.exists(LOCK_PATH):
os.remove(LOCK_PATH)
def tearDown(self):
""" Make sure no old lock files are left around.
"""
if os.path.exists(LOCK_PATH):
os.remove(LOCK_PATH)
def testCommunication(self):
""" Can the Server communicate with Clients and handle errors
appropriately?
"""
# Test server set up
# Does the ping operation work when the Server is not running?
self.assert_(not Server.ping(get_server_port()))
# Set up server thread
serverThread = TestThread()
serverThread.setDaemon(True)
serverThread.start()
sleep(.5)
self.assert_(os.path.exists(LOCK_PATH))
# Test normal operation
self.assert_(Server.ping(get_server_port()))
client1 = TestClient(self_type='client1', other_type='client2')
client1.register()
client2 = TestClient(self_type='client2', other_type='client1')
client2.register()
sleep(.5)
self.assert_(not(client1.orphaned or client2.orphaned))
client1.send_command("foo", "bar")
sleep(.1)
self.assertEqual(client2.command, "foo")
self.assertEqual(client2.arguments, "bar")
client1.unregister()
sleep(.1)
self.assert_(client1.orphaned and client2.orphaned)
client1.register()
sleep(.1)
self.assert_(not(client1.orphaned or client2.orphaned))
# Simulated breakage -- does the Server handle unexpected communication
# failure?
# Have client1 'die'. We send the dummy command to force its connection
# loop to terminate after the call to 'stop'. (In Python we can't just
# kill the thread, which is really what we want to do in this case.)
client1._communication_thread.stop()
sleep(.1)
serverThread.server._send_to(client1._port, "dummy", "")
sleep(.1)
# The Server should inform client1 that it could not complete its
# request
client2.send_command("foo", "bar")
sleep(.1)
self.assert_(client2.orphaned)
self.assertEqual(client2.error_count, 1)
if __name__ == '__main__':
""" Run the unittest, but redirect the log to stderr for convenience.
"""
import logging
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
logging.getLogger("communication").addHandler(console)
unittest.main()
|