File: typing.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (57 lines) | stat: -rw-r--r-- 1,367 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from threading import Thread
from typing import Callable, TypeVar, Union

from .abc.observable import Subscription
from .abc.observer import OnCompleted, OnError, OnNext
from .abc.periodicscheduler import (
    ScheduledPeriodicAction,
    ScheduledSingleOrPeriodicAction,
)
from .abc.scheduler import (
    AbsoluteOrRelativeTime,
    AbsoluteTime,
    RelativeTime,
    ScheduledAction,
)
from .abc.startable import StartableBase

_TState = TypeVar("_TState")
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")

Action = Callable[[], None]

Mapper = Callable[[_T1], _T2]
MapperIndexed = Callable[[_T1, int], _T2]
Predicate = Callable[[_T1], bool]
PredicateIndexed = Callable[[_T1, int], bool]
Comparer = Callable[[_T1, _T1], bool]
SubComparer = Callable[[_T1, _T1], int]
Accumulator = Callable[[_TState, _T1], _TState]


Startable = Union[StartableBase, Thread]
StartableTarget = Callable[..., None]
StartableFactory = Callable[[StartableTarget], Startable]

__all__ = [
    "Accumulator",
    "AbsoluteTime",
    "AbsoluteOrRelativeTime",
    "Comparer",
    "Mapper",
    "MapperIndexed",
    "OnNext",
    "OnError",
    "OnCompleted",
    "Predicate",
    "PredicateIndexed",
    "RelativeTime",
    "SubComparer",
    "ScheduledPeriodicAction",
    "ScheduledSingleOrPeriodicAction",
    "ScheduledAction",
    "Startable",
    "StartableTarget",
    "Subscription",
]