1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
#!/usr/bin/env python
import logging
from contextlib import closing
import datetime
from unittest.mock import Mock
from unittest.mock import patch
from pathutils import full_path
from pytest import raises
from saml2 import config
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.response import AuthnResponse
from saml2.response import StatusResponse
from saml2.response import response_factory
from saml2.server import Server
from saml2.sigver import SignatureError
# Path of the XML fixture read by test_false_sign, which expects verify() on it
# to raise SignatureError.
FALSE_ASSERT_SIGNED = full_path("saml_false_signed.xml")
# Passed as ``timeslack=`` to every response_factory call in this module.
# NOTE(review): 60 * 5 is presumably five minutes expressed in seconds - confirm.
TIMESLACK = 60 * 5
def _eq(l1, l2):
return set(l1) == set(l2)
# Attribute name -> list of values; passed as the first argument to every
# Server.create_authn_response call in TestResponse.setup_class.
IDENTITY = {
"eduPersonAffiliation": ["staff", "member"],
"surName": ["Jeter"],
"givenName": ["Derek"],
"mail": ["foo@gmail.com"],
"title": ["shortstop"],
}
# Passed as ``authn=`` when TestResponse.setup_class builds ``_resp_authn``.
AUTHN = {"class_ref": INTERNETPROTOCOLPASSWORD, "authn_auth": "http://www.example.com/login"}
class TestResponse:
    """Tests that ``response_factory`` turns SAML response XML into ``AuthnResponse`` objects."""

    def setup_class(self):
        """Build the responses and the SP configuration that the tests read.

        Four responses are created from ``IDENTITY`` with the same request id,
        destination, SP entity id and name id: a plain one, one with
        ``sign_assertion=True``, one with ``authn=AUTHN`` and one whose
        ``issuer`` attribute is reset to ``None`` afterwards.
        """
        sp_entity_id = "urn:mace:example.com:saml:roland:sp"

        with closing(Server("idp_conf")) as idp:
            # Keyword arguments common to every response created below.
            shared_kwargs = {
                "in_response_to": "id12",
                "destination": "http://lingon.catalogix.se:8087/",
                "sp_entity_id": sp_entity_id,
                "name_id": idp.ident.transient_nameid(sp_entity_id, "id12"),
            }

            self._resp_ = idp.create_authn_response(IDENTITY, **shared_kwargs)
            self._sign_resp_ = idp.create_authn_response(IDENTITY, sign_assertion=True, **shared_kwargs)
            self._resp_authn = idp.create_authn_response(IDENTITY, authn=AUTHN, **shared_kwargs)

            without_issuer = idp.create_authn_response(IDENTITY, **shared_kwargs)
            without_issuer.issuer = None
            self._resp_issuer_none = without_issuer

        sp_conf = config.SPConfig()
        sp_conf.load_file("server_conf")
        self.conf = sp_conf

    def _load_response(self, xml_response, return_addr, query_id):
        """Run ``response_factory`` on *xml_response* and check the resulting type.

        :param xml_response: the response to parse (passed with ``decode=False``)
        :param return_addr: the single entry of the ``return_addrs`` list
        :param query_id: the key of the one outstanding query, which maps to
            ``http://localhost:8088/sso``
        :return: the object produced by ``response_factory``
        """
        parsed = response_factory(
            xml_response,
            self.conf,
            return_addrs=[return_addr],
            outstanding_queries={query_id: "http://localhost:8088/sso"},
            timeslack=TIMESLACK,
            decode=False,
        )

        assert isinstance(parsed, StatusResponse)
        assert isinstance(parsed, AuthnResponse)
        return parsed

    def test_1(self):
        """The response created without ``sign_assertion`` yields an AuthnResponse."""
        self._load_response(f"{self._resp_}", "http://lingon.catalogix.se:8087/", "id12")

    def test_2(self):
        """The response created with ``sign_assertion=True`` yields an AuthnResponse."""
        self._load_response(self._sign_resp_, "http://lingon.catalogix.se:8087/", "id12")

    def test_issuer_none(self):
        """A response whose issuer was set to ``None`` reports an empty issuer string."""
        parsed = self._load_response(
            f"{self._resp_issuer_none}",
            "http://lingon.catalogix.se:8087/",
            "id12",
        )

        assert parsed.issuer() == ""

    @patch("saml2.time_util.datetime")
    def test_false_sign(self, mock_datetime, caplog):
        """``verify()`` on the FALSE_ASSERT_SIGNED fixture raises SignatureError and logs an error."""
        caplog.set_level(logging.ERROR)

        # Pin the patched ``utcnow`` to a fixed instant.
        # NOTE(review): presumably chosen to fall inside the fixture's validity window - confirm.
        frozen_now = datetime.datetime(2016, 9, 4, 9, 59, 39)
        mock_datetime.utcnow = Mock(return_value=frozen_now)

        with open(FALSE_ASSERT_SIGNED) as handle:
            tampered_xml = handle.read()

        parsed = self._load_response(
            tampered_xml,
            "http://lingon.catalogix.se:8087/",
            "bahigehogffohiphlfmplepdpcohkhhmheppcdie",
        )

        with raises(SignatureError):
            parsed.verify()

        assert 'The signature on the assertion cannot be verified.' in caplog.text

    def test_other_response(self):
        """The ``attribute_response.xml`` fixture parses and produces non-empty session info."""
        with open(full_path("attribute_response.xml")) as handle:
            fixture_xml = handle.read()

        parsed = self._load_response(
            fixture_xml,
            "https://myreviewroom.com/saml2/acs/",
            "id-f4d370f3d03650f3ec0da694e2348bfe",
        )

        parsed.sec.only_use_keys_in_metadata = False
        parsed.parse_assertion()

        session = parsed.session_info()
        assert session
        print(session["ava"])
if __name__ == "__main__":
    # test_false_sign takes pytest's ``caplog`` fixture in addition to the mock
    # injected by @patch, so calling it directly with no arguments raises
    # TypeError.  Hand the single test to pytest instead: it runs setup_class
    # and injects the fixture.
    import pytest

    raise SystemExit(pytest.main([f"{__file__}::TestResponse::test_false_sign"]))
|