File: dovecot_auth.py

package info (click to toggle)
radicale-dovecot-auth 0.4.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 116 kB
  • sloc: python: 141; makefile: 3
file content (160 lines) | stat: -rw-r--r-- 4,802 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Dovecot authentication plugin for Radicale.
# Copyright (C) 2017-2019 Arvedui <arvedui@posteo.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from base64 import b64encode
import os
import socket

HANDSHAKE = "VERSION\t1\t1\nCPID\t{}\n"
SUPPORTED_MAJOR_VERSION = 1

AUTH_COMMAND = "AUTH\t{id}\tPLAIN\tservice={service}\tresp="


class DovecotAuthException(Exception):
    """DovecotAuth base Exception"""


class UnsupportedVersion(DovecotAuthException):
    """Thrown if the protocol version of the auth server ist not supported"""


class HandshakeFailed(DovecotAuthException):
    """Thrown if the Handshake with the auth server fails"""


class UnexpectedData(DovecotAuthException):
    """
    Thrown if there is still data the recieve buffer that was not expected
    """


class DovecotAuth:
    """
    DovecotAuth provides authentication against a Dovecot authentication
    service using the PLAIN mechanism.

    Only version 1.1 as described in the `Dovecot Wiki`_

    .. _Dovecot Wiki: https://wiki2.dovecot.org/Design/AuthProtocol

    :param service: Name of the service authentication services are provided for
    :param socket_path: Path to the unix domain socket of the auth server
    :param host: hostname of the auth server
    :param port: port of the auth server
    """

    def __init__(self, service, *,
                 socket_path=None, host=None, port=None, ssl=False):
        self.socket_path = socket_path
        self.buffer = bytes()
        self.authid = 1
        self.service = service

        if socket_path:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.socket.connect(self.socket_path)

        elif host and port:
            self.socket = socket.create_connection((host, port))

        else:
            raise RuntimeError('auth_socket path or auth_host and auth_port must be set')

        self._handshake()

    def buffer_is_empty(self):
        return len(self.buffer) == 0

    def _readline(self):
        """Read one line from the socket using a receive buffer"""

        nextlineend = self.buffer.find(b'\n')
        while nextlineend == -1:
            self.buffer += self.socket.recv(4096)

            nextlineend = self.buffer.find(b'\n')

        nextline = self.buffer[:nextlineend]
        self.buffer = self.buffer[nextlineend+1:]
        return nextline.split(b'\t')

    def _send(self, msg):
        """
        Send data via the socket

        :param msg: message to send
        """

        send_bytes = 0

        while send_bytes != len(msg):
            send_bytes += self.socket.send(msg[send_bytes:])

    def _handshake(self):
        """
        Perform handshake according to dovecot auth protocol
        """

        done = False
        plain = False

        self._send(HANDSHAKE.format(os.getpid()).encode('utf8'))
        while not done:
            command, *arguments = self._readline()

            if command == b"VERSION":
                if int(arguments[0]) != SUPPORTED_MAJOR_VERSION:
                    raise UnsupportedVersion

            elif command == b'MECH':
                if arguments[0] == b'PLAIN':
                    plain = True

            elif command == b'DONE':
                done = True

        if not plain:
            raise HandshakeFailed(
                    'auth mechanism PLAIN is not supported by dovecot')

    def authenticate(self, username, password):
        """Authenticate given credentials"""
        done = False

        credentials = "\0{username}\0{password}".format(
                username=username, password=password)
        credentials = b64encode(credentials.encode('utf8'))

        command = AUTH_COMMAND.format(id=self.authid, service=self.service)
        command = command.encode('ascii')
        command += credentials
        command += b'\n'

        self.authid += 1

        self._send(command)

        command, *arguments = self._readline()

        if not self.buffer_is_empty:
            raise UnexpectedData(
                    "Server has sent data that was not expected: {}".format(
                        self.buffer))

        if command == b'OK':
            return True
        return False