File: test_gss.py

package info (click to toggle)
python-pyspnego 0.10.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,648 kB
  • sloc: python: 16,191; sh: 182; makefile: 11
file content (177 lines) | stat: -rw-r--r-- 6,881 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# -*- coding: utf-8 -*-
# Copyright: (c) 2020, Jordan Borean (@jborean93) <jborean93@gmail.com>
# MIT License (see LICENSE or https://opensource.org/licenses/MIT)

import collections
import re
import typing

import pytest

import spnego
import spnego._gss
import spnego.iov
from spnego.exceptions import InvalidCredentialError, NoContextError


def test_gss_sasl_description_fail(mocker, monkeypatch):
    """Check SASL description lookups hit the raw gssapi call once per OID.

    The patched ``inquire_saslname_for_mech`` raises on its first call and
    returns a result on its second. Each OID is looked up twice, yet the mock
    must only have been called twice in total, so both the failure (``None``)
    and the success value are expected to be reused on the repeat lookup.
    """
    gssapi = pytest.importorskip("gssapi")
    SASLResult = collections.namedtuple("SASLResult", ["mech_description"])

    inquire_stub = mocker.MagicMock(side_effect=[Exception, SASLResult(b"result")])
    monkeypatch.setattr(gssapi.raw, "inquire_saslname_for_mech", inquire_stub)

    def describe(dotted_oid):
        # A fresh OID object is built for every lookup, as a real caller would.
        return spnego._gss._gss_sasl_description(gssapi.OID.from_int_seq(dotted_oid))

    # First OID: the underlying call raises, so no description is available.
    assert describe("1.2.3") is None
    assert describe("1.2.3") is None

    # Second OID: the underlying call succeeds.
    assert describe("1.2.3.4") == b"result"
    assert describe("1.2.3.4") == b"result"

    assert inquire_stub.call_count == 2


def test_build_iov_list(kerb_cred):
    """Check that every accepted IOV entry form is converted as expected.

    The input mixes (type, bytes), (type, int) and (type, bool) tuples with
    bare buffer types and bare bytes; each must come back as a 3-element
    buffer in the same position.
    """
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")

    entries = [
        (spnego.iov.BufferType.header, b"\x01"),
        (spnego.iov.BufferType.data, 1),
        (spnego.iov.BufferType.padding, True),
        spnego.iov.BufferType.header,
        spnego.iov.BufferType.stream,
        b"\x02",
    ]
    wanted = [
        (spnego.iov.BufferType.header, False, b"\x01"),
        (spnego.iov.BufferType.data, False, b"\x00"),
        (spnego.iov.BufferType.padding, True, None),
        (spnego.iov.BufferType.header, True, None),
        (spnego.iov.BufferType.stream, False, None),
        (spnego.iov.BufferType.data, False, b"\x02"),
    ]

    actual = proxy._build_iov_list(entries, proxy._convert_iov_buffer)

    assert len(actual) == 6
    for position, expected_buffer in enumerate(wanted):
        assert actual[position] == expected_buffer


def test_gssapi_query_message_sizes_fail(kerb_cred):
    """query_message_sizes() on a freshly constructed proxy raises NoContextError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
    expected = "Cannot get message sizes until context has been established"

    with pytest.raises(NoContextError, match=expected):
        proxy.query_message_sizes()


def test_gssapi_wrap_no_context(kerb_cred):
    """wrap() on a freshly constructed proxy raises NoContextError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
    expected = "Cannot wrap until context has been established"

    with pytest.raises(NoContextError, match=expected):
        proxy.wrap(b"data")


def test_gssapi_wrap_iov_no_context(kerb_cred):
    """wrap_iov() on a freshly constructed proxy raises NoContextError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
    expected = "Cannot wrap until context has been established"

    with pytest.raises(NoContextError, match=expected):
        proxy.wrap_iov([])


def test_gssapi_wrap_winrm_no_context(kerb_cred):
    """wrap_winrm() on a freshly constructed proxy raises NoContextError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
    expected = "Cannot wrap until context has been established"

    with pytest.raises(NoContextError, match=expected):
        proxy.wrap_winrm(b"data")


def test_gssapi_unwrap_no_context(kerb_cred):
    """unwrap() on a freshly constructed proxy raises NoContextError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
    expected = "Cannot unwrap until context has been established"

    with pytest.raises(NoContextError, match=expected):
        proxy.unwrap(b"data")


def test_gssapi_unwrap_iov_no_context(kerb_cred):
    """unwrap_iov() on a freshly constructed proxy raises NoContextError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
    expected = "Cannot unwrap until context has been established"

    with pytest.raises(NoContextError, match=expected):
        proxy.unwrap_iov([])


