1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
#!/usr/bin/env python
# Copyright 2008 Mathias Hasselmann and/or Openismus
# Copyright 2008-2016 Collabora Ltd.
# SPDX-License-Identifier: MIT
import os
import signal
import sys

import dbus
import dbus.connection
import dbus.service
import dbus.server
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
class TestService(dbus.service.Object):
    """D-Bus service object offering a single string-reversing method.

    NAME is the D-Bus interface name under which ``reverse`` is declared;
    PATH is the default object path used when none is given.
    """

    NAME = 'org.freedesktop.dbus.TestService'
    PATH = '/org/freedesktop/dbus/TestService'

    def __init__(self, conn, path=PATH):
        """Initialise the underlying dbus.service.Object with *conn* and *path*."""
        super(TestService, self).__init__(conn, path)

    @dbus.service.method(dbus_interface=NAME,
                         in_signature='s',
                         out_signature='s')
    def reverse(self, text):
        """Return *text* with its characters in reverse order."""
        return ''.join(reversed(text))
# Demo driver: fork a private D-Bus server (child) and talk to it from a
# client (parent).  The child hands its listening address to the parent
# through a pipe; the parent then reverses every line typed on stdin by
# calling TestService.reverse on the server.
pin, pout = os.pipe()
child = os.fork()

if child == 0:
    # ---- Child process: the server side ---------------------------------
    DBusGMainLoop(set_as_default=True)
    server = dbus.server.Server('unix:tmpdir=/tmp')

    def new_connection(conn):
        """Create a TestService for a connection the server just added."""
        print("new connection, %r" % conn)
        TestService(conn)

    def connection_gone(conn):
        """Report a connection the server has removed."""
        print("goodbye, %r" % conn)

    # Instantiate a TestService every time a connection is created
    server.on_connection_added.append(new_connection)
    server.on_connection_removed.append(connection_gone)

    # os.write() needs bytes on Python 3; on Python 2 a native str already
    # is bytes, so the encode step is skipped there.
    address = server.address
    if not isinstance(address, bytes):
        address = address.encode('utf-8')
    os.write(pout, address)
    # Closing the write end is what signals end-of-address to the parent.
    os.close(pout)
    os.close(pin)

    print('server running: %s' % server.address)
    GLib.MainLoop().run()
    print('server quit')
    print('done?')
else:
    # ---- Parent process: the client side --------------------------------
    # Drop our copy of the write end, otherwise reading the pipe would
    # never reach EOF.
    os.close(pout)
    try:
        # Read until EOF instead of assuming the address fits in one
        # fixed-size read; the context manager also closes the descriptor
        # if the read fails.
        with os.fdopen(pin, 'rb') as reader:
            address = reader.read()
        # Pass a native str to dbus on both Python 2 (already str) and
        # Python 3 (bytes -> str).
        if not isinstance(address, str):
            address = address.decode('utf-8')
        if not address:
            # The child closed the pipe without writing anything, i.e. it
            # died before the server came up.
            sys.exit('server did not report an address')

        client = dbus.connection.Connection(address)
        try:
            proxy = client.get_object(TestService.NAME, TestService.PATH)
            service = dbus.Interface(proxy, TestService.NAME)

            # readline() (rather than iterating sys.stdin) keeps the
            # session interactive: each line is answered as it is typed.
            for line in iter(sys.stdin.readline, ''):
                text = line.strip()
                print('reverse(%s): %s' % (text, service.reverse(text)))
        finally:
            client.close()
    finally:
        # The server's main loop never returns on its own, so stop the
        # child explicitly and reap it instead of leaving it orphaned.
        os.kill(child, signal.SIGTERM)
        os.waitpid(child, 0)
|