# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import binascii
import contextlib
import os
import random
import shutil
import tempfile
import time
from unittest import mock # noqa: F401
import botocore.loaders
from botocore.compat import parse_qs, urlparse
import aiobotocore.session
from aiobotocore.awsrequest import AioAWSResponse
# Shared data loader. create_session() registers this single instance as the
# 'data_loader' component of every session it builds, so the same models are
# reused across tests.
_LOADER = botocore.loaders.Loader()
def random_chars(num_chars):
    """Return ``num_chars`` random lowercase hex characters.

    Useful for creating resources with random names.

    Each random byte encodes to two hex characters, so enough bytes are
    drawn to cover an odd ``num_chars`` and the result is trimmed to the
    requested length (previously an odd request came back one character
    short).

    :param num_chars: Number of characters to return.
    :return: An ASCII ``str`` of hex digits.
    """
    num_chars = int(num_chars)
    # Round up so that an odd request still has enough hex digits to slice.
    num_bytes = (num_chars + 1) // 2
    hex_digits = binascii.hexlify(os.urandom(num_bytes)).decode('ascii')
    return hex_digits[:num_chars]
def create_session(**kwargs):
    """Build an ``AioSession`` configured for use in tests.

    All keyword arguments are forwarded unchanged to ``AioSession``.

    The module-level ``_LOADER`` is registered as the session's
    ``data_loader`` component so that the same models are reused across
    tests, and the ``credentials_file`` config variable is set to
    ``noexist/foo/botocore`` (presumably so that no real credentials file
    is picked up -- confirm against the test suite).

    :return: The configured session object.
    """
    sess = aiobotocore.session.AioSession(**kwargs)
    sess.register_component('data_loader', _LOADER)
    sess.set_config_variable('credentials_file', 'noexist/foo/botocore')
    return sess
@contextlib.contextmanager
def temporary_file(mode):
    """Yield an open temporary file that works the same on every platform.

    ``tempfile.NamedTemporaryFile`` on Windows creates a secure temp file
    that can't be read by other processes and can't be opened a second
    time. For tests, we generally *want* the file to be readable multiple
    times: the test fixture writes the temp file contents, the test reads
    the temp file.

    The file lives inside a freshly created temporary directory; the whole
    directory is removed when the context exits, including when creating
    or opening the file fails.

    :param mode: Mode string passed to ``open()`` for the yielded file.
    :return: Context manager yielding the open file object.
    """
    temporary_directory = tempfile.mkdtemp()
    # Everything after mkdtemp() runs under try/finally so the directory is
    # cleaned up even if creating or opening the file raises.
    try:
        basename = f'tmpfile-{int(time.time())}-{random.randint(1, 1000)}'
        full_filename = os.path.join(temporary_directory, basename)
        # Create the file up front so read-only modes such as 'r' succeed.
        with open(full_filename, 'w'):
            pass
        with open(full_filename, mode) as f:
            yield f
    finally:
        shutil.rmtree(temporary_directory)
def _urlparse(url):
    """Parse ``url`` with ``urlparse``, decoding ``bytes`` as UTF-8 first."""
    # Decoding is not strictly necessary, but it helps to reduce noise on
    # Python 2.x.
    text = url.decode('utf8') if isinstance(url, bytes) else url
    return urlparse(text)


def assert_url_equal(url1, url2):
    """Assert that two URLs are equal, ignoring query-string ordering.

    Every parsed component is compared individually; the query strings are
    compared after parsing with ``parse_qs`` because the order of query
    parameters isn't relevant.

    :param url1: First URL (``str`` or ``bytes``).
    :param url2: Second URL (``str`` or ``bytes``).
    :raises AssertionError: If any component differs.
    """
    first = _urlparse(url1)
    second = _urlparse(url2)
    # Compared in a fixed order; the query string is handled separately below.
    compared_attributes = (
        'scheme',
        'netloc',
        'path',
        'params',
        'fragment',
        'username',
        'password',
        'hostname',
        'port',
    )
    for attribute in compared_attributes:
        assert getattr(first, attribute) == getattr(second, attribute)
    assert parse_qs(first.query) == parse_qs(second.query)
class HTTPStubberException(Exception):
    """Error raised by the HTTP stubbers in this module."""
