File: _util.py

package info (click to toggle)
python-stripe 12.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,864 kB
  • sloc: python: 157,573; makefile: 13; sh: 9
file content (460 lines) | stat: -rw-r--r-- 13,744 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
import functools
import hmac
import io  # noqa: F401
import logging
import sys
import os
import re
import warnings

from stripe._api_mode import ApiMode

from urllib.parse import parse_qsl, quote_plus  # noqa: F401

from typing_extensions import Type, TYPE_CHECKING
from typing import (
    Callable,
    TypeVar,
    Union,
    overload,
    Dict,
    List,
    cast,
    Any,
    Optional,
    Mapping,
)
import typing_extensions


# Used for global variables
import stripe  # noqa: IMP101

if TYPE_CHECKING:
    from stripe._stripe_response import StripeResponse
    from stripe._stripe_object import StripeObject
    from stripe._api_requestor import _APIRequestor

# Value of the STRIPE_LOG environment variable, captured once at import time.
# _console_log_level() falls back to it when stripe.log is not "debug"/"info".
STRIPE_LOG = os.environ.get("STRIPE_LOG")

# Logger named "stripe"; log_debug() and log_info() always emit through it,
# independently of the optional stderr echo.
logger: logging.Logger = logging.getLogger("stripe")

# Static type checkers see the real `typing_extensions.deprecated`; at runtime
# the vendored fallback defined in the `else` branch is used instead.
if TYPE_CHECKING:
    deprecated = typing_extensions.deprecated
else:
    _T = TypeVar("_T")

    # Copied from python/typing_extensions, as this was added in typing_extensions 4.5.0 which is incompatible with
    # python 3.6. We still need `deprecated = typing_extensions.deprecated` in addition to this fallback, as
    # IDEs (pylance) specially detect references to symbols defined in `typing_extensions`
    #
    # https://github.com/python/typing_extensions/blob/5d20e9eed31de88667542ba5a6f66e6dc439b681/src/typing_extensions.py#L2289-L2370
    def deprecated(
        __msg: str,
        *,
        category: Optional[Type[Warning]] = DeprecationWarning,
        stacklevel: int = 1,
    ) -> Callable[[_T], _T]:
        """Build a decorator that marks a class or callable as deprecated.

        :param __msg: Deprecation message; stored on the decorated object as
            ``__deprecated__`` and used as the warning text.
        :param category: Warning class to emit on use. When ``None`` no
            runtime warning is emitted and the object is only tagged.
        :param stacklevel: Base ``stacklevel`` handed to ``warnings.warn``
            (incremented by one to skip the wrapper frame).

        :returns: A decorator. For classes it patches ``__new__`` in place and
            returns the same class; for other callables it returns a wrapper.

        :raises TypeError: From the decorator, when ``category`` is not
            ``None`` and the decorated object is neither a class nor callable.
        """

        def decorator(__arg: _T) -> _T:
            if category is None:
                # Tag only: no wrapper is installed, so nothing warns at runtime.
                __arg.__deprecated__ = __msg
                return __arg
            elif isinstance(__arg, type):
                # Class case: warn on instantiation by replacing __new__.
                original_new = __arg.__new__
                # Remember whether the class defines its own __init__; this
                # decides below whether stray constructor arguments are an error.
                has_init = __arg.__init__ is not object.__init__

                @functools.wraps(original_new)
                def __new__(cls, *args, **kwargs):
                    # stacklevel + 1 skips this wrapper frame so the warning
                    # points at the code instantiating the class.
                    warnings.warn(
                        __msg, category=category, stacklevel=stacklevel + 1
                    )
                    if original_new is not object.__new__:
                        # A custom __new__ receives the arguments untouched.
                        return original_new(cls, *args, **kwargs)
                    # Mirrors a similar check in object.__new__.
                    elif not has_init and (args or kwargs):
                        raise TypeError(f"{cls.__name__}() takes no arguments")
                    else:
                        # object.__new__ must not be given extra arguments.
                        return original_new(cls)

                __arg.__new__ = staticmethod(__new__)
                __arg.__deprecated__ = __new__.__deprecated__ = __msg
                return __arg
            elif callable(__arg):
                # Function / other callable case: warn on every call.

                @functools.wraps(__arg)
                def wrapper(*args, **kwargs):
                    # stacklevel + 1 skips this wrapper frame.
                    warnings.warn(
                        __msg, category=category, stacklevel=stacklevel + 1
                    )
                    return __arg(*args, **kwargs)

                # Both the original callable and the wrapper carry the message.
                __arg.__deprecated__ = wrapper.__deprecated__ = __msg
                return wrapper
            else:
                raise TypeError(
                    "@deprecated decorator with non-None category must be applied to "
                    f"a class or callable, not {__arg!r}"
                )

        return decorator


