File: context.py

package info (click to toggle)
python-aiobotocore 2.25.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,524 kB
  • sloc: python: 15,437; makefile: 84
file content (40 lines) | stat: -rw-r--r-- 892 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from contextlib import asynccontextmanager
from copy import deepcopy
from functools import wraps

from botocore.context import (
    ClientContext,
    get_context,
    reset_context,
    set_context,
)

from ._helpers import resolve_awaitable


@asynccontextmanager
async def start_as_current_context(ctx=None):
    """Async context manager that installs a fresh context for its body.

    The starting point is ``ctx`` when a truthy value is passed, otherwise
    whatever ``get_context()`` returns. If that starting point is ``None``
    a new ``ClientContext`` is created; otherwise a deep copy is made, so
    the object handed to ``set_context`` is never the caller's own instance.

    The manager yields nothing (``async with ... as x`` binds ``None``).
    On exit -- including when the body raises -- the token obtained from
    ``set_context`` is passed to ``reset_context``.
    """
    base = ctx or get_context()
    # Always work on a private object: either brand new or a deep copy.
    fresh = ClientContext() if base is None else deepcopy(base)
    reset_token = set_context(fresh)
    try:
        yield
    finally:
        reset_context(reset_token)


def with_current_context(hook=None):
    """Return a decorator that runs a coroutine function in its own context.

    The decorated callable becomes an ``async`` function which, on every
    call, enters ``start_as_current_context()`` with no arguments, then:

    1. if ``hook`` is truthy, calls ``hook()`` with no arguments and awaits
       ``resolve_awaitable`` on its result (presumably so the hook may be
       either sync or async -- confirm against ``_helpers``);
    2. awaits ``func(*args, **kwargs)`` and returns its result.

    Both steps happen inside the context manager, so the hook runs under
    the same context as ``func``.
    """

    def decorate(func):
        @wraps(func)
        async def run_in_context(*args, **kwargs):
            async with start_as_current_context():
                if hook:
                    hook_result = hook()
                    await resolve_awaitable(hook_result)
                return await func(*args, **kwargs)

        return run_in_context

    return decorate