1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
|
from .._private.unmarshaller import Unmarshaller
from ..constants import BusType
from ..message import Message
from ..constants import MessageType, MessageFlag, NameFlag, RequestNameReply, ReleaseNameReply
from ..message_bus import BaseMessageBus
from ..errors import AuthError
from .proxy_object import ProxyObject
from .. import introspection as intr
from ..auth import Authenticator, AuthExternal
import io
from typing import Callable, Optional
# glib is optional
_import_error = None
try:
from gi.repository import GLib
_GLibSource = GLib.Source
except ImportError as e:
_import_error = e
class _GLibSource:
pass
class _MessageSource(_GLibSource):
def __init__(self, bus):
self.unmarshaller = None
self.bus = bus
def prepare(self):
return (False, -1)
def check(self):
return False
def dispatch(self, callback, user_data):
try:
while self.bus._stream.readable():
if not self.unmarshaller:
self.unmarshaller = Unmarshaller(self.bus._stream)
if self.unmarshaller.unmarshall():
callback(self.unmarshaller.message)
self.unmarshaller = None
else:
break
except Exception as e:
self.bus.disconnect()
self.bus._finalize(e)
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
class _MessageWritableSource(_GLibSource):
def __init__(self, bus):
self.bus = bus
self.buf = b''
self.message_stream = None
self.chunk_size = 128
def prepare(self):
return (False, -1)
def check(self):
return False
def dispatch(self, callback, user_data):
try:
if self.buf:
self.bus._stream.write(self.buf)
self.buf = b''
if self.message_stream:
while True:
self.buf = self.message_stream.read(self.chunk_size)
if self.buf == b'':
break
self.bus._stream.write(self.buf)
if len(self.buf) < self.chunk_size:
self.buf = b''
break
self.buf = b''
self.bus._stream.flush()
if not self.bus._buffered_messages:
return GLib.SOURCE_REMOVE
else:
message = self.bus._buffered_messages.pop(0)
self.message_stream = io.BytesIO(message._marshall())
return GLib.SOURCE_CONTINUE
except BlockingIOError:
return GLib.SOURCE_CONTINUE
except Exception as e:
self.bus._finalize(e)
return GLib.SOURCE_REMOVE
class _AuthLineSource(_GLibSource):
def __init__(self, stream):
self.stream = stream
self.buf = b''
def prepare(self):
return (False, -1)
def check(self):
return False
def dispatch(self, callback, user_data):
self.buf += self.stream.read()
if self.buf[-2:] == b'\r\n':
resp = callback(self.buf.decode()[:-2])
if resp:
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
class MessageBus(BaseMessageBus):
"""The message bus implementation for use with the GLib main loop.
The message bus class is the entry point into all the features of the
library. It sets up a connection to the DBus daemon and exposes an
interface to send and receive messages and expose services.
You must call :func:`connect() <dbus_next.glib.MessageBus.connect>` or
:func:`connect_sync() <dbus_next.glib.MessageBus.connect_sync>` before
using this message bus.
:param bus_type: The type of bus to connect to. Affects the search path for
the bus address.
:type bus_type: :class:`BusType <dbus_next.BusType>`
:param bus_address: A specific bus address to connect to. Should not be
used under normal circumstances.
:param auth: The authenticator to use, defaults to an instance of
:class:`AuthExternal <dbus_next.auth.AuthExternal>`.
:type auth: :class:`Authenticator <dbus_next.auth.Authenticator>`
:ivar connected: True if this message bus is expected to be able to send
and receive messages.
:vartype connected: bool
:ivar unique_name: The unique name of the message bus connection. It will
be :class:`None` until the message bus connects.
:vartype unique_name: str
"""
def __init__(self,
bus_address: str = None,
bus_type: BusType = BusType.SESSION,
auth: Authenticator = None):
if _import_error:
raise _import_error
super().__init__(bus_address, bus_type, ProxyObject)
self._main_context = GLib.main_context_default()
# buffer messages until connect
self._buffered_messages = []
if auth is None:
self._auth = AuthExternal()
else:
self._auth = auth
def connect(self, connect_notify: Callable[['MessageBus', Optional[Exception]], None] = None):
"""Connect this message bus to the DBus daemon.
This method or the synchronous version must be called before the
message bus can be used.
:param connect_notify: A callback that will be called with this message
bus. May return an :class:`Exception` on connection errors or
:class:`AuthError <dbus_next.AuthError>` on authorization errors.
:type callback: :class:`Callable`
"""
def authenticate_notify(exc):
if exc is not None:
if connect_notify is not None:
connect_notify(None, exc)
return
self.message_source = _MessageSource(self)
self.message_source.set_callback(self._on_message)
self.message_source.attach(self._main_context)
self.writable_source = None
self.message_source.add_unix_fd(self._fd, GLib.IO_IN)
def on_hello(reply, err):
if err:
if connect_notify:
connect_notify(reply, err)
return
self.unique_name = reply.body[0]
for m in self._buffered_messages:
self.send(m)
if connect_notify:
connect_notify(self, err)
hello_msg = Message(destination='org.freedesktop.DBus',
path='/org/freedesktop/DBus',
interface='org.freedesktop.DBus',
member='Hello',
serial=self.next_serial())
self._method_return_handlers[hello_msg.serial] = on_hello
self._stream.write(hello_msg._marshall())
self._stream.flush()
self._authenticate(authenticate_notify)
def connect_sync(self) -> 'MessageBus':
"""Connect this message bus to the DBus daemon.
This method or the asynchronous version must be called before the
message bus can be used.
:returns: This message bus for convenience.
:rtype: :class:`MessageBus <dbus_next.glib.MessageBus>`
:raises:
- :class:`AuthError <dbus_next.AuthError>` - If authorization to \
the DBus daemon failed.
- :class:`Exception` - If there was a connection error.
"""
main = GLib.MainLoop()
connection_error = None
def connect_notify(bus, err):
nonlocal connection_error
connection_error = err
main.quit()
self.connect(connect_notify)
main.run()
if connection_error:
raise connection_error
return self
def call(self,
msg: Message,
reply_notify: Callable[[Optional[Message], Optional[Exception]], None] = None):
"""Send a method call and asynchronously wait for a reply from the DBus
daemon.
:param msg: The method call message to send.
:type msg: :class:`Message <dbus_next.Message>`
:param reply_notify: A callback that will be called with the reply to
this message. May return an :class:`Exception` on connection errors.
:type reply_notify: Callable
"""
self._call(msg, reply_notify)
def call_sync(self, msg: Message) -> Optional[Message]:
"""Send a method call and synchronously wait for a reply from the DBus
daemon.
:param msg: The method call message to send.
:type msg: :class:`Message <dbus_next.Message>`
:returns: A message in reply to the message sent. If the message does
not expect a reply based on the message flags or type, returns
``None`` immediately.
:rtype: :class:`Message <dbus_next.Message>`
:raises:
- :class:`DBusError <dbus_next.DBusError>` - If the service threw \
an error for the method call or returned an invalid result.
- :class:`Exception` - If a connection error occurred.
"""
if msg.flags & MessageFlag.NO_REPLY_EXPECTED or msg.message_type is not MessageType.METHOD_CALL:
self.send(msg)
return None
if not msg.serial:
msg.serial = self.next_serial()
main = GLib.MainLoop()
handler_reply = None
connection_error = None
def reply_handler(reply, err):
nonlocal handler_reply
nonlocal connection_error
handler_reply = reply
connection_error = err
main.quit()
self._method_return_handlers[msg.serial] = reply_handler
self.send(msg)
main.run()
if connection_error:
raise connection_error
return handler_reply
def introspect_sync(self, bus_name: str, path: str) -> intr.Node:
"""Get introspection data for the node at the given path from the given
bus name.
Calls the standard ``org.freedesktop.DBus.Introspectable.Introspect``
on the bus for the path.
:param bus_name: The name to introspect.
:type bus_name: str
:param path: The path to introspect.
:type path: str
:returns: The introspection data for the name at the path.
:rtype: :class:`Node <dbus_next.introspection.Node>`
:raises:
- :class:`InvalidObjectPathError <dbus_next.InvalidObjectPathError>` \
- If the given object path is not valid.
- :class:`InvalidBusNameError <dbus_next.InvalidBusNameError>` - If \
the given bus name is not valid.
- :class:`DBusError <dbus_next.DBusError>` - If the service threw \
an error for the method call or returned an invalid result.
- :class:`Exception` - If a connection error occurred.
"""
main = GLib.MainLoop()
request_result = None
request_error = None
def reply_notify(result, err):
nonlocal request_result
nonlocal request_error
request_result = result
request_error = err
main.quit()
super().introspect(bus_name, path, reply_notify)
main.run()
if request_error:
raise request_error
return request_result
def request_name_sync(self, name: str, flags: NameFlag = NameFlag.NONE) -> RequestNameReply:
"""Request that this message bus owns the given name.
:param name: The name to request.
:type name: str
:param flags: Name flags that affect the behavior of the name request.
:type flags: :class:`NameFlag <dbus_next.NameFlag>`
:returns: The reply to the name request.
:rtype: :class:`RequestNameReply <dbus_next.RequestNameReply>`
:raises:
- :class:`InvalidBusNameError <dbus_next.InvalidBusNameError>` - If \
the given bus name is not valid.
- :class:`DBusError <dbus_next.DBusError>` - If the service threw \
an error for the method call or returned an invalid result.
- :class:`Exception` - If a connection error occurred.
"""
main = GLib.MainLoop()
request_result = None
request_error = None
def reply_notify(result, err):
nonlocal request_result
nonlocal request_error
request_result = result
request_error = err
main.quit()
super().request_name(name, flags, reply_notify)
main.run()
if request_error:
raise request_error
return request_result
def release_name_sync(self, name: str) -> ReleaseNameReply:
"""Request that this message bus release the given name.
:param name: The name to release.
:type name: str
:returns: The reply to the release request.
:rtype: :class:`ReleaseNameReply <dbus_next.ReleaseNameReply>`
:raises:
- :class:`InvalidBusNameError <dbus_next.InvalidBusNameError>` - If \
the given bus name is not valid.
- :class:`DBusError <dbus_next.DBusError>` - If the service threw \
an error for the method call or returned an invalid result.
- :class:`Exception` - If a connection error occurred.
"""
main = GLib.MainLoop()
release_result = None
release_error = None
def reply_notify(result, err):
nonlocal release_result
nonlocal release_error
release_result = result
release_error = err
main.quit()
super().release_name(name, reply_notify)
main.run()
if release_error:
raise release_error
return release_result
def send(self, msg: Message):
if not msg.serial:
msg.serial = self.next_serial()
self._buffered_messages.append(msg)
if self.unique_name:
self._schedule_write()
def get_proxy_object(self, bus_name: str, path: str, introspection: intr.Node) -> ProxyObject:
return super().get_proxy_object(bus_name, path, introspection)
def _schedule_write(self):
if self.writable_source is None or self.writable_source.is_destroyed():
self.writable_source = _MessageWritableSource(self)
self.writable_source.attach(self._main_context)
self.writable_source.add_unix_fd(self._fd, GLib.IO_OUT)
def _authenticate(self, authenticate_notify):
self._stream.write(b'\0')
first_line = self._auth._authentication_start()
if first_line is not None:
if type(first_line) is not str:
raise AuthError('authenticator gave response not type str')
self._stream.write(f'{first_line}\r\n'.encode())
self._stream.flush()
def line_notify(line):
try:
resp = self._auth._receive_line(line)
self._stream.write(Authenticator._format_line(resp))
self._stream.flush()
if resp == 'BEGIN':
self._readline_source = None
authenticate_notify(None)
return True
except Exception as e:
authenticate_notify(e)
return True
readline_source = _AuthLineSource(self._stream)
readline_source.set_callback(line_notify)
readline_source.add_unix_fd(self._fd, GLib.IO_IN)
readline_source.attach(self._main_context)
# make sure it doesnt get cleaned up
self._readline_source = readline_source
|