def is_appengine_dev():
    """Report whether the environment looks like the App Engine dev server.

    :returns: True only when ``APPENGINE_RUNTIME`` is present in the
        environment and ``SERVER_SOFTWARE`` contains the substring ``"Dev"``.
    """
    if "APPENGINE_RUNTIME" not in os.environ:
        return False
    server_software = os.environ.get("SERVER_SOFTWARE", "")
    return "Dev" in server_software


def _console_log_level():
    """Pick the level at which messages are echoed to stderr.

    ``stripe.log`` takes precedence over the ``STRIPE_LOG`` value captured at
    import time; a source is only honoured when it is "debug" or "info".

    :returns: "debug", "info", or None when neither source is set to one of
        those values.
    """
    for configured in (stripe.log, STRIPE_LOG):
        if configured in ["debug", "info"]:
            return configured
    return None


def log_debug(message, **params):
    """Emit a debug-level, logfmt-formatted record.

    The line always goes to the "stripe" logger; it is additionally printed
    to stderr when the console log level is "debug".

    :param message: Main text, logged under the ``message`` key.
    :param params: Extra key/value pairs to include in the line.
    """
    line = logfmt({"message": message, **params})
    echo_to_console = _console_log_level() == "debug"
    if echo_to_console:
        print(line, file=sys.stderr)
    logger.debug(line)


def log_info(message, **params):
    """Emit an info-level, logfmt-formatted record.

    The line always goes to the "stripe" logger; it is additionally printed
    to stderr when the console log level is "debug" or "info".

    :param message: Main text, logged under the ``message`` key.
    :param params: Extra key/value pairs to include in the line.
    """
    line = logfmt({"message": message, **params})
    echo_to_console = _console_log_level() in ["debug", "info"]
    if echo_to_console:
        print(line, file=sys.stderr)
    logger.info(line)


def _test_or_live_environment():
    """Infer the environment name from the globally configured API key.

    :returns: "live" or "test" when ``stripe.api_key`` starts with
        ``sk_live_`` / ``sk_test_``; None when no key is set or the key does
        not have that prefix.
    """
    api_key = stripe.api_key
    if api_key is None:
        return None
    prefix_match = re.match(r"sk_(live|test)_", api_key)
    return None if prefix_match is None else prefix_match.group(1)


def dashboard_link(request_id):
    """Build the dashboard URL of the log entry for a request.

    :param request_id: Identifier of the request to link to.

    :returns: The log URL; the environment segment defaults to "test" when it
        cannot be derived from the configured API key.
    """
    environment = _test_or_live_environment() or "test"
    template = "https://dashboard.stripe.com/{env}/logs/{reqid}"
    return template.format(env=environment, reqid=request_id)


