File: TargetInformationRetrieverTest.py

package info (click to toggle)
nordugrid-arc 7.1.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 29,376 kB
  • sloc: cpp: 136,694; python: 12,452; perl: 12,313; php: 11,408; sh: 10,882; ansic: 3,305; makefile: 3,160; xml: 180; sql: 130; javascript: 53; sed: 30
file content (118 lines) | stat: -rw-r--r-- 5,420 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import testutils, arc, unittest, time

class TargetInformationRetrieverTest(testutils.ARCClientTestCase):
    """Tests for arc.TargetInformationRetriever.

    Each test hands ``self.ce`` to a retriever and then checks either what
    ended up in the registered consumer container(s) or the status the
    retriever reports for that endpoint.  The delay, targets and status used
    by the tests are set on arc.TargetInformationRetrieverPluginTESTControl,
    which setUp resets before every test.
    """

    def setUp(self):
        # User configuration built with the SkipCredentials option.
        skip_credentials = arc.initializeCredentialsType(
            arc.initializeCredentialsType.SkipCredentials)
        self.usercfg = arc.UserConfig(skip_credentials)

        # The endpoint every test passes to the retriever.
        endpoint = arc.Endpoint()
        endpoint.URLString = "test.nordugrid.org"
        endpoint.InterfaceName = "org.nordugrid.tirtest"
        self.ce = endpoint

        # Baseline control values: zero delay, one target, SUCCESSFUL status.
        control = arc.TargetInformationRetrieverPluginTESTControl
        control.delay = 0
        control.targets = [arc.ComputingServiceType()]
        control.status = arc.EndpointQueryingStatus(
            arc.EndpointQueryingStatus.SUCCESSFUL)

    def _retriever_with_container(self):
        """Return a (retriever, container) pair with the container registered.

        The retriever is created from ``self.usercfg`` and the freshly created
        ComputingServiceContainer is added to it as a consumer.
        """
        retriever = arc.TargetInformationRetriever(self.usercfg)
        container = arc.ComputingServiceContainer()
        retriever.addConsumer(container)
        return retriever, container

    def test_the_class_exists(self):
        retriever_class = arc.TargetInformationRetriever
        self.expect(retriever_class).to_be_an_instance_of(type)

    def test_the_constructor(self):
        instance = arc.TargetInformationRetriever(self.usercfg)
        self.expect(instance).to_be_an_instance_of(
            arc.TargetInformationRetriever)

    def test_getting_a_target(self):
        retriever, container = self._retriever_with_container()
        # Nothing has been queried yet, so the consumer starts out empty.
        self.expect(container).to_be_empty()
        retriever.addEndpoint(self.ce)
        retriever.wait()
        self.expect(container).to_have(1).target()

    def test_getting_a_target_without_interfacename_specified(self):
        retriever, container = self._retriever_with_container()
        self.expect(container).to_be_empty()
        # Same scenario as above, but with the interface name blanked out.
        self.ce.InterfaceName = ""
        retriever.addEndpoint(self.ce)
        retriever.wait()
        self.expect(container).to_have(1).target()

    def test_getting_status(self):
        retriever, container = self._retriever_with_container()
        # Attach a description to the status so it can be checked afterwards.
        arc.TargetInformationRetrieverPluginTESTControl.status = (
            arc.EndpointQueryingStatus(
                arc.EndpointQueryingStatus.SUCCESSFUL, "TEST"))
        retriever.addEndpoint(self.ce)
        retriever.wait()
        reported = retriever.getStatusOfEndpoint(self.ce)
        self.expect(reported).to_be_an_instance_of(arc.EndpointQueryingStatus)
        self.expect(reported).to_be(arc.EndpointQueryingStatus.SUCCESSFUL)
        self.expect(reported.getDescription()).to_be("TEST")

    def test_getting_status_without_interfacename_specified(self):
        retriever, container = self._retriever_with_container()
        arc.TargetInformationRetrieverPluginTESTControl.status = (
            arc.EndpointQueryingStatus(
                arc.EndpointQueryingStatus.SUCCESSFUL, "TEST"))
        self.ce.InterfaceName = ""
        retriever.addEndpoint(self.ce)
        retriever.wait()
        reported = retriever.getStatusOfEndpoint(self.ce)
        self.expect(reported).to_be(arc.EndpointQueryingStatus.SUCCESSFUL)
        self.expect(reported.getDescription()).to_be("TEST")

    def test_the_status_is_started_first(self):
        retriever, container = self._retriever_with_container()
        # A non-zero delay; presumably this keeps the query pending long
        # enough for the STARTED state to be observed -- confirm against the
        # control class.
        arc.TargetInformationRetrieverPluginTESTControl.delay = 0.1
        retriever.addEndpoint(self.ce)
        # Checked straight after adding the endpoint, before waiting.
        self.expect(retriever.getStatusOfEndpoint(self.ce)).to_be(
            arc.EndpointQueryingStatus.STARTED)
        retriever.wait()
        self.expect(retriever.getStatusOfEndpoint(self.ce)).to_be(
            arc.EndpointQueryingStatus.SUCCESSFUL)

    def test_same_endpoint_is_not_queried_twice(self):
        retriever, container = self._retriever_with_container()
        # Add the very same endpoint twice; only one target is expected.
        for _ in range(2):
            retriever.addEndpoint(self.ce)
        retriever.wait()
        self.expect(container).to_have(1).target()

    def test_removing_the_consumer(self):
        retriever, container = self._retriever_with_container()
        arc.TargetInformationRetrieverPluginTESTControl.delay = 0.1
        retriever.addEndpoint(self.ce)
        # The consumer is removed before waiting, so it must stay empty.
        retriever.removeConsumer(container)
        retriever.wait()
        self.expect(container).to_have(0).targets()

    def test_deleting_the_consumer_before_the_retriever(self):
        retriever, container = self._retriever_with_container()
        arc.TargetInformationRetrieverPluginTESTControl.delay = 0.1
        retriever.addEndpoint(self.ce)
        retriever.removeConsumer(container)
        # Drop the only local reference to the consumer before waiting.
        del container
        retriever.wait()
        # No assertion: the test passes as long as nothing crashes.

    def test_two_consumers(self):
        retriever = arc.TargetInformationRetriever(self.usercfg)
        container1 = arc.ComputingServiceContainer()
        container2 = arc.ComputingServiceContainer()
        for consumer in (container1, container2):
            retriever.addConsumer(consumer)
        retriever.addEndpoint(self.ce)
        retriever.wait()
        # Both registered consumers are expected to hold the single target.
        self.expect(container1).to_have(1).target()
        self.expect(container2).to_have(1).target()

if __name__ == "__main__":
    # Run this module's tests when the file is executed directly.
    unittest.main()