1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
# Copyright (c) 2017-2019 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""Stub GSSAPI module for unit tests"""
from enum import IntEnum
from asyncssh.gss import GSSError
from .gss_stub import step
class Name:
"""Stub class for GSS principal name"""
def __init__(self, base, _name_type=None):
if 'init_error' in base:
raise GSSError(99, 99)
self.host = base[5:]
class Credentials:
"""Stub class for GSS credentials"""
def __init__(self, name=None, usage=None, store=None):
# pylint: disable=unused-argument
self.host = name.host if name else ''
self.server = usage == 'accept'
@property
def mechs(self):
"""Return GSS mechanisms available for this host"""
if self.server:
return [0] if 'unknown_mech' in self.host else [1, 2]
else:
return [2]
class RequirementFlag(IntEnum):
"""Stub class for GSS requirement flags"""
# pylint: disable=invalid-name
delegate_to_peer = 1
mutual_authentication = 2
integrity = 4
class SecurityContext:
"""Stub class for GSS security context"""
def __init__(self, name=None, creds=None, flags=None):
host = creds.host if creds.server else name.host
if flags is None:
flags = RequirementFlag.mutual_authentication | \
RequirementFlag.integrity
if ((creds.server and 'no_server_integrity' in host) or
(not creds.server and 'no_client_integrity' in host)):
flags &= ~RequirementFlag.integrity
self._host = host
self._server = creds.server
self._actual_flags = flags
self._complete = False
@property
def complete(self):
"""Return whether or not GSS negotiation is complete"""
return self._complete
@property
def actual_flags(self):
"""Return flags set on this context"""
return self._actual_flags
@property
def initiator_name(self):
"""Return user principal associated with this context"""
return 'user@TEST'
@property
def target_name(self):
"""Return host principal associated with this context"""
return 'host@TEST'
def step(self, token=None):
"""Perform next step in GSS security exchange"""
token, complete = step(self._host, token)
if complete:
self._complete = True
if token == b'error':
raise GSSError(99, 99)
elif token == b'errtok':
raise GSSError(99, 99, token)
else:
return token
def get_signature(self, _data):
"""Sign a block of data"""
if 'sign_error' in self._host:
raise GSSError(99, 99)
return b'fail' if 'verify_error' in self._host else b''
def verify_signature(self, _data, sig):
"""Verify a signature for a block of data"""
# pylint: disable=no-self-use
if sig == b'fail':
raise GSSError(99, 99)
|