File: auth_keys.py

package info (click to toggle)
python-asyncssh 2.21.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,464 kB
  • sloc: python: 40,306; makefile: 11
file content (350 lines) | stat: -rw-r--r-- 11,830 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# Copyright (c) 2015-2024 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
#    GNU General Public License, Version 2.0, or any later versions of
#    that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""Parser for SSH authorized_keys files"""

from typing import Dict, List, Mapping, Optional, Sequence
from typing import Set, Tuple, Union, cast

try:
    # pylint: disable=unused-import
    from .crypto import X509Name, X509NamePattern
    _x509_available = True
except ImportError: # pragma: no cover
    _x509_available = False

from .misc import ip_address, read_file
from .pattern import HostPatternList, WildcardPatternList
from .public_key import KeyImportError, SSHKey
from .public_key import SSHX509Certificate, SSHX509CertificateChain
from .public_key import import_public_key, import_certificate
from .public_key import import_certificate_subject


# Read-only mapping of option name -> parsed option value for a single
# authorized_keys entry; used below as the options type returned by
# SSHAuthorizedKeys.validate_x509().
_EntryOptions = Mapping[str, object]

class _SSHAuthorizedKeyEntry:
    """An entry in an SSH authorized_keys list

       An entry is built from one line of an authorized_keys file. The
       line is first tried as bare key data. If that fails with a
       :exc:`KeyImportError`, a leading comma-separated option list is
       parsed off the front and the remainder is imported instead.

       On success, one of the following is populated:

         * `key`, when the data is a public key
         * `cert`, when the data is a certificate
         * the `'subject'` option, when the data is a certificate
           subject (only recorded when X.509 support is available)

       `options` maps option names to their parsed values.

       Construction raises :exc:`KeyImportError` when nothing can be
       imported from the line and :exc:`ValueError` when the option
       list is malformed.

    """

    def __init__(self, line: str):
        self.key: Optional[SSHKey] = None
        self.cert: Optional[SSHX509Certificate] = None
        self.options: Dict[str, object] = {}

        # Most lines carry no options, so try the whole line as key
        # data before attempting to parse an option list off the front.
        try:
            self._import_key_or_cert(line)
            return
        except KeyImportError:
            pass

        line = self._parse_options(line)
        self._import_key_or_cert(line)

    def _import_key_or_cert(self, line: str) -> None:
        """Import key or certificate in this entry

           The data is tried as a public key, then as a certificate,
           and finally (unless the `cert-authority` option is set) as
           a certificate subject.

           :param line:
               The key, certificate, or subject text to import.
           :type line: `str`

           :raises: :exc:`ValueError` if `cert-authority` is set and the
                    certificate's subject differs from its issuer, or
                    :exc:`KeyImportError` if none of the imports succeed

        """

        try:
            self.key = import_public_key(line)
            return
        except KeyImportError:
            pass

        try:
            self.cert = cast(SSHX509Certificate, import_certificate(line))

            # A cert-authority entry must be self-issued
            if ('cert-authority' in self.options and
                    self.cert.subject != self.cert.issuer):
                raise ValueError('X.509 cert-authority entries must '
                                 'contain a root CA certificate')

            return
        except KeyImportError:
            pass

        if 'cert-authority' not in self.options:
            try:
                self.key = None
                self.cert = None
                self._add_subject('subject', import_certificate_subject(line))
                return
            except KeyImportError:
                pass

        raise KeyImportError('Unrecognized key, certificate, or subject')

    def _set_string(self, option: str, value: str) -> None:
        """Set an option with a string value

           A later occurrence of the same option replaces the earlier one.

        """

        self.options[option] = value

    def _add_environment(self, option: str, value: str) -> None:
        """Add an environment key/value pair

           The value must have the form `NAME=VALUE` with a non-empty
           name. Pairs are collected in a dict stored under `option`.

           :raises: :exc:`ValueError` if the name or the `=` is missing

        """

        if value.startswith('=') or '=' not in value:
            raise ValueError('Invalid environment entry in authorized_keys')

        name, value = value.split('=', 1)
        cast(Dict[str, str], self.options.setdefault(option, {}))[name] = value

    def _add_from(self, option: str, value: str) -> None:
        """Add a from host pattern

           Each occurrence appends a new :class:`HostPatternList` to the
           list stored under `option`.

        """

        from_patterns = cast(List[HostPatternList],
                             self.options.setdefault(option, []))
        from_patterns.append(HostPatternList(value))

    def _add_permitopen(self, option: str, value: str) -> None:
        """Add a permitopen host/port pair

           The value has the form `host:port`. Square brackets around
           the host are stripped and a port of `*` is stored as `None`.
           Pairs are collected in a set stored under `option`.

           :raises: :exc:`ValueError` if the `:` is missing or the port
                    is neither `*` nor an integer

        """

        try:
            # Split on the last colon so unbracketed hosts that contain
            # colons still yield the trailing port
            host, port_str = value.rsplit(':', 1)

            if host.startswith('[') and host.endswith(']'):
                host = host[1:-1]

            port = None if port_str == '*' else int(port_str)
        except ValueError:
            raise ValueError(f'Illegal permitopen value: {value}') from None

        permitted_opens = cast(Set[Tuple[str, Optional[int]]],
                               self.options.setdefault(option, set()))
        permitted_opens.add((host, port))

    def _add_principals(self, option: str, value: str) -> None:
        """Add a principals wildcard pattern list

           Each occurrence appends a new :class:`WildcardPatternList` to
           the list stored under `option`.

        """

        principal_patterns = cast(List[WildcardPatternList],
                                  self.options.setdefault(option, []))
        principal_patterns.append(WildcardPatternList(value))

    def _add_subject(self, option: str, value: str) -> None:
        """Add an X.509 subject pattern

           Nothing is recorded when X.509 support could not be imported.

        """

        if _x509_available: # pragma: no branch
            subject_patterns = cast(List[X509NamePattern],
                                    self.options.setdefault(option, []))
            subject_patterns.append(X509NamePattern(value))

    # Options which need more than "append the raw value to a list".
    # The values are plain functions, so they are called with self
    # passed explicitly.
    _handlers = {
        'command':     _set_string,
        'environment': _add_environment,
        'from':        _add_from,
        'permitopen':  _add_permitopen,
        'principals':  _add_principals,
        'subject':     _add_subject
    }

    def _add_option(self) -> None:
        """Add an option value

           Records the option text accumulated in `self._option`. An
           option of the form `name=value` is passed to its handler if
           one is registered, or otherwise appended to a list of values
           stored under `name`. An option without `=` is stored as `True`.

           :raises: :exc:`ValueError` if the option begins with `=`

        """

        if self._option.startswith('='):
            raise ValueError('Missing option name in authorized_keys')

        if '=' in self._option:
            option, value = self._option.split('=', 1)

            handler = self._handlers.get(option)
            if handler:
                handler(self, option, value)
            else:
                values = cast(List[str], self.options.setdefault(option, []))
                values.append(value)
        else:
            self.options[self._option] = True

    def _parse_options(self, line: str) -> str:
        """Parse options in this entry

           Options are comma-separated and end at the first space or
           tab found outside of double quotes. A backslash causes the
           next character to be taken literally. Quote and backslash
           characters themselves are not included in the option text.

           :param line:
               The entry text, beginning with the option list.
           :type line: `str`

           :returns: The text following the option list with surrounding
                     whitespace removed, or an empty string if the line
                     contains nothing but options

           :raises: :exc:`ValueError` if an option name is missing or a
                    quote or backslash is left unbalanced

        """

        self._option = ''

        # Offset where the option list ends. This stays at the end of
        # the line unless unquoted whitespace is found, so that a line
        # consisting only of options leaves no remainder behind.
        end = len(line)
        quoted = False
        escaped = False

        for idx, ch in enumerate(line):
            if escaped:
                self._option += ch
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                quoted = not quoted
            elif quoted:
                self._option += ch
            elif ch in ' \t':
                end = idx
                break
            elif ch == ',':
                self._add_option()
                self._option = ''
            else:
                self._option += ch

        # Record the final option, which has no comma after it
        self._add_option()

        if quoted:
            raise ValueError('Unbalanced quote in authorized_keys')
        elif escaped:
            raise ValueError('Unbalanced backslash in authorized_keys')

        return line[end:].strip()

    def match_options(self, client_host: str, client_addr: str,
                      cert_principals: Optional[Sequence[str]],
                      cert_subject: Optional['X509Name'] = None) -> bool:
        """Match "from", "principals" and "subject" options in entry

           :param client_host:
               The client host name passed to each "from" pattern list.
           :param client_addr:
               The client address, passed to each "from" pattern list
               both as a string and converted with `ip_address`.
           :param cert_principals:
               The principals to test against the "principals" option,
               or `None` to skip that check.
           :param cert_subject:
               The subject to test against the "subject" option, or
               `None` to skip that check.

           :returns: `True` if every check which applies passes. Every
                     "from" pattern list must match the client, every
                     "principals" pattern list must match at least one
                     of the principals, and every "subject" pattern
                     must match the subject.

        """

        from_patterns = cast(List[HostPatternList], self.options.get('from'))

        if from_patterns:
            client_ip = ip_address(client_addr)

            if not all(pattern.matches(client_host, client_addr, client_ip)
                       for pattern in from_patterns):
                return False

        principal_patterns = cast(List[WildcardPatternList],
                                  self.options.get('principals'))

        if cert_principals is not None and principal_patterns is not None:
            if not all(any(pattern.matches(principal)
                           for principal in cert_principals)
                       for pattern in principal_patterns):
                return False

        subject_patterns = cast(List['X509NamePattern'],
                                self.options.get('subject'))

        if cert_subject is not None and subject_patterns is not None:
            if not all(pattern.matches(cert_subject)
                       for pattern in subject_patterns):
                return False

        return True