def test_gssapi_unwrap_winrm_no_context(kerb_cred):
    """unwrap_winrm() on a freshly constructed proxy raises NoContextError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
    expected = "Cannot unwrap until context has been established"

    with pytest.raises(NoContextError, match=expected):
        proxy.unwrap_winrm(b"header", b"data")


def test_gssapi_sign_no_context(kerb_cred):
    """sign() on a freshly constructed proxy raises NoContextError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
    expected = "Cannot sign until context has been established"

    with pytest.raises(NoContextError, match=expected):
        proxy.sign(b"data")


def test_gssapi_verify_no_context(kerb_cred):
    """verify() on a freshly constructed proxy raises NoContextError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")
    expected = "Cannot verify until context has been established"

    with pytest.raises(NoContextError, match=expected):
        proxy.verify(b"data", b"mic")


def test_build_iov_list_invalid_tuple(kerb_cred):
    """A tuple IOV entry that does not have exactly two items raises ValueError."""
    c = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")

    expected = "IOV entry tuple must contain 2 values, the type and data, see IOVBuffer."
    # pytest treats ``match`` as a regex (re.search); the message ends with ".",
    # a metacharacter, so escape it to require the literal text.
    with pytest.raises(ValueError, match=re.escape(expected)):
        c._build_iov_list([(1, 2, 3)], c._convert_iov_buffer)  # type: ignore[list-item] # we are testing this


def test_build_iov_list_invalid_buffer_type(kerb_cred):
    """A tuple IOV entry whose first item is not an int raises ValueError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")

    # The message contains "[0]", so it is escaped before being used as a regex.
    pattern = re.escape("IOV entry[0] must specify the BufferType as an int")
    with pytest.raises(ValueError, match=pattern):
        proxy._build_iov_list([(b"", b"")], proxy._convert_iov_buffer)  # type: ignore[list-item] # we are testing this


def test_build_iov_list_invalid_data(kerb_cred):
    """A tuple IOV entry whose second item is a str raises ValueError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")

    # The message contains "[1]" and ".", so it is escaped before being used as a regex.
    pattern = re.escape(
        "IOV entry[1] must specify the buffer bytes, length of the buffer, or whether it is auto allocated."
    )
    with pytest.raises(ValueError, match=pattern):
        proxy._build_iov_list([(1, "data")], proxy._convert_iov_buffer)  # type: ignore[list-item] # we are testing this


def test_build_iov_list_invalid_value(kerb_cred):
    """An IOV entry of None (not a tuple, int or bytes) raises ValueError."""
    proxy = spnego._gss.GSSAPIProxy(kerb_cred.user_princ, protocol="kerberos")

    pattern = re.escape("IOV entry must be a IOVBuffer tuple, int, or bytes")
    with pytest.raises(ValueError, match=pattern):
        proxy._build_iov_list([None], proxy._convert_iov_buffer)  # type: ignore[list-item] # we are testing this


def test_no_gssapi_library(monkeypatch):
    """Constructing GSSAPIProxy with HAS_GSSAPI forced to False raises ImportError."""
    monkeypatch.setattr(spnego._gss, "HAS_GSSAPI", False)
    expected = "GSSAPIProxy requires the Python gssapi library"

    with pytest.raises(ImportError, match=expected):
        spnego._gss.GSSAPIProxy()


@pytest.mark.skipif(not spnego._gss.HAS_GSSAPI, reason="Requires the gssapi library to be installed for testing")
def test_gssapi_no_valid_acceptor_cred():
    """Acceptor proxies built from these credential types raise InvalidCredentialError.

    Each credential is paired with the protocol it is tried under. Pairs whose
    protocol is not reported by ``GSSAPIProxy.available_protocols()`` are
    skipped without asserting anything.
    """
    candidates: typing.List[typing.Tuple[spnego.Credential, str]] = [
        (spnego.KerberosCCache("ccache"), "kerberos"),
        (spnego.KerberosKeytab("user_princ", "ccache"), "kerberos"),
        (spnego.NTLMHash("user_princ"), "ntlm"),
    ]
    for credential, mech in candidates:
        if mech in spnego._gss.GSSAPIProxy.available_protocols():
            with pytest.raises(InvalidCredentialError, match="No applicable credentials available"):
                spnego._gss.GSSAPIProxy(credential, protocol=mech, usage="accept")