1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
|
# Copyright (C) 2012 Yaco Sistemas (http://www.yaco.es)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import logging
import re
import urllib
import zlib
from functools import lru_cache, wraps
from typing import Optional
from importlib.metadata import version, PackageNotFoundError
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import resolve_url
from django.urls import NoReverseMatch
from django.utils.http import url_has_allowed_host_and_scheme
from django.utils.module_loading import import_string
from saml2.config import SPConfig
from saml2.mdstore import MetaDataMDX
from saml2.s_utils import UnknownSystemEntity
logger = logging.getLogger(__name__)
def get_custom_setting(name: str, default=None):
return getattr(settings, name, default)
def available_idps(config: SPConfig, langpref=None, idp_to_check: str = None) -> dict:
if langpref is None:
langpref = "en"
idps = set()
for metadata in config.metadata.metadata.values():
# initiate a fetch to the selected idp when using MDQ, otherwise the MetaDataMDX is an empty database
if isinstance(metadata, MetaDataMDX) and idp_to_check:
m = metadata[idp_to_check] # noqa: F841
result = metadata.any("idpsso_descriptor", "single_sign_on_service")
if result:
idps.update(result.keys())
return {idp: config.metadata.name(idp, langpref) for idp in idps}
def get_idp_sso_supported_bindings(
idp_entity_id: Optional[str] = None, config: Optional[SPConfig] = None
) -> list:
"""Returns the list of bindings supported by an IDP
This is not clear in the pysaml2 code, so wrapping it in a util"""
if config is None:
# avoid circular import
from .conf import get_config
config = get_config()
# load metadata store from config
meta = getattr(config, "metadata", {})
# if idp is None, assume only one exists so just use that
if idp_entity_id is None:
try:
idp_entity_id = list(available_idps(config).keys())[0]
except IndexError:
raise ImproperlyConfigured("No IdP configured!")
try:
return list(
meta.service(
idp_entity_id, "idpsso_descriptor", "single_sign_on_service"
).keys()
)
except UnknownSystemEntity:
raise UnknownSystemEntity
except Exception as e:
logger.exception(f"get_idp_sso_supported_bindings failed with: {e}")
def get_location(http_info):
"""Extract the redirect URL from a pysaml2 http_info object"""
try:
headers = dict(http_info["headers"])
return headers["Location"]
except KeyError:
return http_info["url"]
def get_fallback_login_redirect_url():
login_redirect_url = get_custom_setting("LOGIN_REDIRECT_URL", "/")
return resolve_url(
get_custom_setting("ACS_DEFAULT_REDIRECT_URL", login_redirect_url)
)
def validate_referral_url(request, url):
# Ensure the url is even a valid URL; sometimes the given url is a
# RelayState containing PySAML data.
# Some technically-valid urls will be fail this check, so the
# SAML_STRICT_URL_VALIDATION setting can be used to turn off this check.
# This should only happen if there is no slash, host and/or protocol in the
# given URL. A better fix would be to add those to the RelayState.
saml_strict_url_validation = getattr(settings, "SAML_STRICT_URL_VALIDATION", True)
try:
if saml_strict_url_validation:
# This will also resolve Django URL pattern names
url = resolve_url(url)
except NoReverseMatch:
logger.debug(
"Could not validate given referral url is a valid URL", exc_info=True
)
return None
# Ensure the user-originating redirection url is safe.
# By setting SAML_ALLOWED_HOSTS in settings.py the user may provide a list of "allowed"
# hostnames for post-login redirects, much like one would specify ALLOWED_HOSTS .
# If this setting is absent, the default is to use the hostname that was used for the current
# request.
saml_allowed_hosts = set(
getattr(settings, "SAML_ALLOWED_HOSTS", [request.get_host()])
)
if not url_has_allowed_host_and_scheme(url=url, allowed_hosts=saml_allowed_hosts):
logger.debug("Referral URL not in SAML_ALLOWED_HOSTS or of the origin host.")
return None
return url
def saml2_from_httpredirect_request(url):
urlquery = urllib.parse.urlparse(url).query
b64_inflated_saml2req = urllib.parse.parse_qs(urlquery)["SAMLRequest"][0]
inflated_saml2req = base64.b64decode(b64_inflated_saml2req)
deflated_saml2req = zlib.decompress(inflated_saml2req, -15)
return deflated_saml2req
def get_session_id_from_saml2(saml2_xml):
saml2_xml = saml2_xml.decode() if isinstance(saml2_xml, bytes) else saml2_xml
return re.findall(r'ID="([a-z0-9\-]*)"', saml2_xml, re.I)[0]
def get_subject_id_from_saml2(saml2_xml):
saml2_xml = saml2_xml if isinstance(saml2_xml, str) else saml2_xml.decode()
re.findall('">([a-z0-9]+)</saml:NameID>', saml2_xml)[0]
def add_param_in_url(url: str, param_key: str, param_value: str):
params = list(url.split("?"))
params.append(f"{param_key}={param_value}")
new_url = params[0] + "?" + "".join(params[1:])
return new_url
def add_idp_hinting(request, http_response) -> bool:
idphin_param = getattr(settings, "SAML2_IDPHINT_PARAM", "idphint")
urllib.parse.urlencode(request.GET)
if idphin_param not in request.GET.keys():
return False
idphint = request.GET[idphin_param]
# validation : TODO -> improve!
if idphint[0:4] != "http":
logger.warning(
f'Idp hinting: "{idphint}" doesn\'t contain a valid value.'
"idphint paramenter ignored."
)
return False
if http_response.status_code in (302, 303):
# redirect binding
# urlp = urllib.parse.urlparse(http_response.url)
new_url = add_param_in_url(http_response.url, idphin_param, idphint)
return HttpResponseRedirect(new_url)
elif http_response.status_code == 200:
# post binding
res = re.search(
r'action="(?P<url>[a-z0-9\:\/\_\-\.]*)"',
http_response.content.decode(),
re.I,
)
if not res:
return False
orig_url = res.groupdict()["url"]
#
new_url = add_param_in_url(orig_url, idphin_param, idphint)
content = http_response.content.decode().replace(orig_url, new_url).encode()
return HttpResponse(content)
else:
logger.warning(
f"Idp hinting: cannot detect request type [{http_response.status_code}]"
)
return False
@lru_cache
def get_csp_handler():
"""Returns a view decorator for CSP."""
def empty_view_decorator(view):
return view
csp_handler_string = get_custom_setting("SAML_CSP_HANDLER", None)
if csp_handler_string is None:
# No CSP handler configured, attempt to use django-csp
return _django_csp_update_decorator() or empty_view_decorator
if csp_handler_string.strip() != "":
# Non empty string is configured, attempt to import it
csp_handler = import_string(csp_handler_string)
def custom_csp_updater(f):
@wraps(f)
def wrapper(*args, **kwargs):
return csp_handler(f(*args, **kwargs))
return wrapper
return custom_csp_updater
# Fall back to empty decorator when csp_handler_string is empty
return empty_view_decorator
def _django_csp_update_decorator():
"""Returns a view CSP decorator if django-csp is available, otherwise None."""
try:
from csp.decorators import csp_update
import csp
except ModuleNotFoundError:
# If csp is not installed, do not update fields as Content-Security-Policy
# is not used
logger.warning(
"django-csp could not be found, not updating Content-Security-Policy. Please "
"make sure CSP is configured. This can be done by your reverse proxy, "
"django-csp or a custom CSP handler via SAML_CSP_HANDLER. See "
"https://djangosaml2.readthedocs.io/contents/security.html#content-security-policy"
" for more information. "
"This warning can be disabled by setting `SAML_CSP_HANDLER=''` in your settings."
)
return
else:
# autosubmit of forms uses nonce per default
# form-action https: to send data to IdPs
# Check django-csp version to determine the appropriate format
try:
csp_version = version('django-csp')
major_version = int(csp_version.split('.')[0])
# Version detection successful
if major_version >= 4:
# django-csp 4.0+ uses dict format with named 'config' parameter
return csp_update(config={"form-action": ["https:"]})
# django-csp < 4.0 uses kwargs format
return csp_update(FORM_ACTION=["https:"])
except (PackageNotFoundError, ValueError, RuntimeError, AttributeError, IndexError):
# Version detection failed, we need to try both formats
# Try v4.0+ style first because:
# 1. It has better error handling with clear messages
# 2. Newer versions are more likely to be supported in the future
# 3. If using kwargs with v4.0, it raises a specific RuntimeError we can catch
try:
return csp_update(config={"form-action": ["https:"]})
except (TypeError, RuntimeError):
# TypeErrors could happen if config is not a recognized parameter (v3.x)
# RuntimeErrors could happen in v4.0+ if we try the wrong approach
return csp_update(FORM_ACTION=["https:"])
|