File: test_singletonperthread.py

package info (click to toggle)
waagent 2.12.0.2-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,780 kB
  • sloc: python: 55,011; xml: 3,325; sh: 1,183; makefile: 22
file content (164 lines) | stat: -rw-r--r-- 6,788 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import uuid
from multiprocessing import Queue
from threading import Thread, current_thread

from azurelinuxagent.common.singletonperthread import SingletonPerThread
from tests.lib.tools import AgentTestCase, clear_singleton_instances


class Singleton(SingletonPerThread):
    """
    Since these tests deal with testing in a multithreaded environment,
    we employ the use of multiprocessing.Queue() to ensure that the data is consistent.

     This test class uses a uuid to identify an object instead of directly using object reference because
    Queue.get() returns a different object reference than what is put in it even though the object is same
    (which is verified using uuid in this test class)

    Eg:

        obj1 = WireClient("obj1")
        obj1
        <__main__.WireClient object at 0x7f5e78476198>
        q = Queue()
        q.put(obj1)
        test1 = q.get()
        test1
        <__main__.WireClient object at 0x7f5e78430630>

        test1.endpoint == obj1.endpoint
        True
    """

    def __init__(self):
        # Set the name of the object to the current thread name
        self.name = current_thread().name
        # Unique identifier for a class object
        self.uuid = str(uuid.uuid4())


class TestSingletonPerThread(AgentTestCase):

    THREAD_NAME_1 = 'thread-1'
    THREAD_NAME_2 = 'thread-2'

    def setUp(self):
        super(TestSingletonPerThread, self).setUp()
        # In a multi-threaded environment, exceptions thrown in the child thread will not be propagated to the parent
        # thread. In order to achieve that, adding all exceptions to a Queue and then checking that in parent thread.
        self.errors = Queue()
        clear_singleton_instances(Singleton)

    def _setup_multithread_and_execute(self, func1, args1, func2, args2, t1_name=None, t2_name=None):

        t1 = Thread(target=func1, args=args1)
        t2 = Thread(target=func2, args=args2)
        t1.name = t1_name if t1_name else self.THREAD_NAME_1
        t2.name = t2_name if t2_name else self.THREAD_NAME_2
        t1.start()
        t2.start()
        t1.join()
        t2.join()

        errs = []
        while not self.errors.empty():
            errs.append(self.errors.get())
        if len(errs) > 0:
            raise Exception("Errors: %s" % ' , '.join(errs))

    @staticmethod
    def _get_test_class_instance(q, err):
        try:
            obj = Singleton()
            q.put(obj)
        except Exception as e:
            err.put(str(e))

    def _parse_instances_and_return_thread_objects(self, instances, t1_name=None, t2_name=None):
        obj1, obj2 = instances.get(), instances.get()

        def check_obj(name):
            if obj1.name == name:
                return obj1
            elif obj2.name == name:
                return obj2
            else:
                return None

        t1_object = check_obj(t1_name if t1_name else self.THREAD_NAME_1)
        t2_object = check_obj(t2_name if t2_name else self.THREAD_NAME_2)

        return t1_object, t2_object

    def test_it_should_have_only_one_instance_for_same_thread(self):
        obj1 = Singleton()
        obj2 = Singleton()

        self.assertEqual(obj1.uuid, obj2.uuid)

    def test_it_should_have_multiple_instances_for_multiple_threads(self):
        instances = Queue()
        self._setup_multithread_and_execute(func1=self._get_test_class_instance,
                                            args1=(instances, self.errors),
                                            func2=self._get_test_class_instance,
                                            args2=(instances, self.errors))

        self.assertEqual(2, instances.qsize())  # Assert that there are 2 objects in the queue
        obj1, obj2 = instances.get(), instances.get()
        self.assertNotEqual(obj1.uuid, obj2.uuid)

    def test_it_should_return_existing_instance_for_new_thread_with_same_name(self):

        instances = Queue()
        self._setup_multithread_and_execute(func1=self._get_test_class_instance,
                                            args1=(instances, self.errors),
                                            func2=self._get_test_class_instance,
                                            args2=(instances, self.errors))

        t1_obj, t2_obj = self._parse_instances_and_return_thread_objects(instances)

        new_instances = Queue()
        # The 2nd call is to get new objects with the same thread name to verify if the objects are same
        self._setup_multithread_and_execute(func1=self._get_test_class_instance,
                                            args1=(new_instances, self.errors),
                                            func2=self._get_test_class_instance,
                                            args2=(new_instances, self.errors))

        new_t1_obj, new_t2_obj = self._parse_instances_and_return_thread_objects(new_instances)

        self.assertEqual(t1_obj.name, new_t1_obj.name)
        self.assertEqual(t1_obj.uuid, new_t1_obj.uuid)
        self.assertEqual(t2_obj.name, new_t2_obj.name)
        self.assertEqual(t2_obj.uuid, new_t2_obj.uuid)

    def test_singleton_object_should_match_thread_name(self):

        instances = Queue()
        t1_name = str(uuid.uuid4())
        t2_name = str(uuid.uuid4())

        test_class_obj_name = lambda t_name: "%s__%s" % (Singleton.__name__, t_name)

        self._setup_multithread_and_execute(func1=self._get_test_class_instance,
                                            args1=(instances, self.errors),
                                            func2=self._get_test_class_instance,
                                            args2=(instances, self.errors),
                                            t1_name=t1_name,
                                            t2_name=t2_name)

        singleton_instances = Singleton._instances  # pylint: disable=no-member

        # Assert instance names are consistent with the thread names
        self.assertIn(test_class_obj_name(t1_name), singleton_instances)
        self.assertIn(test_class_obj_name(t2_name), singleton_instances)

        # Assert that the objects match their respective threads
        # This function matches objects with their thread names and returns the respective object or None if not found
        t1_obj, t2_obj = self._parse_instances_and_return_thread_objects(instances, t1_name, t2_name)
        # Ensure that objects for both the threads were found
        self.assertIsNotNone(t1_obj)
        self.assertIsNotNone(t2_obj)
        # Ensure that the objects match with their respective thread objects
        self.assertEqual(singleton_instances[test_class_obj_name(t1_name)].uuid, t1_obj.uuid)
        self.assertEqual(singleton_instances[test_class_obj_name(t2_name)].uuid, t2_obj.uuid)