# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import json
import copy
import re
from azure_devtools.scenario_tests import RecordingProcessor
from azure_devtools.scenario_tests.utilities import (
is_text_payload,
_get_content_type
)
from urllib.parse import urlparse, parse_qs, urlencode, quote_plus
def _is_merge_patch_payload(entity):
    """Return True when the entity's Content-Type is JSON merge-patch.

    Used to recognize request/response payloads recorded with the
    ``application/merge-patch+json`` media type.
    """
    merge_patch_content_type = "application/merge-patch+json"
    entity_content_type = _get_content_type(entity)
    return entity_content_type.startswith(merge_patch_content_type)
def _is_text_payload_internal(entity):
    """Thin indirection over the scenario-tests ``is_text_payload`` helper.

    Kept as a local wrapper so the module has a single seam for deciding
    whether a recorded entity carries a text body.
    """
    result = is_text_payload(entity)
    return result
def sanitize_query_params(value,        # type: str
                          exceptions,   # type: List[str]
                          replacement,  # type: str
                          **kwargs):
    """Mask every query-string value in a URL, keeping excepted parameters.

    :param value: the URL to sanitize.
    :param exceptions: query parameter names whose values are preserved.
    :param replacement: string written over every other parameter's value.
    :return: the URL with its query string rewritten.
    """
    url_parts = urlparse(value)
    params = parse_qs(url_parts.query)
    # Keep the parsed value list for excepted keys; collapse everything
    # else to the replacement string (urlencode(doseq=True) treats a bare
    # str as a single value, not a sequence of characters).
    masked = {
        name: (values if name in exceptions else replacement)
        for name, values in params.items()
    }
    url_parts = url_parts._replace(query=urlencode(masked, doseq=True))  # cSpell:disable-line
    return url_parts.geturl()
class RouterHeaderSanitizer(RecordingProcessor):
    """Recording processor that redacts selected response headers.

    Requests pass through untouched; on responses, each configured header
    has its value overwritten with the replacement string.
    """

    def __init__(self,
                 headers=None,  # type: List[str]
                 replacement="REDACTED"):
        # No headers configured means nothing gets redacted.
        self._headers = headers or []
        self._replacement = replacement

    def process_request(self, request):
        """Leave the outgoing request unchanged."""
        return request

    def process_response(self, response):
        """Overwrite each configured header's value with the replacement."""
        def redact(_value):
            return self._replacement

        for header_name in self._headers:
            self.replace_header_fn(response, header_name, redact)
        return response
class RouterQuerySanitizer(RecordingProcessor):
    """Recording processor that masks query-string values in request URIs
    and recorded response URLs, except for parameter names listed in
    ``exceptions``.
    """

    def __init__(self, exceptions=None, replacement="sanitized"):
        """
        :param exceptions: query parameter names whose values are preserved.
        :param replacement: value written over every other parameter's value.
        """
        # BUG FIX: the original assigned ``self._exceptions = exceptions``
        # unconditionally after the ``if not exceptions`` guard, so passing
        # ``None`` stored ``None`` and ``k not in self._exceptions`` inside
        # sanitize_query_params raised TypeError at processing time.
        self._exceptions = exceptions if exceptions else []
        self._sanitized_value = replacement

    def process_request(self, request):
        """Sanitize the query string of the outgoing request URI."""
        request.uri = sanitize_query_params(request.uri,
                                            exceptions=self._exceptions,
                                            replacement=self._sanitized_value)
        return request

    def process_response(self, response):
        """Sanitize the query string of the recorded response URL, if any."""
        if 'url' in response:
            response['url'] = sanitize_query_params(response['url'],
                                                    exceptions=self._exceptions,
                                                    replacement=self._sanitized_value)
        return response
