File: concurrency.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (26 lines) | stat: -rw-r--r-- 691 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from functools import wraps
from threading import RLock, Thread
from typing import Any, Callable, TypeVar

from typing_extensions import ParamSpec

from reactivex.typing import StartableTarget

_T = TypeVar("_T")
_P = ParamSpec("_P")


def default_thread_factory(target: StartableTarget) -> Thread:
    """Create an unstarted daemon thread that will run ``target``.

    Args:
        target: The callable handed to ``Thread`` as its ``target``.

    Returns:
        A new ``Thread`` with ``daemon`` set to ``True``. The thread is
        not started here; starting it is left to the caller.
    """
    worker = Thread(target=target)
    # Flag as daemon before the thread is ever started.
    worker.daemon = True
    return worker


def synchronized(lock: RLock) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]:
    """A decorator for synchronizing access to a given function.

    Every call to the decorated function is executed while holding
    ``lock``; the lock is released when the call returns or raises.

    Args:
        lock: The lock to hold for the duration of each call.

    Returns:
        A decorator that wraps a callable so that it runs under ``lock``
        while keeping the wrapped callable's signature, return value and
        metadata (``__name__``, ``__doc__``, ``__wrapped__``, ...).
    """

    def wrapper(fn: Callable[_P, _T]) -> Callable[_P, _T]:
        # wraps() copies the original's metadata onto the closure so the
        # decorated function remains recognisable to introspection tools.
        @wraps(fn)
        def inner(*args: _P.args, **kw: _P.kwargs) -> _T:
            with lock:
                return fn(*args, **kw)

        return inner

    return wrapper