def logfmt(props):
    """Render a mapping as a single ``key=value`` log line.

    Pairs are emitted in sorted key order, separated by single spaces. Keys
    and values are converted to text first; any that contain whitespace are
    replaced by their ``repr`` so the line stays unambiguous.

    :param props: Mapping of field names to values.

    :returns: The formatted line ("" for an empty mapping).
    """
    has_whitespace = re.compile(r"\s").search

    def as_text(item):
        # bytes / bytearray (anything exposing ``decode``) become text.
        if hasattr(item, "decode"):
            try:
                item = item.decode("utf-8")
            except UnicodeDecodeError:
                # Formatting a log line must never raise because a payload
                # is not valid UTF-8; fall back to a lossy decode instead.
                item = item.decode("utf-8", errors="replace")
        if not isinstance(item, str):
            item = str(item)
        return item

    def fmt(key, val):
        val = as_text(val)
        if has_whitespace(val):
            val = repr(val)
        # Keys are normally strings already; coerce defensively so that an
        # unexpected key type cannot make the regex search raise.
        key = as_text(key)
        if has_whitespace(key):
            key = repr(key)
        return "{key}={val}".format(key=key, val=val)

    return " ".join([fmt(key, val) for key, val in sorted(props.items())])


# Originally borrowed from Django, which shipped a hand-written constant-time
# loop for interpreters lacking hmac.compare_digest. That function has been in
# the standard library since Python 3.3 and this module already requires a
# newer interpreter (it uses f-strings), so the fallback could never be
# selected; only the stdlib-backed definition is kept.
def secure_compare(val1, val2):
    """Return True if *val1* equals *val2*, using a timing-safe comparison.

    Delegates to :func:`hmac.compare_digest`, which is designed not to leak
    the position of the first mismatch through timing.

    :param val1: First value; an ASCII-only ``str`` or a bytes-like object.
    :param val2: Second value, of the same kind as ``val1``.

    :returns: True when both values are equal, False otherwise (including
        when their lengths differ).

    :raises TypeError: Propagated from ``hmac.compare_digest`` when the
        argument types are unsupported or mixed.
    """
    return hmac.compare_digest(val1, val2)


def get_thin_event_classes():
    """Return the ``THIN_EVENT_CLASSES`` lookup table.

    The import is deferred to call time rather than done at module load.
    """
    from stripe.events._event_classes import (
        THIN_EVENT_CLASSES as thin_event_classes,
    )

    return thin_event_classes


def get_object_classes(api_mode):
    """Return the object-name to class table for the given API mode.

    :param api_mode: "V2" selects ``V2_OBJECT_CLASSES``; any other value
        selects ``OBJECT_CLASSES``.

    :returns: The selected lookup table from ``stripe._object_classes``.
    """
    # Imported lazily: a module-level import would be a circular dependency.
    if api_mode != "V2":
        from stripe._object_classes import OBJECT_CLASSES as v1_classes

        return v1_classes

    from stripe._object_classes import V2_OBJECT_CLASSES as v2_classes

    return v2_classes


# Input shapes accepted by the conversion helpers below: a StripeResponse, a
# plain dict, or a (possibly nested) list of either.
Resp = Union["StripeResponse", Dict[str, Any], List["Resp"]]


# Typing overload: a single response or dict converts to one StripeObject.
@overload
def convert_to_stripe_object(
    resp: Union["StripeResponse", Dict[str, Any]],
    api_key: Optional[str] = None,
    stripe_version: Optional[str] = None,
    stripe_account: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    *,
    api_mode: ApiMode = "V1",
) -> "StripeObject": ...


# Typing overload: a list converts element-wise to a list of StripeObjects.
@overload
def convert_to_stripe_object(
    resp: List[Resp],
    api_key: Optional[str] = None,
    stripe_version: Optional[str] = None,
    stripe_account: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    *,
    api_mode: ApiMode = "V1",
) -> List["StripeObject"]: ...


