1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from typing import Any, Dict, Optional
from urllib.parse import urlparse
from azure.core.pipeline.transport import AioHttpTransportResponse, AsyncHttpTransport
from azure.core.rest import HttpRequest
from aiohttp import ClientResponse
class ProgressTracker:
    """Test helper that verifies progress callbacks advance in fixed steps.

    The tracker keeps its own running count, bumps it by ``step`` on each
    callback until it equals ``total``, and asserts that the values reported
    by the caller match that expectation.
    """

    def __init__(self, total: int, step: int):
        """Record the expected total and the expected increment per callback."""
        self.total, self.step, self.current = total, step, 0

    async def assert_progress(self, current: int, total: Optional[int]):
        """Advance the expected position and compare it with the reported one.

        :param current: Progress value reported by the code under test.
        :param total: Reported total; only checked when it is truthy.
        :raises AssertionError: If the reported values differ from the
            tracker's expectation.
        """
        # Once the expected position equals the total it stops moving, so
        # further callbacks must keep reporting the same value.
        finished = self.current == self.total
        if not finished:
            self.current += self.step

        if total:
            assert self.total == total
        assert self.current == current

    def assert_complete(self):
        """Assert that the expected position has reached the total."""
        assert self.total == self.current
class AsyncStream:
    """Minimal in-memory byte stream whose ``read`` is a coroutine.

    The stream keeps a cursor into the wrapped bytes; every read returns the
    bytes starting at the cursor and moves the cursor past them.
    """

    def __init__(self, data: bytes):
        """Wrap ``data`` and place the cursor at the beginning."""
        self._data = data
        self._offset = 0

    def __len__(self) -> int:
        """Return the total length of the wrapped data, not the bytes left."""
        return len(self._data)

    async def read(self, size: int = -1) -> bytes:
        """Return up to ``size`` bytes from the current position.

        :param size: Maximum number of bytes to return. A negative value
            (the default) or ``None`` means "everything that is left".
        :returns: The bytes read; ``b""`` once the stream is exhausted.
        """
        start = self._offset
        if size is None or size < 0:
            # Read-to-end must honour the cursor and consume the data,
            # otherwise a read-until-EOF loop would never see b"" and bytes
            # already handed out by a partial read would be delivered twice.
            end = len(self._data)
        else:
            end = start + size
        chunk = self._data[start:end]
        self._offset += len(chunk)
        return chunk
class MockAioHttpClientResponse(ClientResponse):
    """Stand-in ``ClientResponse`` populated directly from in-memory values."""

    def __init__(
        self,
        url: str,
        body_bytes: bytes,
        headers: Dict[str, Any],
        status: int = 200,
        reason: str = "OK",
    ) -> None:
        """Store the canned URL, body, headers and status line.

        :param url: URL stored on the response.
        :param body_bytes: Payload stored as the response body.
        :param headers: Header mapping stored on the response.
        :param status: HTTP status code.
        :param reason: HTTP reason phrase.
        """
        # NOTE: one-argument super() produces an *unbound* super object, so
        # this call does not run ClientResponse.__init__. Presumably that is
        # intentional (the real initializer expects connection-related
        # arguments) -- confirm before "fixing" it to super().__init__().
        super(MockAioHttpClientResponse).__init__()

        # Status line.
        self.status = status
        self.reason = reason

        # Private attributes set by hand in place of the skipped initializer.
        self._url = url
        self._headers = headers
        self._body = body_bytes
        self._cache = {}
        self._loop = None
class MockStorageTransport(AsyncHttpTransport):
    """
    This transport returns legacy http response objects from azure core and is
    intended only to test our backwards compatibility support.

    ``send`` never touches the network: it chooses a canned response from the
    request's HTTP method (and, for PATCH, from the query string).
    """

    @staticmethod
    def _mock_response(
        request: HttpRequest,
        headers: Dict[str, Any],
        body: bytes = b"",
        status: int = 200,
        reason: str = "OK",
    ) -> AioHttpTransportResponse:
        """Wrap a canned aiohttp response in a legacy azure-core response.

        :param request: The request the response answers; its URL is copied
            onto the mock aiohttp response.
        :param headers: Headers to expose on the response.
        :param body: Response payload.
        :param status: HTTP status code.
        :param reason: HTTP reason phrase.
        :returns: The transport response, with its body not yet loaded.
        """
        return AioHttpTransportResponse(
            request=request,
            aiohttp_response=MockAioHttpClientResponse(
                request.url,
                body,
                headers,
                status,
                reason,
            ),
            decompress=False,
        )

    async def send(self, request: HttpRequest, **kwargs: Any) -> AioHttpTransportResponse:
        """Return the canned response matching ``request.method``.

        :param request: The outgoing request; only its method, URL and
            headers are inspected.
        :returns: A response whose body has already been loaded.
        :raises ValueError: If the HTTP method is not GET, HEAD, PUT, PATCH
            or DELETE.
        """
        method = request.method

        if method == 'GET':
            # download_file
            headers = {
                "Content-Type": "application/octet-stream",
                "Content-Range": "bytes 0-17/18",
                "Content-Length": "18",
            }
            # Only advertise an MD5 when the caller asked for one.
            if "x-ms-range-get-content-md5" in request.headers:
                headers["Content-MD5"] = "I3pVbaOCUTom+G9F9uKFoA=="
            rest_response = self._mock_response(request, headers, b"Hello Async World!")
        elif method == 'HEAD':
            # get_file_properties
            rest_response = self._mock_response(
                request,
                {
                    "Content-Type": "application/octet-stream",
                    "Content-Length": "1024",
                },
            )
        elif method == 'PUT':
            # upload_data
            rest_response = self._mock_response(
                request, {"Content-Length": "0"}, status=201, reason="Created"
            )
        elif method == 'PATCH':
            # upload_data_chunks
            parsed = urlparse(request.url)
            if "action=flush" in parsed.query:
                # A flush request is answered with 200.
                rest_response = self._mock_response(
                    request, {"Content-Length": "0"}, status=200, reason="OK"
                )
            else:
                # Any other PATCH is answered with 202.
                rest_response = self._mock_response(
                    request, {"Content-Length": "0"}, status=202, reason="Accepted"
                )
        elif method == 'DELETE':
            # delete_file
            rest_response = self._mock_response(
                request, {"Content-Length": "0"}, status=202, reason="Accepted"
            )
        else:
            raise ValueError("The request is not accepted as part of MockStorageTransport.")

        # Load the body before returning the response to the caller.
        await rest_response.load_body()
        return rest_response

    async def __aenter__(self):
        """Enter the async context; there is nothing to set up."""
        return self

    async def __aexit__(self, *args):
        """Exit the async context; there is nothing to tear down."""
        pass

    async def open(self):
        """No-op: the mock holds no connection."""
        pass

    async def close(self):
        """No-op: the mock holds no connection."""
        pass
|