File: test-simple-service-1

package info (click to toggle)
gdbuspp 3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,164 kB
  • sloc: cpp: 9,462; python: 477; sh: 215; makefile: 11
file content (646 lines) | stat: -rwxr-xr-x 25,006 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
#!/bin/python3
#  GDBus++ - glib2 GDBus C++ wrapper
#
#  SPDX-License-Identifier: AGPL-3.0-only
#
#  Copyright (C)  OpenVPN Inc <sales@openvpn.net>
#  Copyright (C)  David Sommerseth <davids@openvpn.net>

##
# @file tests/scripts/test-simple-service-1
#
# @brief Runs a series of tests against the tests/simple-service.cpp
#        test service
#

import dbus
import sys
import xmltodict



class IntrospectionParser(object):
    """
    Runs a recursive D-Bus introspection against a D-Bus service, mapping
    all the found D-Bus objects with their interfaces and the methods,
    properties and signals found there with their signatures
    """

    ##
    #  Creates the IntrospectionParser object and runs an initial
    #  introspection of the service
    #
    # @param bus      Must be a dbus.SessionBus() or dbus.SystemBus()
    #                 connection object
    # @param service  Well-known bus name for the service to introspect
    #
    def __init__(self, bus, service):
        self.__bus = bus
        self.__service = service

        self.RunIntrospection()


    ##
    #  Internal parser helper.  This will run the Introspect D-Bus method
    #  on the given D-Bus path and parse the XML response into Python
    #  dictionaries.  This dictionary is further processed to extract the
    #  information in a more condensed way
    #
    # @param path  D-Bus path where to start the introspection process
    #              Defaults to '/'
    #
    # @see @RunIntrospection for details on the return structure
    #
    def __inspect_path(self, path = '/'):
        obj = self.__bus.get_object(self.__service, path)
        introspect_interface = dbus.Interface(obj,
                dbus_interface='org.freedesktop.DBus.Introspectable')

        # Retrieve the D-Bus Introspection data, which is an XML string ...
        introspection_xml = introspect_interface.Introspect()
        # ... and parse that XML into a more Python compliant data structure.
        # force_list ensures these elements are always lists, also when
        # only a single element of that kind is present
        parsed = xmltodict.parse(introspection_xml,
                force_list=('node','interface','method','property','signal','arg'))

        return self.__inspect_node(path, parsed['node'])


    ##
    #  Internal helper, extracting the arguments of a single 'method' or
    #  'signal' element
    #
    # @param member          Dictionary of the 'method' or 'signal' element
    # @param with_direction  If True, the argument direction is added as the
    #                        "dir" member of each argument (used for methods)
    #
    # @returns a dictionary mapping each argument name to a dictionary
    #          with its "type" (and optionally "dir") declaration
    #
    @staticmethod
    def __parse_args(member, with_direction):
        args = {}
        for idx, arg in enumerate(member.get('arg') or []):
            # Argument names are optional in the introspection data;
            # fall back to a positional name instead of failing
            name = str(arg.get('@name', 'arg_{}'.format(idx)))
            decl = {'type': str(arg['@type'])}
            if with_direction:
                # A method argument without a direction is an input argument
                decl['dir'] = str(arg.get('@direction', 'in'))
            args[name] = decl
        return args


    ##
    #  Internal helper, condensing a single 'interface' element into the
    #  "methods", "properties" and "signals" sections
    #
    # @param intf  Dictionary of the 'interface' element
    #
    # @returns a dictionary with the three sections; a section is an empty
    #          dictionary if the interface has no members of that kind
    #
    def __parse_interface(self, intf):
        found_methods = {}
        for method in intf.get('method') or []:
            found_methods[str(method['@name'])] = self.__parse_args(method, True)

        found_props = {}
        for prop in intf.get('property') or []:
            found_props[str(prop['@name'])] = {
                'type': str(prop['@type']),
                'access': str(prop['@access'])
            }

        found_signals = {}
        for signal in intf.get('signal') or []:
            found_signals[str(signal['@name'])] = self.__parse_args(signal, False)

        return {
            'methods': found_methods,
            'properties': found_props,
            'signals': found_signals
        }


    ##
    #  Internal helper function, parsing the dictionary for a specific
    #  D-Bus path
    #
    #  If child objects are found, they will be introspected recursively
    #  by calling the @__inspect_path() method on the D-Bus path of the
    #  child object
    #
    # @param path        D-Bus path of the current node being inspected
    # @param start_node  A list of dictionaries of the root 'node' element
    #                    for a specific D-Bus object
    #
    # @returns a partial dictionary of the currently parsed object.
    #          If children objects were found, they are appended to the
    #          result dictionary
    #
    def __inspect_node(self, path, start_node):
        ret = {}

        # Child paths are built as <prefix>/<child name>.  The root path
        # needs an empty prefix to avoid a double slash.  This is kept in
        # a separate variable so 'path' itself is never modified
        child_prefix = '' if path == '/' else path

        for node in start_node:
            # Empty node elements are parsed as None; nothing to extract
            if not isinstance(node, dict):
                continue

            found_interfaces = {}
            for intf in node.get('interface') or []:
                intf_name = str(intf['@name'])
                if intf_name.startswith('org.freedesktop.DBus.'):
                    # These are default interfaces; skip them
                    continue

                # Every non-default interface is recorded, not only
                # the last one listed for the object
                found_interfaces[intf_name] = self.__parse_interface(intf)

            # Skip paths with no interfaces;
            # thus no methods, properties nor signals
            if found_interfaces:
                ret[path] = found_interfaces

            for child in node.get('node') or []:
                r = self.__inspect_path(child_prefix + '/' + child['@name'])
                if r is not None:
                    ret.update(r)
        return ret


    ##
    #  Runs a full, recursive introspection on the service
    #
    # @returns a dictionary with the following structure:
    #
    #  {"/full/dbus/path": {
    #      "interface.identifier": {
    #           "methods": {
    #               "method_name": {
    #                   "argument_name": {
    #                        "type": "dbus-type-string",
    #                        "dir": "direction (in/out)"
    #                   },
    #                   ...
    #               },
    #               ....
    #           },
    #           "properties": {
    #                "property_name": {
    #                     "type": "dbus-type-string",
    #                     "access": "read/readwrite"
    #                },
    #                ...
    #           },
    #           "signals": {
    #               "signal_name": {
    #                   "arg_name": {"type": "dbus-type-string"},
    #                   ...
    #                },
    #                ...
    #           }
    #      },
    #      ...
    #    },
    #    ...
    #  }
    #
    def RunIntrospection(self):
        self.__introspection = self.__inspect_path()
        return self.__introspection


    ##
    #  Retrieve all the "sections" in specific object path and interface
    #  group.  A section is typically the "methods", "properties" or "signals"
    #  groups
    #
    # @param  path       D-Bus path to retrieve information from
    # @param  interface  D-Bus object interface group to access
    #
    # @returns a list of strings with available sections, or None if the
    #          path or interface was not found in the last introspection
    #
    def GetSections(self, path, interface):
        try:
            return list(self.__introspection[path][interface].keys())
        except KeyError:
            return None


    ##
    #  Retrieve the type declaration fragment of a specific D-Bus section
    #  for a specific member.
    #
    # @param  path       D-Bus path to retrieve information from
    # @param  interface  D-Bus object interface group to access
    # @param  section    The introspection section to retrieve data from
    # @param  name       Name of the member to retrieve the type declaration
    #
    # @returns a dictionary with the "type", "dir" or "access" members
    #          and their assignments from the introspection.  None is
    #          returned if the path, interface, section or member is unknown
    #
    def GetFragment(self, path, interface, section, name):
        try:
            return self.__introspection[path][interface][section][name]
        except KeyError:
            return None

    ##
    #  Retrieve the last full introspection performed
    #  @see @RunIntrospection for details on the returned data
    #
    def GetIntrospection(self):
        return self.__introspection