class SSHAuthorizedKeys:
    """A collection of parsed SSH authorized_keys entries

       Entries are kept in three lists: public keys usable directly for
       user authentication, public keys marked `cert-authority`, and
       entries which carry no public key (X.509 certificates and
       subjects).

    """

    def __init__(self, authorized_keys: Optional[str] = None):
        self._user_entries: List[_SSHAuthorizedKeyEntry] = []
        self._ca_entries: List[_SSHAuthorizedKeyEntry] = []
        self._x509_entries: List[_SSHAuthorizedKeyEntry] = []

        if authorized_keys:
            self.load(authorized_keys)

    def load(self, authorized_keys: str) -> None:
        """Parse authorized keys text and add its entries to this object

           Blank lines, lines starting with `#`, and lines which fail
           with :exc:`KeyImportError` are skipped.

           :raises: :exc:`ValueError` if this object holds no entries at
                    all once the text has been processed

        """

        for raw_line in authorized_keys.splitlines():
            text = raw_line.strip()

            if text == '' or text.startswith('#'):
                continue

            try:
                parsed = _SSHAuthorizedKeyEntry(text)
            except KeyImportError:
                continue

            # Pick the list this entry belongs in
            if not parsed.key:
                bucket = self._x509_entries
            elif 'cert-authority' in parsed.options:
                bucket = self._ca_entries
            else:
                bucket = self._user_entries

            bucket.append(parsed)

        if not (self._user_entries or self._ca_entries or
                self._x509_entries):
            raise ValueError('No valid entries found')

    def validate(self, key: SSHKey, client_host: str, client_addr: str,
                 cert_principals: Optional[Sequence[str]] = None,
                 ca: bool = False) -> Optional[Mapping[str, object]]:
        """Look up a public key or CA key for authentication

           The CA entries are searched when `ca` is true and the user
           entries otherwise.

           :returns: The options of the first entry whose key equals
                     `key` and whose options match the client, or
                     `None` if there is no such entry

        """

        candidates = self._ca_entries if ca else self._user_entries

        return next((candidate.options for candidate in candidates
                     if candidate.key == key and
                     candidate.match_options(client_host, client_addr,
                                             cert_principals)), None)

    def validate_x509(self, cert: SSHX509CertificateChain, client_host: str,
                      client_addr: str) -> Tuple[Optional[_EntryOptions],
                                                 Optional[SSHX509Certificate]]:
        """Look up an X.509 certificate for authentication

           :returns: A tuple of the options and certificate of the first
                     matching entry, or `(None, None)` if no entry
                     matches. The certificate is `None` for an entry
                     which holds no certificate.

        """

        for candidate in self._x509_entries:
            pinned = candidate.cert

            if pinned and 'cert-authority' not in candidate.options:
                # A non-CA certificate entry only applies to a
                # certificate with the same key and subject
                if cert.key != pinned.key or cert.subject != pinned.subject:
                    continue # pragma: no cover

            if candidate.match_options(client_host, client_addr,
                                       cert.user_principals, cert.subject):
                return candidate.options, pinned

        return None, None

def import_authorized_keys(data: str) -> SSHAuthorizedKeys:
    """Import SSH authorized keys

       This function builds an :class:`SSHAuthorizedKeys` object from
       text containing public keys and their associated options in
       OpenSSH authorized keys format.

       :param data:
           The key data to import.
       :type data: `str`

       :returns: An :class:`SSHAuthorizedKeys` object

    """

    authorized_keys = SSHAuthorizedKeys(authorized_keys=data)

    return authorized_keys


def read_authorized_keys(filelist: Union[str, Sequence[str]]) -> \
        SSHAuthorizedKeys:
    """Read SSH authorized keys from a file or list of files

       Each file is read and loaded in the order given into a single
       :class:`SSHAuthorizedKeys` object. The files are expected to
       contain public keys and associated options in OpenSSH
       authorized_keys format.

       :param filelist:
           The file or list of files to read the keys from.
       :type filelist: `str` or `list` of `str`

       :returns: An :class:`SSHAuthorizedKeys` object

    """

    # Treat a lone filename as a one-element list
    filenames: Sequence[str] = \
        [filelist] if isinstance(filelist, str) else filelist

    result = SSHAuthorizedKeys()

    for name in filenames:
        result.load(read_file(name, 'r'))

    return result