1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
# Copyright (c) 2017-2022 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
# Georg Sauthoff - fix for "setup.py test" command on non-Windows
"""Stub SSPI module for unit tests"""
import sys
from .gss_stub import step
if sys.platform == 'win32': # pragma: no cover
from asyncssh.gss_win32 import ASC_RET_INTEGRITY, ISC_RET_INTEGRITY
from asyncssh.gss_win32 import SECPKG_ATTR_NATIVE_NAMES, SSPIError
class SSPIBuffer:
"""Stub class for SSPI buffer"""
def __init__(self, data):
self._data = data
@property
def Buffer(self): # pylint: disable=invalid-name
"""Return the data in the buffer"""
return self._data
class SSPIContext:
"""Stub class for SSPI security context"""
def QueryContextAttributes(self, attr): # pylint: disable=invalid-name
"""Return principal information associated with this context"""
# pylint: disable=no-self-use
if attr == SECPKG_ATTR_NATIVE_NAMES:
return ['user@TEST', 'host@TEST']
else: # pragma: no cover
return None
class SSPIAuth:
"""Stub class for SSPI authentication"""
def __init__(self, _package=None, spn=None, targetspn=None, scflags=None):
host = spn or targetspn
if 'init_error' in host:
raise SSPIError('Authentication initialization error')
if targetspn and 'no_client_integrity' in host:
scflags &= ~ISC_RET_INTEGRITY
elif spn and 'no_server_integrity' in host:
scflags &= ~ASC_RET_INTEGRITY
self._host = host[5:]
self._flags = scflags
self._ctxt = SSPIContext()
self._complete = False
self._error = False
@property
def authenticated(self):
"""Return whether authentication is complete"""
return self._complete
@property
def ctxt(self):
"""Return authentication context"""
return self._ctxt
@property
def ctxt_attr(self):
"""Return authentication flags"""
return self._flags
def reset(self):
"""Reset SSPI security context"""
self._complete = False
def authorize(self, token):
"""Perform next step in SSPI authentication"""
if self._error:
self._error = False
raise SSPIError('Token authentication error')
new_token, complete = step(self._host, token)
if complete:
self._complete = True
if new_token in (b'error', b'errtok'):
if token:
raise SSPIError('Token authentication error')
else:
self._error = True
return True, [SSPIBuffer(b'')]
else:
return bool(new_token), [SSPIBuffer(new_token)]
def sign(self, data):
"""Sign a block of data"""
# pylint: disable=no-self-use,unused-argument
if 'sign_error' in self._host:
raise SSPIError('Signing error')
return b'fail' if 'verify_error' in self._host else b''
def verify(self, data, sig):
"""Verify a signature for a block of data"""
# pylint: disable=no-self-use,unused-argument
if sig == b'fail':
raise SSPIError('Signature verification error')
|