1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
from io import BytesIO
from botocore.awsrequest import AWSResponse
class Body(BytesIO):
def stream(self, **kwargs):
contents = self.read()
while contents:
yield contents
contents = self.read()
class MockResponse:
def __init__(self, client, status_code, headers, body):
self._client = client
self._status_code = status_code
self._headers = headers
self._body = body
def __enter__(self):
self._client.meta.events.register("before-send", self)
return self
def __exit__(self, exc_type, exc_value, traceback):
self._client.meta.events.unregister("before-send", self)
def __call__(self, request, **kwargs):
return AWSResponse(
request.url,
self._status_code,
self._headers,
Body(self._body),
)
|