1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
|
import functools
import hmac
import io # noqa: F401
import logging
import sys
import os
import re
import warnings
from stripe._api_mode import ApiMode
from urllib.parse import parse_qsl, quote_plus # noqa: F401
from typing_extensions import Type, TYPE_CHECKING
from typing import (
Callable,
TypeVar,
Union,
overload,
Dict,
List,
cast,
Any,
Optional,
Mapping,
)
import typing_extensions
# Used for global variables
import stripe # noqa: IMP101
if TYPE_CHECKING:
from stripe._stripe_response import StripeResponse
from stripe._stripe_object import StripeObject
from stripe._api_requestor import _APIRequestor
STRIPE_LOG = os.environ.get("STRIPE_LOG")
logger: logging.Logger = logging.getLogger("stripe")
if TYPE_CHECKING:
deprecated = typing_extensions.deprecated
else:
_T = TypeVar("_T")
# Copied from python/typing_extensions, as this was added in typing_extensions 4.5.0 which is incompatible with
# python 3.6. We still need `deprecated = typing_extensions.deprecated` in addition to this fallback, as
# IDEs (pylance) specially detect references to symbols defined in `typing_extensions`
#
# https://github.com/python/typing_extensions/blob/5d20e9eed31de88667542ba5a6f66e6dc439b681/src/typing_extensions.py#L2289-L2370
def deprecated(
__msg: str,
*,
category: Optional[Type[Warning]] = DeprecationWarning,
stacklevel: int = 1,
) -> Callable[[_T], _T]:
def decorator(__arg: _T) -> _T:
if category is None:
__arg.__deprecated__ = __msg
return __arg
elif isinstance(__arg, type):
original_new = __arg.__new__
has_init = __arg.__init__ is not object.__init__
@functools.wraps(original_new)
def __new__(cls, *args, **kwargs):
warnings.warn(
__msg, category=category, stacklevel=stacklevel + 1
)
if original_new is not object.__new__:
return original_new(cls, *args, **kwargs)
# Mirrors a similar check in object.__new__.
elif not has_init and (args or kwargs):
raise TypeError(f"{cls.__name__}() takes no arguments")
else:
return original_new(cls)
__arg.__new__ = staticmethod(__new__)
__arg.__deprecated__ = __new__.__deprecated__ = __msg
return __arg
elif callable(__arg):
@functools.wraps(__arg)
def wrapper(*args, **kwargs):
warnings.warn(
__msg, category=category, stacklevel=stacklevel + 1
)
return __arg(*args, **kwargs)
__arg.__deprecated__ = wrapper.__deprecated__ = __msg
return wrapper
else:
raise TypeError(
"@deprecated decorator with non-None category must be applied to "
f"a class or callable, not {__arg!r}"
)
return decorator
def is_appengine_dev():
return "APPENGINE_RUNTIME" in os.environ and "Dev" in os.environ.get(
"SERVER_SOFTWARE", ""
)
def _console_log_level():
if stripe.log in ["debug", "info"]:
return stripe.log
elif STRIPE_LOG in ["debug", "info"]:
return STRIPE_LOG
else:
return None
def log_debug(message, **params):
msg = logfmt(dict(message=message, **params))
if _console_log_level() == "debug":
print(msg, file=sys.stderr)
logger.debug(msg)
def log_info(message, **params):
msg = logfmt(dict(message=message, **params))
if _console_log_level() in ["debug", "info"]:
print(msg, file=sys.stderr)
logger.info(msg)
def _test_or_live_environment():
if stripe.api_key is None:
return
match = re.match(r"sk_(live|test)_", stripe.api_key)
if match is None:
return
return match.groups()[0]
def dashboard_link(request_id):
return "https://dashboard.stripe.com/{env}/logs/{reqid}".format(
env=_test_or_live_environment() or "test", reqid=request_id
)
def logfmt(props):
def fmt(key, val):
# Handle case where val is a bytes or bytesarray
if hasattr(val, "decode"):
val = val.decode("utf-8")
# Check if val is already a string to avoid re-encoding into
# ascii. Since the code is sent through 2to3, we can't just
# use unicode(val, encoding='utf8') since it will be
# translated incorrectly.
if not isinstance(val, str):
val = str(val)
if re.search(r"\s", val):
val = repr(val)
# key should already be a string
if re.search(r"\s", key):
key = repr(key)
return "{key}={val}".format(key=key, val=val)
return " ".join([fmt(key, val) for key, val in sorted(props.items())])
# Borrowed from Django's source code
if hasattr(hmac, "compare_digest"):
# Prefer the stdlib implementation, when available.
def secure_compare(val1, val2):
return hmac.compare_digest(val1, val2)
else:
def secure_compare(val1, val2):
"""
Returns True if the two strings are equal, False otherwise.
The time taken is independent of the number of characters that match.
For the sake of simplicity, this function executes in constant time
only when the two strings have the same length. It short-circuits when
they have different lengths.
"""
if len(val1) != len(val2):
return False
result = 0
if isinstance(val1, bytes) and isinstance(val2, bytes):
for x, y in zip(val1, val2):
result |= x ^ y
else:
for x, y in zip(val1, val2):
result |= ord(cast(str, x)) ^ ord(cast(str, y))
return result == 0
def get_thin_event_classes():
from stripe.events._event_classes import THIN_EVENT_CLASSES
return THIN_EVENT_CLASSES
def get_object_classes(api_mode):
# This is here to avoid a circular dependency
if api_mode == "V2":
from stripe._object_classes import V2_OBJECT_CLASSES
return V2_OBJECT_CLASSES
from stripe._object_classes import OBJECT_CLASSES
return OBJECT_CLASSES
Resp = Union["StripeResponse", Dict[str, Any], List["Resp"]]
@overload
def convert_to_stripe_object(
resp: Union["StripeResponse", Dict[str, Any]],
api_key: Optional[str] = None,
stripe_version: Optional[str] = None,
stripe_account: Optional[str] = None,
params: Optional[Mapping[str, Any]] = None,
klass_: Optional[Type["StripeObject"]] = None,
*,
api_mode: ApiMode = "V1",
) -> "StripeObject": ...
@overload
def convert_to_stripe_object(
resp: List[Resp],
api_key: Optional[str] = None,
stripe_version: Optional[str] = None,
stripe_account: Optional[str] = None,
params: Optional[Mapping[str, Any]] = None,
klass_: Optional[Type["StripeObject"]] = None,
*,
api_mode: ApiMode = "V1",
) -> List["StripeObject"]: ...
def convert_to_stripe_object(
resp: Resp,
api_key: Optional[str] = None,
stripe_version: Optional[str] = None,
stripe_account: Optional[str] = None,
params: Optional[Mapping[str, Any]] = None,
klass_: Optional[Type["StripeObject"]] = None,
*,
api_mode: ApiMode = "V1",
) -> Union["StripeObject", List["StripeObject"]]:
from stripe._api_requestor import _APIRequestor
return _convert_to_stripe_object(
resp=resp,
params=params,
klass_=klass_,
requestor=_APIRequestor._global_with_options(
api_key=api_key,
stripe_version=stripe_version,
stripe_account=stripe_account,
),
api_mode=api_mode,
)
@overload
def _convert_to_stripe_object(
*,
resp: Union["StripeResponse", Dict[str, Any]],
params: Optional[Mapping[str, Any]] = None,
klass_: Optional[Type["StripeObject"]] = None,
requestor: "_APIRequestor",
api_mode: ApiMode,
) -> "StripeObject": ...
@overload
def _convert_to_stripe_object(
*,
resp: List[Resp],
params: Optional[Mapping[str, Any]] = None,
klass_: Optional[Type["StripeObject"]] = None,
requestor: "_APIRequestor",
api_mode: ApiMode,
) -> List["StripeObject"]: ...
def _convert_to_stripe_object(
*,
resp: Resp,
params: Optional[Mapping[str, Any]] = None,
klass_: Optional[Type["StripeObject"]] = None,
requestor: "_APIRequestor",
api_mode: ApiMode,
) -> Union["StripeObject", List["StripeObject"]]:
# If we get a StripeResponse, we'll want to return a
# StripeObject with the last_response field filled out with
# the raw API response information
stripe_response = None
# Imports here at runtime to avoid circular dependencies
from stripe._stripe_response import StripeResponse
from stripe._stripe_object import StripeObject
if isinstance(resp, StripeResponse):
stripe_response = resp
resp = cast(Resp, stripe_response.data)
if isinstance(resp, list):
return [
_convert_to_stripe_object(
resp=cast("Union[StripeResponse, Dict[str, Any]]", i),
requestor=requestor,
api_mode=api_mode,
klass_=klass_,
)
for i in resp
]
elif isinstance(resp, dict) and not isinstance(resp, StripeObject):
resp = resp.copy()
klass_name = resp.get("object")
if isinstance(klass_name, str):
if api_mode == "V2" and klass_name == "v2.core.event":
event_name = resp.get("type", "")
klass = get_thin_event_classes().get(
event_name, stripe.StripeObject
)
else:
klass = get_object_classes(api_mode).get(
klass_name, stripe.StripeObject
)
# TODO: this is a horrible hack. The API needs
# to return something for `object` here.
elif "data" in resp and "next_page_url" in resp:
klass = stripe.v2.ListObject
elif klass_ is not None:
klass = klass_
else:
klass = StripeObject
obj = klass._construct_from(
values=resp,
last_response=stripe_response,
requestor=requestor,
api_mode=api_mode,
)
# We only need to update _retrieve_params when special params were
# actually passed. Otherwise, leave it as is as the list / search result
# constructors will instantiate their own params.
if (
params is not None
and hasattr(obj, "object")
and (
(getattr(obj, "object") == "list")
or (getattr(obj, "object") == "search_result")
)
):
obj._retrieve_params = params
return obj
else:
return cast("StripeObject", resp)
def convert_to_dict(obj):
"""Converts a StripeObject back to a regular dict.
Nested StripeObjects are also converted back to regular dicts.
:param obj: The StripeObject to convert.
:returns: The StripeObject as a dict.
"""
if isinstance(obj, list):
return [convert_to_dict(i) for i in obj]
# This works by virtue of the fact that StripeObjects _are_ dicts. The dict
# comprehension returns a regular dict and recursively applies the
# conversion to each value.
elif isinstance(obj, dict):
return {k: convert_to_dict(v) for k, v in obj.items()}
else:
return obj
@overload
def populate_headers(
idempotency_key: str,
) -> Dict[str, str]: ...
@overload
def populate_headers(idempotency_key: None) -> None: ...
def populate_headers(
idempotency_key: Union[str, None],
) -> Union[Dict[str, str], None]:
if idempotency_key is not None:
return {"Idempotency-Key": idempotency_key}
return None
T = TypeVar("T")
def merge_dicts(x, y):
z = x.copy()
z.update(y)
return z
def sanitize_id(id):
quotedId = quote_plus(id)
return quotedId
def get_api_mode(url):
if url.startswith("/v2"):
return "V2"
else:
return "V1"
class class_method_variant(object):
def __init__(self, class_method_name):
self.class_method_name = class_method_name
T = TypeVar("T")
method: Any
def __call__(self, method: T) -> T:
self.method = method
return cast(T, self)
def __get__(self, obj, objtype: Optional[Type[Any]] = None):
@functools.wraps(self.method)
def _wrapper(*args, **kwargs):
if obj is not None:
# Method was called as an instance method, e.g.
# instance.method(...)
return self.method(obj, *args, **kwargs)
elif (
len(args) > 0
and objtype is not None
and isinstance(args[0], objtype)
):
# Method was called as a class method with the instance as the
# first argument, e.g. Class.method(instance, ...) which in
# Python is the same thing as calling an instance method
return self.method(args[0], *args[1:], **kwargs)
else:
# Method was called as a class method, e.g. Class.method(...)
class_method = getattr(objtype, self.class_method_name)
return class_method(*args, **kwargs)
return _wrapper
|