1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
# pylint: disable=no-self-use
from typing import ( # pylint: disable=unused-import
Any, Dict, Optional, Tuple, Union,
TYPE_CHECKING)
try:
from urllib.parse import quote
except ImportError:
from urllib2 import quote # type: ignore
from azure.core import MatchConditions
from ._models import (
ContainerEncryptionScope,
DelimitedJsonDialect)
from ._generated.models import (
ModifiedAccessConditions,
SourceModifiedAccessConditions,
CpkScopeInfo,
ContainerCpkScopeInfo,
QueryFormat,
QuerySerialization,
DelimitedTextConfiguration,
JsonTextConfiguration,
ArrowConfiguration,
QueryFormatType,
BlobTag,
BlobTags, LeaseAccessConditions
)
if TYPE_CHECKING:
from ._lease import BlobLeaseClient
_SUPPORTED_API_VERSIONS = [
'2019-02-02',
'2019-07-07',
'2019-10-10',
'2019-12-12',
'2020-02-10',
'2020-04-08',
'2020-06-12',
'2020-08-04',
'2020-10-02',
'2020-12-06',
'2021-02-12',
'2021-04-10',
'2021-06-08',
'2021-08-06'
]
def _get_match_headers(kwargs, match_param, etag_param):
# type: (Dict[str, Any], str, str) -> Tuple(Dict[str, Any], Optional[str], Optional[str])
if_match = None
if_none_match = None
match_condition = kwargs.pop(match_param, None)
if match_condition == MatchConditions.IfNotModified:
if_match = kwargs.pop(etag_param, None)
if not if_match:
raise ValueError("'{}' specified without '{}'.".format(match_param, etag_param))
elif match_condition == MatchConditions.IfPresent:
if_match = '*'
elif match_condition == MatchConditions.IfModified:
if_none_match = kwargs.pop(etag_param, None)
if not if_none_match:
raise ValueError("'{}' specified without '{}'.".format(match_param, etag_param))
elif match_condition == MatchConditions.IfMissing:
if_none_match = '*'
elif match_condition is None:
if kwargs.get(etag_param):
raise ValueError("'{}' specified without '{}'.".format(etag_param, match_param))
else:
raise TypeError("Invalid match condition: {}".format(match_condition))
return if_match, if_none_match
def get_access_conditions(lease):
# type: (Optional[Union[BlobLeaseClient, str]]) -> Union[LeaseAccessConditions, None]
try:
lease_id = lease.id # type: ignore
except AttributeError:
lease_id = lease # type: ignore
return LeaseAccessConditions(lease_id=lease_id) if lease_id else None
def get_modify_conditions(kwargs):
# type: (Dict[str, Any]) -> ModifiedAccessConditions
if_match, if_none_match = _get_match_headers(kwargs, 'match_condition', 'etag')
return ModifiedAccessConditions(
if_modified_since=kwargs.pop('if_modified_since', None),
if_unmodified_since=kwargs.pop('if_unmodified_since', None),
if_match=if_match or kwargs.pop('if_match', None),
if_none_match=if_none_match or kwargs.pop('if_none_match', None),
if_tags=kwargs.pop('if_tags_match_condition', None)
)
def get_source_conditions(kwargs):
# type: (Dict[str, Any]) -> SourceModifiedAccessConditions
if_match, if_none_match = _get_match_headers(kwargs, 'source_match_condition', 'source_etag')
return SourceModifiedAccessConditions(
source_if_modified_since=kwargs.pop('source_if_modified_since', None),
source_if_unmodified_since=kwargs.pop('source_if_unmodified_since', None),
source_if_match=if_match or kwargs.pop('source_if_match', None),
source_if_none_match=if_none_match or kwargs.pop('source_if_none_match', None),
source_if_tags=kwargs.pop('source_if_tags_match_condition', None)
)
def get_cpk_scope_info(kwargs):
# type: (Dict[str, Any]) -> CpkScopeInfo
if 'encryption_scope' in kwargs:
return CpkScopeInfo(encryption_scope=kwargs.pop('encryption_scope'))
return None
def get_container_cpk_scope_info(kwargs):
# type: (Dict[str, Any]) -> ContainerCpkScopeInfo
encryption_scope = kwargs.pop('container_encryption_scope', None)
if encryption_scope:
if isinstance(encryption_scope, ContainerEncryptionScope):
return ContainerCpkScopeInfo(
default_encryption_scope=encryption_scope.default_encryption_scope,
prevent_encryption_scope_override=encryption_scope.prevent_encryption_scope_override
)
if isinstance(encryption_scope, dict):
return ContainerCpkScopeInfo(
default_encryption_scope=encryption_scope['default_encryption_scope'],
prevent_encryption_scope_override=encryption_scope.get('prevent_encryption_scope_override')
)
raise TypeError("Container encryption scope must be dict or type ContainerEncryptionScope.")
return None
def get_api_version(kwargs):
# type: (Dict[str, Any]) -> str
api_version = kwargs.get('api_version', None)
if api_version and api_version not in _SUPPORTED_API_VERSIONS:
versions = '\n'.join(_SUPPORTED_API_VERSIONS)
raise ValueError("Unsupported API version '{}'. Please select from:\n{}".format(api_version, versions))
return api_version or _SUPPORTED_API_VERSIONS[-1]
def serialize_blob_tags_header(tags=None):
# type: (Optional[Dict[str, str]]) -> str
if tags is None:
return None
components = list()
if tags:
for key, value in tags.items():
components.append(quote(key, safe='.-'))
components.append('=')
components.append(quote(value, safe='.-'))
components.append('&')
if components:
del components[-1]
return ''.join(components)
def serialize_blob_tags(tags=None):
# type: (Optional[Dict[str, str]]) -> Union[BlobTags, None]
tag_list = list()
if tags:
tag_list = [BlobTag(key=k, value=v) for k, v in tags.items()]
return BlobTags(blob_tag_set=tag_list)
def serialize_query_format(formater):
if formater == "ParquetDialect":
qq_format = QueryFormat(
type=QueryFormatType.PARQUET,
parquet_text_configuration=' '
)
elif isinstance(formater, DelimitedJsonDialect):
serialization_settings = JsonTextConfiguration(
record_separator=formater.delimiter
)
qq_format = QueryFormat(
type=QueryFormatType.json,
json_text_configuration=serialization_settings)
elif hasattr(formater, 'quotechar'): # This supports a csv.Dialect as well
try:
headers = formater.has_header
except AttributeError:
headers = False
serialization_settings = DelimitedTextConfiguration(
column_separator=formater.delimiter,
field_quote=formater.quotechar,
record_separator=formater.lineterminator,
escape_char=formater.escapechar,
headers_present=headers
)
qq_format = QueryFormat(
type=QueryFormatType.delimited,
delimited_text_configuration=serialization_settings
)
elif isinstance(formater, list):
serialization_settings = ArrowConfiguration(
schema=formater
)
qq_format = QueryFormat(
type=QueryFormatType.arrow,
arrow_configuration=serialization_settings)
elif not formater:
return None
else:
raise TypeError("Format must be DelimitedTextDialect or DelimitedJsonDialect or ParquetDialect.")
return QuerySerialization(format=qq_format)
|