class TestMethod(object):
    """
    Tests a specific D-Bus method, where the expected input/output
    information is provided as well as test input data with the expected
    result being returned back.
    """

    ##
    #  Declares a method test
    #
    # @param name         Name of the D-Bus method to call
    # @param input_type   Dictionary mapping input argument names to their
    #                     D-Bus type string.  None is treated as no
    #                     declared input arguments
    # @param output_type  Dictionary mapping output argument names to their
    #                     D-Bus type string.  None is treated as no
    #                     declared output arguments
    # @param input_data   Value passed as the single argument to the method
    # @param result       Value the method call is expected to return
    #
    def __init__(self, name, input_type, output_type, input_data, result):
        self.__name = name
        # Normalise None to an empty mapping, so CheckIntrospection() can
        # always do membership lookups
        self.__in_type = input_type if input_type is not None else {}
        self.__out_type = output_type if output_type is not None else {}
        self.__in_data = input_data
        self.__expect_res = result


    def GetName(self):
        return self.__name

    ##
    #  Compares the introspected argument declarations of the method with
    #  the expected types given to the constructor
    #
    #  Only arguments present in both the introspection data and the
    #  expected type dictionaries are compared; other arguments are ignored.
    #
    # @param introsp  Dictionary of argument name -> {"type", "dir"}
    #
    # @returns False if a compared argument has a different type,
    #          otherwise True
    #
    def CheckIntrospection(self, introsp):
        for argname, spec in introsp.items():
            if spec['dir'] == 'in':
                expected = self.__in_type
            elif spec['dir'] == 'out':
                expected = self.__out_type
            else:
                continue

            if argname in expected and expected[argname] != spec['type']:
                return False
        return True


    ##
    #  Calls the D-Bus method with the test input data and compares the
    #  returned value against the expected result
    #
    # @param bus        D-Bus connection object providing get_object()
    # @param service    Bus name of the service to call
    # @param path       D-Bus object path providing the method
    # @param interface  D-Bus interface the method belongs to
    #
    # @returns True if the returned value equals the expected result.
    #          A DBusException raised by the call results in False
    #
    def Run(self, bus, service, path, interface):
        obj = bus.get_object(service, path)
        obj_intf = dbus.Interface(obj, dbus_interface=interface)
        method = obj_intf.get_dbus_method(self.__name)

        try:
            result = self.__expect_res == method(self.__in_data)
        except dbus.exceptions.DBusException as err:
            print('           Calling {name} failed'.format(name=self.__name))
            print('           Error: {}'.format(str(err)))
            result = False

        text = '       {name}({in_data}) - Result: {result}'
        print(text.format(name=self.__name,
                          in_data=str(self.__in_data),
                          result="Pass" if result else "FAIL"))
        return result


