1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
# -*- coding: utf-8 -*-
# Copyright: (c) 2020, Jordan Borean (@jborean93) <jborean93@gmail.com>
# MIT License (see LICENSE or https://opensource.org/licenses/MIT)
import collections
import re
import typing
import pytest
import spnego
import spnego._gss
import spnego.iov
from spnego.exceptions import InvalidCredentialError, NoContextError
def test_gss_sasl_description_fail(mocker, monkeypatch):
gssapi = pytest.importorskip("gssapi")
SASLResult = collections.namedtuple("SASLResult", ["mech_description"])
mock_inquire_sasl = mocker.MagicMock()
mock_inquire_sasl.side_effect = [Exception, SASLResult(b"result")]
monkeypatch.setattr(gssapi.raw, "inquire_saslname_for_mech", mock_inquire_sasl)
actual = spnego._gss._gss_sasl_description(gssapi.OID.from_int_seq("1.2.3"))
assert actual is None
actual = spnego._gss._gss_sasl_description(gssapi.OID.from_int_seq("1.2.3"))
assert actual is None
actual = spnego._gss._gss_sasl_description(gssapi.OID.from_int_seq("1.2.3.4"))
assert actual == b"result"
actual = spnego._gss._gss_sasl_description(gssapi.OID.from_int_seq("1.2.3.4"))
assert actual == b"result"
assert mock_inquire_sasl.call_count == 2
def test_build_iov_list(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
actual = c._build_iov_list(
[
(spnego.iov.BufferType.header, b"\x01"),
(spnego.iov.BufferType.data, 1),
(spnego.iov.BufferType.padding, True),
spnego.iov.BufferType.header,
spnego.iov.BufferType.stream,
b"\x02",
],
c._convert_iov_buffer,
)
assert len(actual) == 6
assert actual[0] == (spnego.iov.BufferType.header, False, b"\x01")
assert actual[1] == (spnego.iov.BufferType.data, False, b"\x00")
assert actual[2] == (spnego.iov.BufferType.padding, True, None)
assert actual[3] == (spnego.iov.BufferType.header, True, None)
assert actual[4] == (spnego.iov.BufferType.stream, False, None)
assert actual[5] == (spnego.iov.BufferType.data, False, b"\x02")
def test_gssapi_query_message_sizes_fail(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
with pytest.raises(NoContextError, match="Cannot get message sizes until context has been established"):
c.query_message_sizes()
def test_gssapi_wrap_no_context(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
with pytest.raises(NoContextError, match="Cannot wrap until context has been established"):
c.wrap(b"data")
def test_gssapi_wrap_iov_no_context(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
with pytest.raises(NoContextError, match="Cannot wrap until context has been established"):
c.wrap_iov([])
def test_gssapi_wrap_winrm_no_context(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
with pytest.raises(NoContextError, match="Cannot wrap until context has been established"):
c.wrap_winrm(b"data")
def test_gssapi_unwrap_no_context(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
with pytest.raises(NoContextError, match="Cannot unwrap until context has been established"):
c.unwrap(b"data")
def test_gssapi_unwrap_iov_no_context(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
with pytest.raises(NoContextError, match="Cannot unwrap until context has been established"):
c.unwrap_iov([])
def test_gssapi_unwrap_winrm_no_context(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
with pytest.raises(NoContextError, match="Cannot unwrap until context has been established"):
c.unwrap_winrm(b"header", b"data")
def test_gssapi_sign_no_context(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
with pytest.raises(NoContextError, match="Cannot sign until context has been established"):
c.sign(b"data")
def test_gssapi_verify_no_context(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
with pytest.raises(NoContextError, match="Cannot verify until context has been established"):
c.verify(b"data", b"mic")
def test_build_iov_list_invalid_tuple(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
expected = "IOV entry tuple must contain 2 values, the type and data, see IOVBuffer."
with pytest.raises(ValueError, match=expected):
c._build_iov_list([(1, 2, 3)], c._convert_iov_buffer) # type: ignore[list-item] # we are testing this
def test_build_iov_list_invalid_buffer_type(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
expected = "IOV entry[0] must specify the BufferType as an int"
with pytest.raises(ValueError, match=re.escape(expected)):
c._build_iov_list([(b"", b"")], c._convert_iov_buffer) # type: ignore[list-item] # we are testing this
def test_build_iov_list_invalid_data(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
expected = "IOV entry[1] must specify the buffer bytes, length of the buffer, or whether it is auto allocated."
with pytest.raises(ValueError, match=re.escape(expected)):
c._build_iov_list([(1, "data")], c._convert_iov_buffer) # type: ignore[list-item] # we are testing this
def test_build_iov_list_invalid_value(kerb_cred):
c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
expected = "IOV entry must be a IOVBuffer tuple, int, or bytes"
with pytest.raises(ValueError, match=re.escape(expected)):
c._build_iov_list([None], c._convert_iov_buffer) # type: ignore[list-item] # we are testing this
def test_no_gssapi_library(monkeypatch):
monkeypatch.setattr(spnego._gss, "HAS_GSSAPI", False)
with pytest.raises(ImportError, match="GSSAPIProxy requires the Python gssapi library"):
spnego._gss.GSSAPIProxy()
@pytest.mark.skipif(not spnego._gss.HAS_GSSAPI, reason="Requires the gssapi library to be installed for testing")
def test_gssapi_no_valid_acceptor_cred():
server_creds: typing.List[typing.Tuple[spnego.Credential, str]] = [
(spnego.KerberosCCache("ccache"), "kerberos"),
(spnego.KerberosKeytab("user_princ", "ccache"), "kerberos"),
(spnego.NTLMHash("user_princ"), "ntlm"),
]
for cred, protocol in server_creds:
if protocol not in spnego._gss.GSSAPIProxy.available_protocols():
continue
with pytest.raises(InvalidCredentialError, match="No applicable credentials available"):
spnego._gss.GSSAPIProxy(cred, protocol=protocol, usage="accept")
|