File: TaskPortTest.py

package info (click to toggle)
clustershell 1.9.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,228 kB
  • sloc: python: 20,978; makefile: 149
file content (84 lines) | stat: -rw-r--r-- 2,435 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# ClusterShell test suite
# Written by S. Thiell

"""Unit test for ClusterShell inter-Task msg"""

import threading
import time
import unittest

from ClusterShell.Task import *
from ClusterShell.Event import EventHandler


class TaskPortTest(unittest.TestCase):
    """Tests for task ports, i.e. message passing between threads and tasks."""

    def tearDown(self):
        """Call task_cleanup() after every test."""
        task_cleanup()

    def testPortMsg1(self):
        """test port msg from main thread to task

        A port is attached to a separate task, the task is resumed and a
        single message is sent to it from the test's thread. The handler
        records what it saw and aborts its task; the actual checks are
        then made from the test's thread once task_wait() has returned.
        """

        TaskPortTest.got_msg = False
        TaskPortTest.started = 0

        # (message, handler-ran-in-task-thread) tuples filled in by ev_msg()
        received = []

        # create task in new thread
        task = Task()

        class PortHandler(EventHandler):
            def ev_port_start(self, port):
                TaskPortTest.started += 1

            def ev_msg(self, port, msg):
                # Only record observations here. A bare ``assert`` in this
                # handler would be stripped under ``python -O`` and, when
                # failing, would raise in the task's thread instead of
                # being reported by unittest at the point of the check.
                current_task = task_self()
                in_task_thread = (
                    current_task.thread == threading.current_thread())
                received.append((msg, in_task_thread))
                TaskPortTest.got_msg = True
                current_task.abort()

        # create non-autoclosing port
        port = task.port(handler=PortHandler())
        task.resume()
        # send msg from main thread
        port.msg("toto")
        task_wait()
        self.assertEqual(TaskPortTest.started, 1)
        self.assertTrue(TaskPortTest.got_msg)
        # exactly one message, with the expected payload, handled in the
        # task's own thread
        self.assertEqual(received, [("toto", True)])

    def testPortRemove(self):
        """test remove_port()

        Adds an autoclosing port to a separate task, resumes the task and
        removes the port again. There is no explicit assertion: the test
        succeeds when these calls and task_wait() return without raising.
        """

        class PortHandler(EventHandler):
            def ev_msg(self, port, msg):
                # no message is sent in this test
                pass

        task = Task() # new thread
        port = task.port(handler=PortHandler(), autoclose=True)
        task.resume()
        task.remove_port(port)
        task_wait()

    def testPortClosed(self):
        """test port msg on closed port

        A helper thread sends a message 0.5s after being started, while
        the current task only has a 0.2s timer plus an autoclosing port
        and is aborted as soon as resume() returns. The send is therefore
        presumably attempted on an already removed port (TODO confirm
        against the Task implementation) and msg() must return False.
        """
        # test sending message to "stillborn" port
        # None means "helper thread has not stored a result yet"
        self.port_msg_result = None

        # thread will wait a bit and send a port message
        def test_thread_start(port, test):
            time.sleep(0.5)
            test.port_msg_result = port.msg('foobar')

        class TestHandler(EventHandler):
            pass

        task = task_self()
        test_handler = TestHandler()
        task.timer(0.2, handler=test_handler, autoclose=False)
        port = task.port(handler=test_handler, autoclose=True)
        thread = threading.Thread(None, test_thread_start, args=(port, self))
        # daemon thread: must not keep the interpreter alive on its own
        thread.daemon = True
        thread.start()
        task.resume()
        task.abort(kill=True) # will remove_port()
        # wait for the helper thread so that its result is available
        thread.join()
        self.assertEqual(self.port_msg_result, False) # test vs. None and True