File: statemachine.py

package info (click to toggle)
freenub 0.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,168 kB
  • sloc: python: 10,664; makefile: 7; sh: 6
file content (117 lines) | stat: -rw-r--r-- 3,987 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""StateMachine."""

import logging
from typing import Optional

from pubnub.event_engine.dispatcher import Dispatcher
from pubnub.event_engine.models import effects, events, states


class StateMachine:
    _current_state: states.PNState
    _context: states.PNContext
    _effect_list: list[effects.PNEffect]
    _enabled: bool

    def __init__(
        self,
        initial_state: states.PNState,
        dispatcher_class: Optional[Dispatcher] = None,
    ) -> None:
        self._context = states.PNContext()
        self._current_state = initial_state(self._context)
        self._listeners = {}
        self._effect_list = []
        if dispatcher_class is None:
            dispatcher_class = Dispatcher
        self._dispatcher = dispatcher_class(self)
        self._enabled = True

    def get_state_name(self):
        return self._current_state.__class__.__name__

    def get_context(self) -> states.PNContext:
        return self._current_state._context

    def get_dispatcher(self) -> Dispatcher:
        return self._dispatcher

    def trigger(self, event: events.PNEvent) -> states.PNTransition:
        logging.debug(
            f"Triggered {event.__class__.__name__}({event.__dict__}) on {self.get_state_name()}"
        )
        if not self._enabled:
            logging.error("EventEngine is not enabled")
            return False
        if event.get_name() in self._current_state._transitions:
            self._effect_list.clear()
            effect = self._current_state.on_exit()
            logging.debug(f"On exit effect: {effect.__class__.__name__}")

            if effect:
                self._effect_list.append(effect)

            transition: states.PNTransition = self._current_state.on(
                event, self._context
            )

            self._current_state = transition.state(self._current_state.get_context())

            self._context = transition.context
            if transition.effect:
                if isinstance(transition.effect, list):
                    logging.debug("unpacking list")
                    for effect in transition.effect:
                        logging.debug(f"Transition effect: {effect.__class__.__name__}")
                        self._effect_list.append(effect)
                else:
                    logging.debug(
                        f"Transition effect: {transition.effect.__class__.__name__}"
                    )
                    self._effect_list.append(transition.effect)

            effect = self._current_state.on_enter(self._context)
            if effect:
                logging.debug(f"On enter effect: {effect.__class__.__name__}")
                self._effect_list.append(effect)

        else:
            # we're ignoring events unhandled
            logging.debug(
                f"unhandled event?? {event.__class__.__name__} in {self._current_state.__class__.__name__}"
            )
            self.stop()

        self.dispatch_effects()

    def dispatch_effects(self):
        for effect in self._effect_list:
            logging.debug(f"dispatching {effect.__class__.__name__} {id(effect)}")
            self._dispatcher.dispatch_effect(effect)

        self._effect_list.clear()

    def stop(self):
        self._enabled = False


if __name__ == "__main__":
    machine = StateMachine(states.UnsubscribedState)
    logging.debug(f"machine initialized. Current state: {machine.get_state_name()}")
    effect = machine.trigger(
        events.SubscriptionChangedEvent(channels=["fail"], groups=[])
    )

    machine.add_listener(
        effects.PNEffect,
        lambda x: logging.debug(f"Catch All Logger: {effect.__dict__}"),
    )

    machine.add_listener(
        effects.EmitMessagesEffect,
    )
    effect = machine.trigger(events.DisconnectEvent())
    logging.debug(
        f"SubscriptionChangedEvent triggered with channels=[`fail`].  Curr state: {machine.get_state_name()}"
    )
    logging.debug(f"effect queue: {machine._effect_list}")