class TestFailingMethod(object):
    """
    This is similar to the TestMethod() class, but this tests methods
    which are expected to fail.  Such D-Bus methods should not normally
    be found in a service, but are used to test the behaviour when an
    internal error happens on the service side
    """

    ##
    #  Declares a test of a method expected to fail
    #
    # @param name         Name of the D-Bus method to call
    # @param input_type   Dictionary mapping input argument names to their
    #                     D-Bus type string.  None is treated as no
    #                     declared input arguments
    # @param output_type  Dictionary mapping output argument names to their
    #                     D-Bus type string.  None is treated as no
    #                     declared output arguments
    # @param input_data   Value passed as the single argument to the method
    #
    def __init__(self, name, input_type, output_type, input_data):
        self.__name = name
        # Normalise None to an empty mapping, so CheckIntrospection() can
        # always do membership lookups
        self.__in_type = input_type if input_type is not None else {}
        self.__out_type = output_type if output_type is not None else {}
        self.__in_data = input_data

    def GetName(self):
        return self.__name

    ##
    #  Compares the introspected argument declarations of the method with
    #  the expected types given to the constructor
    #
    #  Only arguments present in both the introspection data and the
    #  expected type dictionaries are compared; other arguments are ignored.
    #
    # @param introsp  Dictionary of argument name -> {"type", "dir"}
    #
    # @returns False if a compared argument has a different type,
    #          otherwise True
    #
    def CheckIntrospection(self, introsp):
        for argname, spec in introsp.items():
            if spec['dir'] == 'in':
                expected = self.__in_type
            elif spec['dir'] == 'out':
                expected = self.__out_type
            else:
                continue

            if argname in expected and expected[argname] != spec['type']:
                return False
        return True


    ##
    #  Calls the D-Bus method with the test input data, expecting the
    #  call to fail
    #
    # @param bus        D-Bus connection object providing get_object()
    # @param service    Bus name of the service to call
    # @param path       D-Bus object path providing the method
    # @param interface  D-Bus interface the method belongs to
    #
    # @returns True if the call raised a DBusException, False if the
    #          call completed without that exception
    #
    def Run(self, bus, service, path, interface):
        obj = bus.get_object(service, path)
        obj_intf = dbus.Interface(obj, dbus_interface=interface)
        method = obj_intf.get_dbus_method(self.__name)

        try:
            method(self.__in_data)
        except dbus.exceptions.DBusException:
            # The failure is the expected outcome of this test
            result = True
        else:
            result = False

        text = '       {name}({in_data}) - Result: {result}'
        print(text.format(name=self.__name,
                          in_data=str(self.__in_data),
                          result="Pass" if result else "FAIL"))
        return result