def convert_to_stripe_object(
    resp: Resp,
    api_key: Optional[str] = None,
    stripe_version: Optional[str] = None,
    stripe_account: Optional[str] = None,
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    *,
    api_mode: ApiMode = "V1",
) -> Union["StripeObject", List["StripeObject"]]:
    """Convert raw response data into StripeObject instances.

    Builds a requestor from the global configuration overridden by the given
    options, then delegates to ``_convert_to_stripe_object``.

    :param resp: A StripeResponse, a dict, or a list of those.
    :param api_key: Optional API key override for the requestor.
    :param stripe_version: Optional API version override for the requestor.
    :param stripe_account: Optional account override for the requestor.
    :param params: Optional params forwarded to the conversion.
    :param klass_: Optional class forwarded to the conversion.
    :param api_mode: API mode forwarded to the conversion.

    :returns: Whatever ``_convert_to_stripe_object`` returns for ``resp``.
    """
    # Imported at call time rather than at module load.
    from stripe._api_requestor import _APIRequestor

    configured_requestor = _APIRequestor._global_with_options(
        api_key=api_key,
        stripe_version=stripe_version,
        stripe_account=stripe_account,
    )
    return _convert_to_stripe_object(
        resp=resp,
        params=params,
        klass_=klass_,
        requestor=configured_requestor,
        api_mode=api_mode,
    )


# Typing overload: a single response or dict converts to one StripeObject.
@overload
def _convert_to_stripe_object(
    *,
    resp: Union["StripeResponse", Dict[str, Any]],
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    requestor: "_APIRequestor",
    api_mode: ApiMode,
) -> "StripeObject": ...


# Typing overload: a list converts element-wise to a list of StripeObjects.
@overload
def _convert_to_stripe_object(
    *,
    resp: List[Resp],
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    requestor: "_APIRequestor",
    api_mode: ApiMode,
) -> List["StripeObject"]: ...


def _convert_to_stripe_object(
    *,
    resp: Resp,
    params: Optional[Mapping[str, Any]] = None,
    klass_: Optional[Type["StripeObject"]] = None,
    requestor: "_APIRequestor",
    api_mode: ApiMode,
) -> Union["StripeObject", List["StripeObject"]]:
    """Convert raw response data into StripeObject instances.

    :param resp: A StripeResponse, a dict, or a list of those. Lists are
        converted element by element (``params`` is not forwarded to the
        elements). Anything else, including an existing StripeObject, is
        returned unchanged.
    :param params: When not None and the constructed object's ``object`` is
        "list" or "search_result", stored on it as ``_retrieve_params``.
    :param klass_: Class to use when the dict has no string ``object`` field
        and is not recognised as a V2 list.
    :param requestor: Requestor passed to the constructed objects.
    :param api_mode: Selects the class lookup table and is passed to the
        constructed objects.

    :returns: The converted object, or a list of converted objects.
    """
    # Imports here at runtime to avoid circular dependencies
    from stripe._stripe_response import StripeResponse
    from stripe._stripe_object import StripeObject

    # When handed a StripeResponse, remember it so the resulting object gets
    # its last_response filled with the raw API response information.
    last_response = None
    if isinstance(resp, StripeResponse):
        last_response = resp
        resp = cast(Resp, last_response.data)

    if isinstance(resp, list):
        converted = []
        for element in resp:
            converted.append(
                _convert_to_stripe_object(
                    resp=cast("Union[StripeResponse, Dict[str, Any]]", element),
                    requestor=requestor,
                    api_mode=api_mode,
                    klass_=klass_,
                )
            )
        return converted

    # Only plain dicts are converted; existing StripeObjects and non-dict
    # values pass through untouched.
    if not isinstance(resp, dict) or isinstance(resp, StripeObject):
        return cast("StripeObject", resp)

    payload = resp.copy()
    object_name = payload.get("object")
    if isinstance(object_name, str):
        if api_mode == "V2" and object_name == "v2.core.event":
            # V2 events are looked up by their event type instead.
            event_type = payload.get("type", "")
            klass = get_thin_event_classes().get(
                event_type, stripe.StripeObject
            )
        else:
            klass = get_object_classes(api_mode).get(
                object_name, stripe.StripeObject
            )
    # TODO: this is a horrible hack. The API needs
    # to return something for `object` here.
    elif "data" in payload and "next_page_url" in payload:
        klass = stripe.v2.ListObject
    elif klass_ is not None:
        klass = klass_
    else:
        klass = StripeObject

    obj = klass._construct_from(
        values=payload,
        last_response=last_response,
        requestor=requestor,
        api_mode=api_mode,
    )

    # We only need to update _retrieve_params when special params were
    # actually passed. Otherwise, leave it as is as the list / search result
    # constructors will instantiate their own params.
    if params is not None and hasattr(obj, "object"):
        object_kind = getattr(obj, "object")
        if object_kind == "list" or object_kind == "search_result":
            obj._retrieve_params = params

    return obj


