File: gssapi_stub.py

package info (click to toggle)
python-asyncssh 2.21.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,464 kB
  • sloc: python: 40,306; makefile: 11
file content (141 lines) | stat: -rw-r--r-- 3,682 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright (c) 2017-2019 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
#    GNU General Public License, Version 2.0, or any later versions of
#    that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""Stub GSSAPI module for unit tests"""

from enum import IntEnum

from asyncssh.gss import GSSError

from .gss_stub import step


class Name:
    """Fake GSS principal name used by the unit tests

       The host is whatever follows the first five characters of the
       base string. A base string containing 'init_error' makes the
       constructor raise GSSError, letting tests simulate a failure
       while the name is being built.

    """

    def __init__(self, base, _name_type=None):
        simulate_failure = 'init_error' in base

        if simulate_failure:
            raise GSSError(99, 99)

        # Presumably the skipped characters are a 'host@' style service
        # prefix -- confirm against the callers of this stub.
        prefix_len = 5

        self.host = base[prefix_len:]


class Credentials:
    """Fake GSS credentials used by the unit tests

       The credentials remember the host of the name they were built
       from (or an empty string when no name is given) and whether
       they were requested with a usage of 'accept'.

    """

    def __init__(self, name=None, usage=None, store=None):
        # pylint: disable=unused-argument

        if name:
            self.host = name.host
        else:
            self.host = ''

        # Only a usage of 'accept' marks these as server credentials
        self.server = usage == 'accept'

    @property
    def mechs(self):
        """Return the list of GSS mechanisms offered by these credentials"""

        if not self.server:
            return [2]

        # A server host containing 'unknown_mech' gets a different
        # mechanism list than every other server host
        if 'unknown_mech' in self.host:
            return [0]

        return [1, 2]


class RequirementFlag(IntEnum):
    """Stub enumeration of GSS requirement flag bits

       Each member occupies its own bit, so members can be combined
       with bitwise operators.

    """

    # pylint: disable=invalid-name

    delegate_to_peer = 1 << 0
    mutual_authentication = 1 << 1
    integrity = 1 << 2


class SecurityContext:
    """Fake GSS security context used by the unit tests

       The host comes from the credentials when they are server
       credentials and from the name otherwise. Markers embedded in
       that host string select which failures the context simulates.

    """

    def __init__(self, name=None, creds=None, flags=None):
        acting_as_server = creds.server

        # Each side has its own host source and its own marker for
        # dropping the integrity flag
        if acting_as_server:
            host = creds.host
            drop_integrity_marker = 'no_server_integrity'
        else:
            host = name.host
            drop_integrity_marker = 'no_client_integrity'

        if flags is None:
            flags = (RequirementFlag.mutual_authentication |
                     RequirementFlag.integrity)

        if drop_integrity_marker in host:
            flags &= ~RequirementFlag.integrity

        self._host = host
        self._server = acting_as_server
        self._actual_flags = flags
        self._complete = False

    @property
    def complete(self):
        """Report whether the GSS exchange has finished"""

        return self._complete

    @property
    def actual_flags(self):
        """Report the flags in effect on this context"""

        return self._actual_flags

    @property
    def initiator_name(self):
        """Report the fixed user principal used by this stub"""

        return 'user@TEST'

    @property
    def target_name(self):
        """Report the fixed host principal used by this stub"""

        return 'host@TEST'

    def step(self, token=None):
        """Advance the GSS exchange by one step

           The work is delegated to the module-level step() helper,
           which returns the next token and a completion indicator.
           A returned token of b'error' or b'errtok' is turned into a
           GSSError, the latter carrying the token along with it.

        """

        reply, finished = step(self._host, token)

        # Completion is recorded before any error is raised
        if finished:
            self._complete = True

        if reply == b'error':
            raise GSSError(99, 99)

        if reply == b'errtok':
            raise GSSError(99, 99, reply)

        return reply

    def get_signature(self, _data):
        """Produce a signature for a block of data

           A host containing 'sign_error' makes signing itself fail.
           A host containing 'verify_error' yields b'fail', the value
           verify_signature() rejects.

        """

        host = self._host

        if 'sign_error' in host:
            raise GSSError(99, 99)

        if 'verify_error' in host:
            return b'fail'

        return b''

    def verify_signature(self, _data, sig):
        """Check a signature for a block of data

           Only the signature b'fail' is rejected, by raising GSSError.

        """

        # pylint: disable=no-self-use

        if sig == b'fail':
            raise GSSError(99, 99)