import errno
import logging
from contextlib import contextmanager, AsyncExitStack
from botocore.exceptions import ClientError
logger = logging.getLogger("s3fs")
@contextmanager
def ignoring(*exceptions):
    """Run the managed block, silently suppressing the given exception types.

    Parameters
    ----------
    exceptions : one or more exception classes to silence
    """
    try:
        yield
    except exceptions:
        # Deliberately swallowed: callers opt in to best-effort semantics.
        pass
class S3BucketRegionCache:
    # See https://github.com/aio-libs/aiobotocore/issues/866
    # for details.
    """Cache of S3 clients keyed by bucket region.

    Lazily discovers a bucket's region via HEAD_BUCKET and hands out a
    client created for that region, caching both the per-region clients
    (``_regions``) and the bucket -> client mapping (``_buckets``).  Every
    client is entered on a shared ``AsyncExitStack`` so that ``clear`` /
    ``__aexit__`` can close them all together.
    """

    def __init__(self, session, **client_kwargs):
        # session: used to create S3 clients (presumably an aiobotocore
        # session, per the linked issue — confirm against callers).
        # client_kwargs: forwarded verbatim to ``session.create_client``.
        self._session = session
        self._stack = AsyncExitStack()  # owns every client we open
        self._client = None  # lazily-created general (default-region) client
        self._client_kwargs = client_kwargs
        self._buckets = {}  # bucket name -> regional client
        self._regions = {}  # region name -> regional client

    async def get_bucket_client(self, bucket_name=None):
        """Return a client bound to *bucket_name*'s region.

        Falls back to the general client when no bucket name is given or
        when the bucket's region cannot be determined from HEAD_BUCKET.
        """
        if bucket_name in self._buckets:
            return self._buckets[bucket_name]

        general_client = await self.get_client()
        if bucket_name is None:
            return general_client

        try:
            response = await general_client.head_bucket(Bucket=bucket_name)
        except ClientError as e:
            # A failed HEAD_BUCKET (e.g. redirect/permission error) may
            # still carry the region header in its error response, so keep
            # going with that response instead of bailing out.
            logger.debug("RC: HEAD_BUCKET call for %r has failed", bucket_name)
            response = e.response

        # Region is reported via the x-amz-bucket-region HTTP header;
        # both lookups use .get so a missing header yields a falsy region.
        region = (
            response["ResponseMetadata"]
            .get("HTTPHeaders", {})
            .get("x-amz-bucket-region")
        )

        if not region:
            logger.debug(
                "RC: No region in HEAD_BUCKET call response for %r, returning the general client",
                bucket_name,
            )
            return general_client

        if region not in self._regions:
            logger.debug(
                "RC: Creating a new regional client for %r on the region %r",
                bucket_name,
                region,
            )
            # Enter the client on the stack so clear() can close it later.
            self._regions[region] = await self._stack.enter_async_context(
                self._session.create_client(
                    "s3", region_name=region, **self._client_kwargs
                )
            )

        client = self._buckets[bucket_name] = self._regions[region]
        return client

    async def get_client(self):
        """Return the general client, creating it on first use."""
        if not self._client:
            self._client = await self._stack.enter_async_context(
                self._session.create_client("s3", **self._client_kwargs)
            )
        return self._client

    async def clear(self):
        """Close every cached client and reset all caches."""
        logger.debug("RC: discarding all clients")
        self._buckets.clear()
        self._regions.clear()
        self._client = None
        # Closing the stack exits (closes) every client entered on it.
        await self._stack.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_args):
        # Always tear down the clients, regardless of exception state.
        await self.clear()
class FileExpired(IOError):
    """
    Is raised, when the file content has been changed from a different process after
    opening the file. Reading the file would lead to invalid or inconsistent output.
    This can also be triggered by outdated file-information inside the directory cache.
    In this case ``S3FileSystem.invalidate_cache`` can be used to force an update of
    the file-information when opening the file.
    """

    def __init__(self, filename: str, e_tag: str):
        # EBUSY signals that the resource changed underneath us.
        message = (
            f"The remote file corresponding to filename {filename} "
            f"and Etag {e_tag} no longer exists."
        )
        super().__init__(errno.EBUSY, message)
def title_case(string):
    """
    TitleCases a given string.

    Parameters
    ----------
    string : underscore separated string
    """
    # capitalize() upper-cases the first letter and lower-cases the rest
    # of each underscore-separated part.
    parts = string.split("_")
    return "".join(map(str.capitalize, parts))
class ParamKwargsHelper:
    """
    Utility class to help extract the subset of keys that an s3 method is
    actually using

    Parameters
    ----------
    s3 : boto S3FileSystem
    """

    # Class-level cache: operation models are global to the service, so the
    # valid-key sets are shared across all instances.
    _kwarg_cache = {}

    def __init__(self, s3):
        self.s3 = s3

    def _get_valid_keys(self, model_name):
        # Look up (and memoize) the set of input members the operation accepts.
        keys = self._kwarg_cache.get(model_name)
        if keys is None:
            model = self.s3.meta.service_model.operation_model(model_name)
            if model.input_shape is None:
                keys = set()
            else:
                keys = set(model.input_shape.members.keys())
            self._kwarg_cache[model_name] = keys
        return keys

    def filter_dict(self, method_name, d):
        # Drop every kwarg the named S3 operation does not accept.
        valid_keys = self._get_valid_keys(title_case(method_name))
        if isinstance(d, SSEParams):
            d = d.to_kwargs()
        return {key: value for key, value in d.items() if key in valid_keys}
class SSEParams:
    """Holder for server-side-encryption parameters of S3 calls."""

    def __init__(
        self,
        server_side_encryption=None,
        sse_customer_algorithm=None,
        sse_customer_key=None,
        sse_kms_key_id=None,
    ):
        # Attribute names match the S3 API kwarg casing on purpose.
        self.ServerSideEncryption = server_side_encryption
        self.SSECustomerAlgorithm = sse_customer_algorithm
        self.SSECustomerKey = sse_customer_key
        self.SSEKMSKeyId = sse_kms_key_id

    def to_kwargs(self):
        """Return only the parameters that were actually set."""
        kwargs = {}
        for name, value in vars(self).items():
            if value is not None:
                kwargs[name] = value
        return kwargs
def _get_brange(size, block):
"""
Chunk up a file into zero-based byte ranges
Parameters
----------
size : file size
block : block size
"""
for offset in range(0, size, block):
yield offset, min(offset + block - 1, size - 1)