1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
from .errors import AuthError
import enum
import os
# The auth interface here is unstable. I would like to eventually open this up
# for people to define their own custom authentication protocols, but I'm not
# familiar with what's needed for that exactly. To work with any message bus
# implementation would require abstracting out all the IO. Async operations
# might be challenging because different IO backends have different ways of
# doing that. I might just end up giving the raw socket and leaving it all up
# to the user, but it would be nice to have a little guidance in the interface
# since a lot of it is strongly specified. If you have a need for this, contact
# the project maintainer to help stabilize this interface.
class _AuthResponse(enum.Enum):
    """Command words recognised at the start of an authentication response line.

    Each member's value is the literal command word as it appears in the line.
    """
    OK = 'OK'
    REJECTED = 'REJECTED'
    DATA = 'DATA'
    ERROR = 'ERROR'
    AGREE_UNIX_FD = 'AGREE_UNIX_FD'

    @classmethod
    def parse(cls, line):
        """Split a response line into its command member and its arguments.

        :param line: The response line; it is split on single space characters.
        :returns: A ``(response, args)`` tuple where ``response`` is the member
            whose value equals the first word and ``args`` is the list of the
            remaining words (empty when the line has only one word).
        :raises ValueError: If the first word is not the value of any member.
        """
        command, *arguments = line.split(' ')
        return cls(command), arguments
# UNSTABLE
class Authenticator:
    """The base class for authenticators for :class:`MessageBus <dbus_next.message_bus.BaseMessageBus>` authentication.

    In the future, the library may allow extending this class for custom authentication protocols.

    Subclasses override :meth:`_authentication_start` and :meth:`_receive_line`;
    the base implementations only raise :class:`NotImplementedError`.

    :seealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
    """
    def _authentication_start(self, negotiate_unix_fd=False):
        """Produce the opening command of the authentication exchange.

        :param negotiate_unix_fd: Whether unix fd passing should be negotiated.
        :raises NotImplementedError: Always, in this base class.
        """
        # The message names the real (underscore-prefixed) hook so that the
        # error points at the method a subclass actually has to override.
        raise NotImplementedError(
            '_authentication_start() must be implemented in the inheriting class')

    def _receive_line(self, line):
        """Handle one response line and produce the next command to send.

        :param line: The response line to handle.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError('_receive_line() must be implemented in the inheriting class')

    @staticmethod
    def _format_line(line):
        """Encode a command as bytes terminated by ``\\r\\n``.

        :param line: The command text without a line terminator.
        :returns: The command followed by CRLF, encoded with the default
            ``str.encode()`` encoding.
        """
        return f'{line}\r\n'.encode()
class AuthExternal(Authenticator):
    """An authenticator class for the external auth protocol for use with the
    :class:`MessageBus <dbus_next.message_bus.BaseMessageBus>`.

    :seealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
    """
    def __init__(self):
        # Whether the caller asked for unix fd negotiation; set again on every
        # call to _authentication_start().
        self.negotiate_unix_fd = False
        # Becomes True once NEGOTIATE_UNIX_FD has been returned in reply to OK.
        self.negotiating_fds = False

    def _authentication_start(self, negotiate_unix_fd=False) -> str:
        """Build the ``AUTH EXTERNAL`` command for the current process uid.

        :param negotiate_unix_fd: Stored on the instance; decides what
            :meth:`_receive_line` returns after an ``OK`` response.
        :returns: ``AUTH EXTERNAL`` followed by the hex encoding of the decimal
            uid string.
        """
        self.negotiate_unix_fd = negotiate_unix_fd
        uid_text = str(os.getuid())
        hex_uid = uid_text.encode().hex()
        return f'AUTH EXTERNAL {hex_uid}'

    def _receive_line(self, line: str):
        """Choose the next command from a response line.

        :param line: The response line to parse.
        :returns: ``BEGIN`` after ``AGREE_UNIX_FD``, or after ``OK`` when fd
            negotiation was not requested; ``NEGOTIATE_UNIX_FD`` after ``OK``
            when it was requested.
        :raises AuthError: For any response other than ``OK`` or
            ``AGREE_UNIX_FD``.
        """
        reply, params = _AuthResponse.parse(line)

        if reply is _AuthResponse.AGREE_UNIX_FD:
            return "BEGIN"

        if reply is not _AuthResponse.OK:
            raise AuthError(f'authentication failed: {reply.value}: {params}')

        if not self.negotiate_unix_fd:
            return "BEGIN"

        # OK received and fd passing was requested: ask for it before BEGIN.
        self.negotiating_fds = True
        return "NEGOTIATE_UNIX_FD"
class AuthAnnonymous(Authenticator):
    """An authenticator class for the anonymous auth protocol for use with the
    :class:`MessageBus <dbus_next.message_bus.BaseMessageBus>`.

    :seealso: https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol
    """
    def _authentication_start(self, negotiate_unix_fd=False) -> str:
        """Build the ``AUTH ANONYMOUS`` command.

        :param negotiate_unix_fd: Must be falsy; fd negotiation is rejected.
        :returns: The string ``AUTH ANONYMOUS``.
        :raises AuthError: If ``negotiate_unix_fd`` is truthy.
        """
        if not negotiate_unix_fd:
            return 'AUTH ANONYMOUS'

        raise AuthError(
            'annonymous authentication does not support negotiating unix fds right now')

    def _receive_line(self, line: str) -> str:
        """Choose the next command from a response line.

        :param line: The response line to parse.
        :returns: ``BEGIN`` when the response is ``OK``.
        :raises AuthError: For any response other than ``OK``.
        """
        reply, params = _AuthResponse.parse(line)

        if reply == _AuthResponse.OK:
            return 'BEGIN'

        raise AuthError(f'authentication failed: {reply.value}: {params}')
|