class RouterURIIdentityReplacer(RecordingProcessor):
    """Recording processor that replaces JobRouter entity identifiers
    embedded in request URIs and response URLs with ``sanitized``.
    """

    # (pattern, replacement) pairs covering each JobRouter entity path
    # segment; applied in the same order the original substitutions ran.
    _SUBSTITUTIONS = (
        ('/routing/classificationPolicies/([^/?]+)', '/routing/classificationPolicies/sanitized'),
        ('/routing/distributionPolicies/([^/?]+)', '/routing/distributionPolicies/sanitized'),
        ('/routing/exceptionPolicies/([^/?]+)', '/routing/exceptionPolicies/sanitized'),
        ('/routing/jobs/([^/?]+)', '/routing/jobs/sanitized'),
        ('/offers/([^/?]+):', '/offers/sanitized:'),
        ('/routing/queues/([^/?]+)', '/routing/queues/sanitized'),
        ('/routing/workers/([^/?]+)', '/routing/workers/sanitized'),
    )

    def _sanitize_uri(self, uri):
        # Run every substitution; patterns target disjoint path segments.
        for pattern, replacement in self._SUBSTITUTIONS:
            uri = re.sub(pattern, replacement, uri)
        return uri

    def process_request(self, request):
        """Scrub entity ids out of the outgoing request URI."""
        request.uri = self._sanitize_uri(request.uri)
        return request

    def process_response(self, response):
        """Scrub entity ids out of the recorded response URL, if present."""
        if 'url' in response:
            response['url'] = self._sanitize_uri(response['url'])
        return response
class RouterScrubber(RecordingProcessor):
    """Sanitize the sensitive info inside request or response bodies."""

    def __init__(self, keys=None, replacement="sanitized", max_depth=16):
        """
        :param keys: JSON keys whose values are replaced wherever they occur.
        :param replacement: value written over each matched key's value.
        :param max_depth: recursion guard for pathologically nested payloads.
        """
        self._replacement = replacement
        self._keys = keys if keys else []
        self.max_depth = max_depth

    def process_request(self, request):
        """Scrub configured keys from a JSON request body.

        A malformed body still raises ValueError to the caller, matching
        the original behavior (the old ``except ... raise e`` wrapper was
        a no-op and has been removed).
        """
        if _is_text_payload_internal(request) and request.body:
            body = json.loads(request.body.decode())
            body = self._scrub(body, 0)
            request.body = json.dumps(body).encode()
        return request

    def process_response(self, response):
        """Scrub configured keys from a recorded JSON response body."""
        if _is_text_payload_internal(response) and 'body' in response:
            # Recorded responses store the payload under body -> string;
            # anything else is left untouched.
            if isinstance(response['body'], dict) \
                    and 'string' in response['body']:
                body = response["body"]["string"]
                if body == b"":
                    return response
                # Python-3-only module (it imports urllib.parse), so the
                # original ``six.string_types`` check reduces to ``str``.
                body_is_string = isinstance(body, str)
                if body_is_string and body and not body.isspace():
                    body = json.loads(body)
                body = self._scrub(body, 0)
                response["body"]["string"] = json.dumps(body).encode('utf-8')
        return response

    def _scrub(self, x, depth):
        """Return a deep copy of ``x`` with configured keys' values replaced.

        :raises ValueError: when nesting exceeds ``max_depth``.
        """
        if depth > self.max_depth:
            raise ValueError("Max depth reached")
        ret = copy.deepcopy(x)
        # Handle dictionaries, lists & tuples. Scrub all values.
        if isinstance(x, dict):
            for k, v in ret.items():
                if k in self._keys:
                    ret[k] = self._replacement
                else:
                    ret[k] = self._scrub(v, depth + 1)
        if isinstance(x, list):
            for i, v in enumerate(ret):
                ret[i] = self._scrub(v, depth + 1)
        elif isinstance(x, tuple):
            # BUG FIX: the original assigned into the deep-copied tuple
            # (``ret[k] = ...``), which raises TypeError — tuples do not
            # support item assignment. Rebuild the tuple instead.
            ret = tuple(self._scrub(v, depth + 1) for v in ret)
        # Finished scrubbing
        return ret