class TestProperty(object):
    """
    Checks and validates a specific D-Bus object property.  When a new value
    is provided, the property is also changed to that value and afterwards
    set back to the initial value.
    """


    ##
    #  Declares a property test
    #
    # @param name        Name of the D-Bus property
    # @param value_type  Expected D-Bus type string of the property
    # @param init_val    Value the property is expected to have initially
    # @param new_val     Value to change the property to, or None if the
    #                    property is expected to be read-only
    #
    def __init__(self, name, value_type, init_val, new_val):
        self.__name = name
        self.__type = value_type
        self.__init_val = init_val
        self.__new_val = new_val


    def GetName(self):
        return self.__name


    ##
    #  Validates the introspected declaration of the property
    #
    # @param introsp  Dictionary with the "type" and "access" members
    #
    # @returns True if the access mode matches the kind of test
    #          ("readwrite" when a new value is given, otherwise "read")
    #          and the type matches the expected type; otherwise False
    #
    def CheckIntrospection(self, introsp):
        if self.__new_val is not None:
            wanted_access = "readwrite"
        else:
            wanted_access = "read"

        if introsp["access"] != wanted_access:
            return False
        if introsp["type"] == self.__type:
            return True
        return False


    ##
    #  Reads the property and compares it against the initial value.  If a
    #  new value was declared, the property is set to the new value and
    #  then back to the initial value, reading it back after each change.
    #
    # @param bus        D-Bus connection object providing get_object()
    # @param service    Bus name of the service to access
    # @param path       D-Bus object path providing the property
    # @param interface  D-Bus interface the property belongs to
    #
    # @returns True only if all the performed comparisons matched
    #
    def Run(self, bus, service, path, interface):
        target = bus.get_object(service, path)
        props = dbus.Interface(target, dbus_interface="org.freedesktop.DBus.Properties")

        def verdict(passed):
            return "Pass" if passed else "FAIL"

        # Step 1: the property must hold the declared initial value
        current = props.Get(interface, self.__name)
        matches = self.__init_val == current
        print('       {name} ... Initial value - Result: {result}'.format(
                name=self.__name, result=verdict(matches)))
        if matches is False:
            print('           Expected: {}'.format(self.__init_val))
            print('           Received: {}'.format(current))
        outcome = True and matches

        if self.__new_val is None:
            return outcome

        # Step 2 and 3: change the value, then restore the initial value
        steps = (
            (self.__new_val,
             '       {name} ... Changing property - Result: {result}'),
            (self.__init_val,
             '       {name} ... Changing back to initial value - Result: {result}'),
        )
        for value, template in steps:
            props.Set(interface, self.__name, value)
            current = props.Get(interface, self.__name)
            matches = value == current
            print(template.format(name=self.__name, result=verdict(matches)))
            outcome = outcome and matches

        return outcome