class BaseHTTPStubber:
    """Base class for stubbing HTTP responses via the ``before-send`` event.

    While started, the stubber itself is registered as the handler for
    ``before-send``. Each call records the request in ``requests`` and hands
    back the next entry of ``responses`` in FIFO order.

    Subclasses must provide ``_events``, the event emitter to register on.
    """

    class AsyncFileWrapper:
        """Minimal body object whose async ``read()`` returns fixed bytes."""

        def __init__(self, body: bytes):
            self._body = body

        async def read(self):
            return self._body

    def __init__(self, obj_with_event_emitter, strict=True):
        """Store the wrapped object and start with empty queues.

        :param obj_with_event_emitter: Object from which subclasses obtain
            the event emitter.
        :param strict: When true, a call made with no queued response
            raises ``HTTPStubberException`` instead of returning ``None``.
        """
        self.reset()
        self._strict = strict
        self._obj_with_event_emitter = obj_with_event_emitter

    def reset(self):
        """Discard all recorded requests and queued responses."""
        self.requests = []
        self.responses = []

    def add_response(
        self, url='https://example.com', status=200, headers=None, body=b''
    ):
        """Queue an ``AioAWSResponse`` built from the given pieces.

        :param url: URL passed to the response.
        :param status: Status code passed to the response.
        :param headers: Headers mapping; a new empty dict when ``None``.
        :param body: Bytes returned by the response body's ``read()``.
        """
        response_headers = {} if headers is None else headers
        raw_body = self.AsyncFileWrapper(body)
        self.responses.append(
            AioAWSResponse(url, status, response_headers, raw_body)
        )

    @property
    def _events(self):
        """Event emitter to (un)register on; supplied by subclasses."""
        raise NotImplementedError('_events')

    def start(self):
        """Register this stubber as a ``before-send`` handler."""
        self._events.register('before-send', self)

    def stop(self):
        """Unregister this stubber from ``before-send``."""
        self._events.unregister('before-send', self)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def __call__(self, request, **kwargs):
        """Record ``request`` and produce the next queued response.

        A queued ``Exception`` instance is raised rather than returned.

        :raises HTTPStubberException: If no response is queued and the
            stubber is strict.
        :return: The next queued response, or ``None`` when the queue is
            empty and the stubber is not strict.
        """
        self.requests.append(request)
        if not self.responses:
            if self._strict:
                raise HTTPStubberException('Insufficient responses')
            return None
        queued = self.responses.pop(0)
        if isinstance(queued, Exception):
            raise queued
        return queued
class ClientHTTPStubber(BaseHTTPStubber):
    """HTTP stubber that registers on the wrapped object's ``meta.events``."""

    @property
    def _events(self):
        """Return the ``meta.events`` attribute of the wrapped object."""
        wrapped = self._obj_with_event_emitter
        return wrapped.meta.events
class SessionHTTPStubber(BaseHTTPStubber):
    """HTTP stubber that uses the wrapped object's ``event_emitter`` component."""

    @property
    def _events(self):
        """Return the wrapped object's ``event_emitter`` component."""
        wrapped = self._obj_with_event_emitter
        return wrapped.get_component('event_emitter')
def patch_load_service_model(
    session, monkeypatch, service_model_json, ruleset_json
):
    """Replace the data loader's ``load_service_model`` with canned data.

    The session's ``data_loader`` component gets a substitute
    ``load_service_model`` that returns ``service_model_json`` for type
    ``'service-2'``, ``ruleset_json`` for type ``'endpoint-rule-set-1'``,
    and ``None`` for any other type name.

    :param session: Object whose ``get_component('data_loader')`` returns
        the loader to patch.
    :param monkeypatch: Object providing ``setattr(target, name, value)``
        (presumably pytest's ``monkeypatch`` fixture -- confirm with callers).
    :param service_model_json: Value served for ``'service-2'``.
    :param ruleset_json: Value served for ``'endpoint-rule-set-1'``.
    """
    canned_by_type = {
        'service-2': service_model_json,
        'endpoint-rule-set-1': ruleset_json,
    }

    def fake_load_service_model(service_name, type_name, api_version=None):
        # Unknown type names fall through to None.
        return canned_by_type.get(type_name)

    data_loader = session.get_component('data_loader')
    monkeypatch.setattr(
        data_loader, 'load_service_model', fake_load_service_model
    )
|