File: test_endpoints.py

package info (click to toggle)
txdbus 1.1.0-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 996 kB
  • sloc: python: 6,658; makefile: 7
file content (107 lines) | stat: -rw-r--r-- 3,498 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os

from unittest import skip

from twisted.internet import reactor
from twisted.internet.endpoints import UNIXServerEndpoint
from twisted.trial import unittest

from txdbus import endpoints


class EndpointsTester(unittest.TestCase):
    """Tests for DBus address-string handling in ``txdbus.endpoints``.

    Several tests rewrite the ``DBUS_SESSION_BUS_ADDRESS`` environment
    variable.  ``setUp`` snapshots it and ``tearDown`` restores it
    (including the "was not set at all" state), so the process environment
    is left exactly as it was found.
    """

    # Name of the environment variable these tests manipulate.
    _SESSION_ENV = 'DBUS_SESSION_BUS_ADDRESS'

    def setUp(self):
        """Remember the current session bus address (``None`` if unset)."""
        # ``.get`` rather than indexing: the variable is frequently absent
        # (headless build hosts, CI), and a KeyError here would make every
        # test in this class error out before it even starts.
        self.pre_ses = os.environ.get(self._SESSION_ENV)

    def tearDown(self):
        """Restore the session bus address captured by ``setUp``."""
        if self.pre_ses is None:
            # The variable did not exist before the test; make sure a value
            # written by the test does not leak into later tests.
            os.environ.pop(self._SESSION_ENV, None)
        else:
            os.environ[self._SESSION_ENV] = self.pre_ses

    def env(self, val):
        """Set the session bus address to ``val``; unset it if ``val`` is falsy."""
        if val:
            os.environ[self._SESSION_ENV] = val
        else:
            os.environ.pop(self._SESSION_ENV, None)

    def gde(self, addr, client=True):
        """Shorthand for ``endpoints.getDBusEndpoints(reactor, addr, client)``."""
        return endpoints.getDBusEndpoints(reactor, addr, client)

    def test_no_session(self):
        """``getDBusEnvEndpoints`` raises with a fixed message when the
        session variable is unset."""
        self.env(None)
        # The context-manager form keeps a "nothing was raised" failure
        # separate from the message check below.  A try/except Exception
        # around a manual ``assertTrue(False)`` would catch its own
        # assertion failure and report a misleading string mismatch.
        with self.assertRaises(Exception) as caught:
            endpoints.getDBusEnvEndpoints(reactor)
        self.assertEqual(
            str(caught.exception),
            'DBus Session environment variable not set',
        )

    def xtest_env_session(self):
        """Placeholder; the ``x`` prefix keeps the test runner from
        collecting it."""
        pass

    @skip("Skipping")
    def test_unix_address(self):
        """``unix:`` addresses (path, tmpdir, abstract) for client and server.

        NOTE(review): skipped unconditionally; the reason is not recorded
        in this file.
        """
        e = self.gde('unix:path=/var/run/dbus/system_bus_socket')[0]
        self.assertEqual(e._path, '/var/run/dbus/system_bus_socket')

        e = self.gde('unix:tmpdir=/tmp')[0]
        self.assertTrue(e._path.startswith('/tmp/dbus-'))

        # Abstract socket names are expected with a leading NUL byte.
        e = self.gde(
            'unix:abstract=/tmp/dbus-jgAbdgyUH7,'
            'guid=6abbe624c672777bd87ab46e00027706'
        )[0]
        self.assertEqual(e._path, '\0/tmp/dbus-jgAbdgyUH7')

        # client=False: a server endpoint is expected instead.
        e = self.gde('unix:abstract=/tmp/dbus-jgAbdgyUH7', False)[0]
        self.assertEqual(e._address, '\0/tmp/dbus-jgAbdgyUH7')
        self.assertIsInstance(e, UNIXServerEndpoint)

    @skip("Skipping")
    def test_tcp_address(self):
        """``tcp:`` addresses expose host/port (client) or interface/port
        (server).

        NOTE(review): skipped unconditionally; the reason is not recorded
        in this file.
        """
        e = self.gde('tcp:host=127.0.0.1,port=1234')[0]
        self.assertEqual(e._host, '127.0.0.1')
        self.assertEqual(e._port, 1234)

        e = self.gde('tcp:host=127.0.0.1,port=1234', False)[0]
        self.assertEqual(e._interface, '127.0.0.1')
        self.assertEqual(e._port, 1234)

    def test_nonce_tcp_address(self):
        """``nonce-tcp:`` addresses expose host/port and carry the
        ``noncefile`` argument in ``dbus_args``."""
        e = self.gde('nonce-tcp:host=127.0.0.1,port=1234,noncefile=/foo')[0]
        self.assertEqual(e._host, '127.0.0.1')
        self.assertEqual(e._port, 1234)
        self.assertIn('noncefile', e.dbus_args)
        self.assertEqual(e.dbus_args['noncefile'], '/foo')

    def test_launchd_address(self):
        """``launchd:`` addresses are expected to yield no endpoints."""
        found = self.gde('launchd:env=foo')
        self.assertEqual(found, [])

    def test_session(self):
        """``'session'`` resolves through the environment variable and
        raises once that variable is removed."""
        self.env(
            'unix:abstract=/tmp/dbus-jgAbdgyUH7,'
            'guid=6abbe624c672777bd87ab46e00027706'
        )
        e = self.gde('session')[0]
        self.assertEqual(e._path, '\0/tmp/dbus-jgAbdgyUH7')

        self.env(None)
        self.assertRaises(Exception, self.gde, 'session')

    def test_system(self):
        """``'system'`` is expected to map to the system bus socket path."""
        e = self.gde('system')[0]
        self.assertEqual(e._path, '/var/run/dbus/system_bus_socket')

    def test_multiple_addresses(self):
        """A ``;``-separated address list yields one endpoint per entry,
        in the order given."""
        self.env(
            'unix:abstract=/tmp/dbus-jgAbdgyUH7,'
            'guid=6abbe624c672777bd87ab46e00027706;'
            'tcp:host=127.0.0.1,port=1234'
        )
        found = self.gde('session')
        self.assertEqual(len(found), 2)

        unix_ep, tcp_ep = found
        self.assertEqual(unix_ep._path, '\0/tmp/dbus-jgAbdgyUH7')
        self.assertEqual(tcp_ep._host, '127.0.0.1')
        self.assertEqual(tcp_ep._port, 1234)