class ExpectObject(object):
    """
    Declaration of a D-Bus object expected to be found in a D-Bus service,
    with a specific interface group.

    This is used to collect TestMethod, TestFailingMethod and TestProperty
    objects which will be run when executing the RunTests() method
    """

    ##
    # @param bus        D-Bus connection object, passed on to each test
    # @param service    Bus name of the service providing the object
    # @param path       D-Bus path of the expected object
    # @param interface  D-Bus interface expected on the object
    #
    def __init__(self, bus, service, path, interface):
        self.__bus = bus
        self.__service = service
        self.__path = path
        self.__interface = interface

        self.__tests = []


    ##
    # @returns a tuple of (path, interface, test type name)
    #
    def GetInfo(self):
        return (self.__path, self.__interface, "ExpectObject")

    ##
    #  Registers a test object.  The object must provide the GetName(),
    #  CheckIntrospection() and Run() methods.  Tests are run in the
    #  order they are added.
    #
    def AddTest(self, tester_obj):
        self.__tests.append(tester_obj)

    def __report_introspection_failure(self, name):
        text = '     {name}, path={path}, interface={interf} ... Introspection failed'
        print(text.format(name=name, path=self.__path, interf=self.__interface))

    ##
    #  Runs all the registered tests.  For each test, the member it names
    #  is looked up in every introspection section of the object; for each
    #  section where it is found, both the introspection check and the
    #  functionality test are run.
    #
    # @param introspection  Object providing GetSections() and GetFragment()
    #
    # @returns True only if all tests were found in the introspection data
    #          and passed both the introspection and functionality tests
    #
    def RunTests(self, introspection):
        test_result = True
        for runtest in self.__tests:
            name = runtest.GetName()

            sections = introspection.GetSections(self.__path, self.__interface)
            if sections is None:
                # The object path or interface is not available at all
                self.__report_introspection_failure(name)
                test_result = False
                continue

            found = False
            try:
                for section in sections:
                    intrsp_fragm = introspection.GetFragment(self.__path,
                                        self.__interface, section, name)
                    if intrsp_fragm is None:
                        continue
                    found = True

                    introsp_test = runtest.CheckIntrospection(intrsp_fragm)

                    text = '     [{section}] -- {name} ... Introspection test: {result}'
                    print(text.format(section=section, name=name,
                                      result="Pass" if introsp_test else "FAIL"))

                    text = '     [{section}] -- {name} ... Testing functionality:'
                    print(text.format(section=section, name=name))
                    obj_test = runtest.Run(self.__bus, self.__service, self.__path, self.__interface)
                    text = '     [{section}] -- {name} ... Functionality test restult: {result}\n'
                    print(text.format(section=section, name=name,
                                      result="Pass" if obj_test else "FAIL"))

                    test_result = test_result and introsp_test and obj_test
            except TypeError:
                # A TypeError raised while checking or running the test is
                # reported as a failed test instead of aborting the run
                self.__report_introspection_failure(name)
                test_result = False
                continue

            if not found:
                # A member missing from all sections must not be counted
                # as a passed test; nothing was actually tested
                text = '     {name}, path={path}, interface={interf} ... Not found in introspection'
                print(text.format(name=name, path=self.__path, interf=self.__interface))
                test_result = False
        return test_result



class ExpectMissingObject(object):
    """
    Declaration of a test verifying that a D-Bus object with a specific
    interface is NOT available in the service.
    """

    ##
    # @param bus        D-Bus connection object (stored, not used by
    #                   the test itself)
    # @param service    Bus name of the service (stored, not used by
    #                   the test itself)
    # @param path       D-Bus path expected to be missing
    # @param interface  D-Bus interface expected to be missing on the path
    #
    def __init__(self, bus, service, path, interface):
        self.__bus = bus
        self.__service = service
        self.__path = path
        self.__interface = interface


    ##
    # @returns a tuple of (path, interface, test type name)
    #
    def GetInfo(self):
        return (self.__path, self.__interface, "ExpectMissingObject")

    ##
    #  Checks the introspection data for the absence of the object
    #
    # @param introspection  Object providing the GetSections() method
    #
    # @returns True if GetSections() reports no sections (None) for the
    #          path and interface, otherwise False
    #
    def RunTests(self, introspection):
        sections = introspection.GetSections(self.__path, self.__interface)
        is_missing = sections is None
        verdict = "Pass" if is_missing else "FAIL"
        print('     Checking object is not available ... Introspection test: {result}'.format(result=verdict))
        return is_missing



class TestService(object):
    """
    Declaration of a service to test.  This contains a collection of
    ExpectObject and ExpectMissingObject objects which will be accessed,
    in the order they were declared, when running the RunTests() method
    """

    ##
    # @param bus      D-Bus connection object used for all the tests
    # @param service  Well-known bus name of the service to test
    #
    def __init__(self, bus, service):
        self.__bus = bus
        self.__service = service

        self.__expect_objects = []


    ##
    #  Declares an object expected to be available in the service
    #
    # @returns the new ExpectObject, which test objects can be added to
    #
    def ExpectObject(self, path, interface):
        expobj = ExpectObject(self.__bus, self.__service, path, interface)
        self.__expect_objects.append(expobj)
        return expobj


    ##
    #  Declares an object expected to NOT be available in the service
    #
    # @returns the new ExpectMissingObject
    #
    def ExpectMissingObject(self, path, interface):
        expobj = ExpectMissingObject(self.__bus, self.__service, path, interface)
        self.__expect_objects.append(expobj)
        return expobj


    ##
    #  Runs the tests of all the declared objects
    #
    #  Every declared object is tested, also after an earlier object has
    #  failed, and the "Test Result" line reports the result of the object
    #  just tested.
    #
    # @returns True only if the tests of all declared objects passed
    #
    def RunTests(self):
        result = True
        for obj in self.__expect_objects:
            info = obj.GetInfo()

            # Retrieve a new introspection per test object; it may have
            # changed in prior tests
            introspection = IntrospectionParser(self.__bus, self.__service)
            text = '>>>  Running test: [{testname}] path={path}, interface={intf}'
            print(text.format(path=info[0], intf=info[1], testname=info[2]))

            # Run the tests unconditionally; combining this call with the
            # overall result through 'and' would skip it after a failure
            obj_result = obj.RunTests(introspection)
            if not obj_result:
                result = False

            print('   Test Result: {result}'.format(result="Pass" if obj_result else "FAIL"))
            print()
        return result




