File: fd_client.py

package info (click to toggle)
txdbus 1.1.0-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 996 kB
  • sloc: python: 6,658; makefile: 7
file content (70 lines) | stat: -rwxr-xr-x 1,966 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python

"""
fd_client.py

Grabs '/path/to/FDObject' on 'org.example' bus and demonstrates calling
remote methods with UNIX open file descriptors as arguments (type 'h' as
per the dbus spec).

NOTE:
Passing open UNIX file descriptors across RPC / IPC mechanisms such as dbus
requires the underlying transport to be a UNIX domain socket.
"""

from __future__ import print_function

from twisted.internet import defer, task

from txdbus import client


@defer.inlineCallbacks
def call_remote_verbose(obj, method, *args, **kwargs):
    """
    Call `method` on `obj` via callRemote, echoing the call and its result.

    The method name and positional arguments are printed first, followed by
    ' = ' without a newline; once the remote call completes, the repr of its
    result finishes the line. Keyword arguments are forwarded to callRemote
    but are not echoed.

    Returns a Deferred that fires with the remote call's result.
    """
    call_text = 'calling %s%s' % (method, args)
    print(call_text, end=' = ')

    pending = obj.callRemote(method, *args, **kwargs)
    outcome = yield pending

    print(repr(outcome))
    defer.returnValue(outcome)


@defer.inlineCallbacks
def main(reactor):
    """
    Connect to dbus, obtain the remote FD object and exercise its methods.

    Every remote call passes one or more open file descriptors of this very
    source file. Once the bus connection has been established it is closed
    on every exit path, including when obtaining the remote object or one of
    the remote calls fails.

    reactor: the reactor passed in by task.react, used for the connection.

    Returns a Deferred firing with None. Failures of the 'lenFD' and
    'readBytesFD' calls propagate through it; a failure of the
    'readBytesTwoFDs' call is only reported on stdout.
    """

    PATH = '/path/to/FDObject'
    BUSN = 'org.example'

    try:
        bus = yield client.connect(reactor)
    except Exception as e:
        # No connection was established, so there is nothing to clean up.
        print('failed obtaining remote object: %s' % (e,))
        return
    print('connected to dbus')

    # From here on the connection exists: guarantee it gets closed.
    try:
        try:
            # Named `remote` so the `object` builtin is not shadowed.
            remote = yield bus.getRemoteObject(BUSN, PATH)
        except Exception as e:
            print('failed obtaining remote object: %s' % (e,))
            return
        print('obtained remote object')

        # Open this source file. Ask remote to read it and return byte count.
        with open(__file__, 'rb') as f:
            yield call_remote_verbose(remote, 'lenFD', f.fileno())

        # Open this source file. Ask remote to read 10 bytes from it.
        with open(__file__, 'rb') as f:
            yield call_remote_verbose(remote, 'readBytesFD', f.fileno(), 10)

        # Like before, now exercise passing two open UNIX FDs.
        # (will not be available under Twisted < 17.1.0)
        with open(__file__, 'rb') as f1, open(__file__, 'rb') as f2:
            fd1 = f1.fileno()
            fd2 = f2.fileno()
            try:
                yield call_remote_verbose(
                    remote, 'readBytesTwoFDs', fd1, fd2, 5)
            except Exception as e:
                print('remote call failed: %s' % (e,))
    finally:
        bus.disconnect()
        print('disconnected')


if __name__ == '__main__':
    # task.react is handed `main`, which accepts the reactor as its argument.
    task.react(main)