1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
from contextlib import asynccontextmanager
from copy import deepcopy
from functools import wraps
from botocore.context import (
ClientContext,
get_context,
reset_context,
set_context,
)
from ._helpers import resolve_awaitable
@asynccontextmanager
async def start_as_current_context(ctx=None):
current = ctx or get_context()
if current is None:
new = ClientContext()
else:
new = deepcopy(current)
token = set_context(new)
try:
yield
finally:
reset_context(token)
def with_current_context(hook=None):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
async with start_as_current_context():
if hook:
await resolve_awaitable(hook())
return await func(*args, **kwargs)
return wrapper
return decorator
|