# Script entry point: declares all the expected objects and their tests,
# runs them and exits with 0 if everything passed, otherwise 1.
#
# NOTE: The order of the declarations below matters.  TestService.RunTests()
# processes the declared objects in the order they were declared, and each
# ExpectObject runs its tests in the order they were added.  Several tests
# depend on objects created or removed by tests declared before them.
if __name__ == "__main__":
    """
    Main test driver for the tests/simple-service.cpp D-Bus service

    This service is expected to be found on the session bus prior to running
    this test, under the 'net.openvpn.gdbuspp.test.simple' well-known bus name
    """

    session_bus = dbus.SessionBus()
    tests = TestService(session_bus, 'net.openvpn.gdbuspp.test.simple')

    #
    #  Test methods expecting to fail
    #  Run these tests early, to see if it triggers a service crash
    #
    #  TestFailingMethod arguments: method name, expected input types,
    #  expected output types, input data
    #
    failing_methods = tests.ExpectObject('/gdbuspp/tests/simple1/method_failures','gdbuspp.test.simple1')
    failing_methods.AddTest(TestFailingMethod('InputMismatch',{'not_an_int': 's'}, {'failed': 'b'}, dbus.String('not a number')))
    failing_methods.AddTest(TestFailingMethod('OutputMismatch',{}, {'failed': 'b'}, None))



    #
    #  Test various methods
    #
    #  TestMethod arguments: method name, expected input types,
    #  expected output types, input data, expected return value
    #
    simple1_methods = tests.ExpectObject('/gdbuspp/tests/simple1/methods', 'gdbuspp.test.simple1')
    simple1_methods.AddTest(TestMethod('MethodNoArgs',
                                        {},
                                        {},
                                        None,
                                        None))

    # The expected result is the unique bus name of this test script's
    # own session bus connection
    simple1_methods.AddTest(TestMethod('GetCallerBusName',
                                        {},
                                        {'caller_busname': 's'},
                                        None,
                                        dbus.String(session_bus.get_unique_name())))

    # The input string below is 23 characters long
    simple1_methods.AddTest(TestMethod('StringLength',
                                        {'string': 's'},
                                        {'length': 'i'},
                                        dbus.String('A little and short test'),
                                        dbus.Int32(23)))

    simple1_methods.AddTest(TestMethod('CreateSimpleObject',
                                        {'string': 's'},
                                        {'path': 'o'},
                                        dbus.String('testscript1'),
                                        dbus.ObjectPath('/gdbuspp/tests/simple1/childs/testscript1')))

    # Preparing a child object for the simple1_child test further below
    simple1_methods.AddTest(TestMethod('CreateSimpleObject',
                                        {'string': 's'},
                                        {'path': 'o'},
                                        dbus.String('child_test'),
                                        dbus.ObjectPath('/gdbuspp/tests/simple1/childs/child_test')))

    simple1_methods.AddTest(TestMethod('RemoveSimpleObject',
                                        {'string': 's'},
                                        None,
                                        dbus.String('testscript1'),
                                        None))

    # This check is declared as a separate test object after simple1_methods,
    # so it runs after ALL the simple1_methods tests have completed; including
    # the GetRemovedObjects test added to simple1_methods below
    tests.ExpectMissingObject('/gdbuspp/tests/simple1/childs/testscript1', 'gdbuspp.test.simple1.child')

    simple1_methods.AddTest(TestMethod('GetRemovedObjects',
                                        {},
                                        {'paths': 'ao'},
                                        None,
                                        dbus.Array([dbus.ObjectPath('/gdbuspp/tests/simple1/childs/testscript1')], signature=dbus.Signature('o'))))

    #
    #  Testing a child object
    #
    #  This object is created by the second CreateSimpleObject test above
    #
    simple1_child = tests.ExpectObject('/gdbuspp/tests/simple1/childs/child_test', 'gdbuspp.test.simple1.child')
    simple1_child.AddTest(TestMethod('GetMyName',
                                       {},
                                       {'name': 's'},
                                       None,
                                       dbus.String('child_test')))
    # TestProperty arguments: property name, expected type, expected initial
    # value, new value.  With None as the new value, the property is expected
    # to be read-only and no attempt to change it is made
    simple1_child.AddTest(TestProperty('my_path', 'o', '/gdbuspp/tests/simple1/childs/child_test', None))
    simple1_child.AddTest(TestMethod('RemoveMe',
                                       {'name': 's'},
                                       {},
                                       dbus.String('child_test'),
                                       None))
    # Runs after the RemoveMe test above; the child object must now be gone
    tests.ExpectMissingObject('/gdbuspp/tests/simple1/childs/child_test', 'gdbuspp.test.simple1.child')

    #
    #  Test properties storing single data types
    #
    #  Each property is checked against its initial value, changed to the
    #  new value and then changed back to the initial value
    #
    simple1_props = tests.ExpectObject('/gdbuspp/tests/simple1/properties', 'gdbuspp.test.simple1')
    simple1_props.AddTest(TestProperty('bool_val', 'b', dbus.Boolean(True), dbus.Boolean(False)))
    simple1_props.AddTest(TestProperty('int_val', 'i', dbus.Int32(-345), dbus.Int32(65000)))
    simple1_props.AddTest(TestProperty('uint_val', 'u', dbus.UInt32(123), dbus.UInt32(16)))
    simple1_props.AddTest(TestProperty('long_val', 'x', dbus.Int64(9223372036854775807), dbus.Int64(-9223372036854775806)))
    simple1_props.AddTest(TestProperty('ulong_val', 't', dbus.UInt64(18446744073709551615), dbus.UInt64(32)))
    simple1_props.AddTest(TestProperty('string_val', 's', dbus.String('Initial string'), dbus.String('Test script string value')))

    #
    #  Test properties storing arrays of various types
    #
    int_array_init = dbus.Array([dbus.Int32(-10), dbus.Int32(3), dbus.Int32(16), dbus.Int32(9388)], 'i')
    int_array_new  = dbus.Array([dbus.Int32(-9481), dbus.Int32(3134), dbus.Int32(4), dbus.Int32(-10394)], 'i')
    simple1_props.AddTest(TestProperty('int_array', 'ai', int_array_init, int_array_new))

    uint_array_init = dbus.Array([dbus.UInt32(0), dbus.UInt32(15), dbus.UInt32(16), dbus.UInt32(31), dbus.UInt32(32), dbus.UInt32(65534), dbus.UInt32(65535)], 'u')
    uint_array_new  = dbus.Array([dbus.UInt32(9481), dbus.UInt32(333134), dbus.UInt32(14), dbus.UInt32(85710394)], 'u')
    simple1_props.AddTest(TestProperty('uint_array', 'au', uint_array_init, uint_array_new))

    # NOTE(review): unlike the integer arrays above, no explicit signature
    # is given for the string arrays -- confirm this is intentional
    string_array_init = dbus.Array([dbus.String('line 1'), dbus.String('line 2'), dbus.String('line 3')])
    string_array_new  = dbus.Array([dbus.String('text line A'),dbus.String('text line B'),dbus.String('TEXT LINE c'),dbus.String('extended array')])
    simple1_props.AddTest(TestProperty('string_array', 'as', string_array_init, string_array_new))

    #
    #  Test property storing a struct of more data types
    #
    complex_init = dbus.Struct([dbus.Boolean(True), dbus.Int32(98877), dbus.String('Initial complex string value')])
    complex_new  = dbus.Struct([dbus.Boolean(False), dbus.Int32(-304922), dbus.String('New complex value set by test script')])
    simple1_props.AddTest(TestProperty('complex', '(bis)', complex_init, complex_new))
    # Read-only variant; expected to carry the same initial value
    simple1_props.AddTest(TestProperty('complex_readonly', '(bis)', complex_init, None))

    ##
    ##  Run all the tests
    ##
    test_res = tests.RunTests()
    print("========  OVERALL RESULT: %s" % (test_res and "PASS" or "FAILED"))

    # Exit code 0 signals success to the caller, 1 signals a failed test run
    if test_res == True:
        sys.exit(0)
    else:
        sys.exit(1)