1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
|
from urllib.parse import parse_qs
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_SOAP
from saml2 import pack
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.samlp import attribute_query_from_string
from saml2.samlp import logout_request_from_string
from saml2.server import Server
from saml2.soap import make_soap_enveloped_saml_thingy
from saml2.soap import parse_soap_enveloped_saml_attribute_query
from saml2.soap import parse_soap_enveloped_saml_logout_request
__author__ = "rolandh"
TYP = {"GET": [BINDING_HTTP_REDIRECT], "POST": [BINDING_HTTP_POST, BINDING_SOAP]}
AUTHN = {"class_ref": INTERNETPROTOCOLPASSWORD, "authn_auth": "http://www.example.com/login"}
def unpack_form(_str, ver="SAMLRequest"):
SR_STR = f'name="{ver}" value="'
RS_STR = 'name="RelayState" value="'
i = _str.find(SR_STR)
i += len(SR_STR)
j = _str.find('"', i)
sr = _str[i:j]
k = _str.find(RS_STR, j)
k += len(RS_STR)
l = _str.find('"', k)
rs = _str[k:l]
return {ver: sr, "RelayState": rs}
class DummyResponse:
def __init__(self, status, data, headers=None):
self.status_code = status
self.text = data
self.headers = headers or []
self.content = data
class FakeIDP(Server):
def __init__(self, config_file=""):
Server.__init__(self, config_file)
# self.sign = False
def receive(self, url, method="GET", **kwargs):
"""
Interface to receive HTTP calls on
:param url:
:param method:
:param kwargs:
:return:
"""
if method == "GET":
path, query = url.split("?")
qs_dict = parse_qs(kwargs["data"])
req = qs_dict["SAMLRequest"][0]
rstate = qs_dict["RelayState"][0]
else:
# Could be either POST or SOAP
path = url
try:
qs_dict = parse_qs(kwargs["data"])
req = qs_dict["SAMLRequest"][0]
rstate = qs_dict["RelayState"][0]
except KeyError:
req = kwargs["data"]
rstate = ""
response = ""
# Get service from path
for key, vals in self.config.getattr("endpoints", "idp").items():
for endp, binding in vals:
if path == endp:
assert binding in TYP[method]
if key == "single_sign_on_service":
return self.authn_request_endpoint(req, binding, rstate)
elif key == "single_logout_service":
return self.logout_endpoint(req, binding)
for key, vals in self.config.getattr("endpoints", "aa").items():
for endp, binding in vals:
if path == endp:
assert binding in TYP[method]
if key == "attribute_service":
return self.attribute_query_endpoint(req, binding)
return response
def authn_request_endpoint(self, req, binding, relay_state):
req = self.parse_authn_request(req, binding)
if req.message.protocol_binding == BINDING_HTTP_REDIRECT:
_binding = BINDING_HTTP_POST
else:
_binding = req.message.protocol_binding
try:
resp_args = self.response_args(req.message, [_binding])
except Exception:
raise
identity = {"surName": "Hedberg", "givenName": "Roland", "title": "supertramp", "mail": "roland@example.com"}
userid = "Pavill"
authn_resp = self.create_authn_response(identity, userid=userid, authn=AUTHN, **resp_args)
response = f"{authn_resp}"
_dict = pack.factory(_binding, response, resp_args["destination"], relay_state, "SAMLResponse")
return DummyResponse(**_dict)
def attribute_query_endpoint(self, xml_str, binding):
if binding == BINDING_SOAP:
_str = parse_soap_enveloped_saml_attribute_query(xml_str)
else:
_str = xml_str
aquery = attribute_query_from_string(_str)
extra = {"eduPersonAffiliation": "faculty"}
# userid = "Pavill"
name_id = aquery.subject.name_id
attr_resp = self.create_attribute_response(
extra, aquery.id, None, sp_entity_id=aquery.issuer.text, name_id=name_id, attributes=aquery.attribute
)
if binding == BINDING_SOAP:
# SOAP packing
# headers = {"content-type": "application/soap+xml"}
soap_message = make_soap_enveloped_saml_thingy(attr_resp)
# if self.sign and self.sec:
# _signed = self.sec.sign_statement_using_xmlsec(soap_message,
# class_name(attr_resp),
# nodeid=attr_resp.id)
# soap_message = _signed
response = f"{soap_message}"
else: # Just POST
response = f"{attr_resp}"
return DummyResponse(status=200, data=response)
def logout_endpoint(self, xml_str, binding):
if binding == BINDING_SOAP:
_str = parse_soap_enveloped_saml_logout_request(xml_str)
else:
_str = xml_str
req = logout_request_from_string(_str)
_resp = self.create_logout_response(req, [binding])
if binding == BINDING_SOAP:
# SOAP packing
# headers = {"content-type": "application/soap+xml"}
soap_message = make_soap_enveloped_saml_thingy(_resp)
# if self.sign and self.sec:
# _signed = self.sec.sign_statement_using_xmlsec(soap_message,
# class_name(attr_resp),
# nodeid=attr_resp.id)
# soap_message = _signed
response = f"{soap_message}"
else: # Just POST
response = f"{_resp}"
return DummyResponse(status=200, data=response)
|