1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
from typing import Union
from pubnub.enums import PNStatusCategory
from pubnub.exceptions import PubNubException
class PNEffect:
    """Common base class for the effect types defined in this module.

    It declares no attributes or methods of its own; it only gives the
    effect classes a shared root type.
    """
class PNManageableEffect(PNEffect):
    """Marker subclass of ``PNEffect`` that adds no members of its own.

    In this module it is the base of the handshake, receive-messages and
    reconnect effects, i.e. the effects whose class names are referenced by
    the ``cancel_effect`` attribute of the cancel effects.
    """
class PNCancelEffect(PNEffect):
    """Base class for effects that name another effect via ``cancel_effect``."""

    # Declared here without a value; the subclasses in this module assign it
    # the ``__name__`` of the effect class they refer to.
    cancel_effect: str
class HandshakeEffect(PNManageableEffect):
    """Manageable effect that stores the channels and groups it is given."""

    def __init__(
        self,
        channels: Union[None, list[str]] = None,
        groups: Union[None, list[str]] = None,
    ) -> None:
        """Store the constructor arguments on the instance.

        Args:
            channels: Channel names, or ``None`` (the default).
            groups: Group names, or ``None`` (the default).
        """
        super().__init__()
        # Both values are kept exactly as passed in; no copy is made.
        self.channels, self.groups = channels, groups
class CancelHandshakeEffect(PNCancelEffect):
    """Cancel effect whose ``cancel_effect`` is the name of ``HandshakeEffect``."""

    # Taken from the class itself so the string follows any rename of the class.
    cancel_effect = HandshakeEffect.__name__
class ReceiveMessagesEffect(PNManageableEffect):
    """Manageable effect storing channels, groups, a timetoken and a region."""

    def __init__(
        self,
        channels: Union[None, list[str]] = None,
        groups: Union[None, list[str]] = None,
        timetoken: Union[None, str] = None,
        region: Union[None, int] = None,
    ) -> None:
        """Store the constructor arguments on the instance.

        Args:
            channels: Channel names, or ``None`` (the default).
            groups: Group names, or ``None`` (the default).
            timetoken: Timetoken string, or ``None`` (the default).
            region: Region number, or ``None`` (the default).
        """
        super().__init__()
        # What is being addressed.
        self.channels, self.groups = channels, groups
        # Position values, stored unchanged.
        self.timetoken, self.region = timetoken, region
class CancelReceiveMessagesEffect(PNCancelEffect):
    """Cancel effect whose ``cancel_effect`` is the name of ``ReceiveMessagesEffect``."""

    # Taken from the class itself so the string follows any rename of the class.
    cancel_effect = ReceiveMessagesEffect.__name__
class ReconnectEffect(PNManageableEffect):
    """Manageable effect storing subscription state plus reconnect details.

    In addition to channels, groups, timetoken and region it keeps an
    attempt count and the exception given as the reason.
    """

    def __init__(
        self,
        channels: Union[None, list[str]] = None,
        groups: Union[None, list[str]] = None,
        timetoken: Union[None, str] = None,
        region: Union[None, int] = None,
        attempts: Union[None, int] = None,
        reason: Union[None, PubNubException] = None,
    ) -> None:
        """Store the constructor arguments on the instance.

        Args:
            channels: Channel names, or ``None`` (the default).
            groups: Group names, or ``None`` (the default).
            timetoken: Timetoken string, or ``None`` (the default).
            region: Region number, or ``None`` (the default).
            attempts: Attempt count, or ``None`` (the default).
            reason: The ``PubNubException`` given as the reason, or ``None``.
        """
        # Run the base-class initialiser like the other effect constructors in
        # this module do, so anything added to the bases later is not skipped.
        super().__init__()
        self.channels = channels
        self.groups = groups
        self.attempts = attempts
        self.reason = reason
        self.timetoken = timetoken
        self.region = region
class HandshakeReconnectEffect(ReconnectEffect):
    """Subclass of ``ReconnectEffect`` that adds nothing but a distinct type name."""
class CancelHandshakeReconnectEffect(PNCancelEffect):
    """Cancel effect whose ``cancel_effect`` is the name of ``HandshakeReconnectEffect``."""

    # Taken from the class itself so the string follows any rename of the class.
    cancel_effect = HandshakeReconnectEffect.__name__
class ReceiveReconnectEffect(ReconnectEffect):
    """Subclass of ``ReconnectEffect`` that adds nothing but a distinct type name."""
class CancelReceiveReconnectEffect(PNCancelEffect):
    """Cancel effect whose ``cancel_effect`` is the name of ``ReceiveReconnectEffect``."""

    # Taken from the class itself so the string follows any rename of the class.
    cancel_effect = ReceiveReconnectEffect.__name__
class PNEmittableEffect(PNEffect):
    """Marker subclass of ``PNEffect`` that adds no members of its own.

    In this module it is the base of ``EmitMessagesEffect`` and
    ``EmitStatusEffect``.
    """
class EmitMessagesEffect(PNEmittableEffect):
    """Emittable effect that stores the messages it is given."""

    def __init__(
        self,
        messages: Union[None, list[str]],
    ) -> None:
        """Store ``messages`` on the instance.

        Args:
            messages: The messages to store, or ``None``. Required; there is
                no default value.
        """
        super().__init__()
        # Stored as passed in; no copy is made.
        self.messages = messages
class EmitStatusEffect(PNEmittableEffect):
    """Emittable effect that stores the status category it is given."""

    def __init__(
        self,
        status: Union[None, PNStatusCategory],
    ) -> None:
        """Store ``status`` on the instance.

        Args:
            status: A ``PNStatusCategory`` value, or ``None``. Required; there
                is no default value.
        """
        super().__init__()
        self.status = status
|