def convert_to_dict(obj):
    """Recursively turn a StripeObject into plain dicts and lists.

    Works on any dict subclass (StripeObjects _are_ dicts): every dict is
    rebuilt as a regular ``dict`` and every list as a regular ``list``, with
    the conversion applied to each contained value.

    :param obj: The StripeObject (or list / arbitrary value) to convert.

    :returns: The converted structure; values that are neither dicts nor
        lists are returned unchanged.
    """
    if isinstance(obj, dict):
        plain = {}
        for key, value in obj.items():
            plain[key] = convert_to_dict(value)
        return plain
    if isinstance(obj, list):
        return list(map(convert_to_dict, obj))
    return obj


# Typing overload: a key yields a headers dict.
@overload
def populate_headers(idempotency_key: str) -> Dict[str, str]: ...


# Typing overload: no key yields no headers.
@overload
def populate_headers(idempotency_key: None) -> None: ...


def populate_headers(
    idempotency_key: Union[str, None],
) -> Union[Dict[str, str], None]:
    """Build the request headers carrying an idempotency key.

    :param idempotency_key: The key to send, or None.

    :returns: ``{"Idempotency-Key": idempotency_key}``, or None when no key
        was given.
    """
    if idempotency_key is None:
        return None
    return {"Idempotency-Key": idempotency_key}


# Module-level generic type variable. NOTE(review): no module-level code in
# this file appears to reference it (class_method_variant defines its own T);
# presumably kept for importers -- verify before removing.
T = TypeVar("T")


def merge_dicts(x, y):
    """Return a copy of ``x`` updated with the entries of ``y``.

    Neither argument is modified; on key collisions the value from ``y`` wins.

    :param x: Base mapping (must provide ``copy()``).
    :param y: Mapping whose entries override those of ``x``.

    :returns: The merged copy.
    """
    merged = x.copy()
    merged.update(y)
    return merged


def sanitize_id(id):
    """Percent-encode an identifier with ``urllib.parse.quote_plus``.

    :param id: The identifier to encode.

    :returns: The encoded identifier (spaces become ``+``).
    """
    return quote_plus(id)


def get_api_mode(url):
    """Derive the API mode from a request path.

    :param url: The request path.

    :returns: "V2" when the path starts with ``/v2``, otherwise "V1".
    """
    return "V2" if url.startswith("/v2") else "V1"


class class_method_variant(object):
    def __init__(self, class_method_name):
        self.class_method_name = class_method_name

    T = TypeVar("T")

    method: Any

    def __call__(self, method: T) -> T:
        self.method = method
        return cast(T, self)

    def __get__(self, obj, objtype: Optional[Type[Any]] = None):
        @functools.wraps(self.method)
        def _wrapper(*args, **kwargs):
            if obj is not None:
                # Method was called as an instance method, e.g.
                # instance.method(...)
                return self.method(obj, *args, **kwargs)
            elif (
                len(args) > 0
                and objtype is not None
                and isinstance(args[0], objtype)
            ):
                # Method was called as a class method with the instance as the
                # first argument, e.g. Class.method(instance, ...) which in
                # Python is the same thing as calling an instance method
                return self.method(args[0], *args[1:], **kwargs)
            else:
                # Method was called as a class method, e.g. Class.method(...)
                class_method = getattr(objtype, self.class_method_name)
                return class_method(*args, **kwargs)

        return _wrapper