File: builders.py

package info (click to toggle)
freenub 0.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,168 kB
  • sloc: python: 10,664; makefile: 7; sh: 6
file content (69 lines) | stat: -rw-r--r-- 2,001 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from abc import ABCMeta, abstractmethod

from . import utils
from .dtos import SubscribeOperation, UnsubscribeOperation


class PubSubBuilder(metaclass=ABCMeta):
    """Abstract base for fluent builders that collect channels and groups.

    The metaclass is declared with the Python 3 ``metaclass=`` keyword.  A
    ``__metaclass__`` class attribute is ignored by Python 3, which would
    leave ``@abstractmethod`` unenforced and allow this base class (or a
    subclass that forgot ``execute``) to be instantiated.
    """

    def __init__(self, subscription_manager):
        """Keep the manager and start with empty channel / group lists.

        :param subscription_manager: object stored on the builder; the
            concrete builders in this module pass their operation to it
            from ``execute()``.
        """
        self._subscription_manager = subscription_manager
        self._channel_subscriptions = []
        self._channel_group_subscriptions = []

    # TODO: add a 'channel' alias for this method
    def channels(self, channels_list):
        """Add ``channels_list`` to the collected channels.

        The merge is delegated to ``utils.extend_list``.
        NOTE(review): the accepted input forms are whatever
        ``utils.extend_list`` supports -- confirm there.

        :returns: ``self``, so calls can be chained.
        """
        utils.extend_list(self._channel_subscriptions, channels_list)

        return self

    def channel_groups(self, channel_groups_list):
        """Add ``channel_groups_list`` to the collected channel groups.

        The merge is delegated to ``utils.extend_list``.
        NOTE(review): the accepted input forms are whatever
        ``utils.extend_list`` supports -- confirm there.

        :returns: ``self``, so calls can be chained.
        """
        utils.extend_list(self._channel_group_subscriptions, channel_groups_list)

        return self

    @abstractmethod
    def execute(self):
        """Carry out the operation described by this builder.

        Must be overridden by every concrete subclass.
        """


class SubscribeBuilder(PubSubBuilder):
    """Fluent builder that assembles a ``SubscribeOperation``.

    On top of the channels and channel groups collected by the base class it
    tracks a presence flag (off by default) and a timetoken (``0`` by
    default).
    """

    def __init__(self, subscription_manager):
        """Initialise the base builder, then the subscribe-only options."""
        super().__init__(subscription_manager)
        self._timetoken = 0
        self._presence_enabled = False

    def with_presence(self):
        """Switch the presence flag on and return ``self`` for chaining."""
        self._presence_enabled = True
        return self

    def with_timetoken(self, timetoken):
        """Remember ``timetoken`` for the operation and return ``self``."""
        self._timetoken = timetoken
        return self

    def channel_subscriptions(self):
        """Return the builder's own channel list (the same object, no copy)."""
        return self._channel_subscriptions

    def channel_group_subscriptions(self):
        """Return the builder's own channel-group list (same object, no copy)."""
        return self._channel_group_subscriptions

    def execute(self):
        """Build a ``SubscribeOperation`` and pass it to the manager.

        The operation is handed to
        ``subscription_manager.adapt_subscribe_builder``; this method itself
        returns ``None``.
        """
        # Gather the builder state first, in the same keyword order the
        # operation has always been constructed with.
        operation_fields = {
            "channels": self._channel_subscriptions,
            "channel_groups": self._channel_group_subscriptions,
            "timetoken": self._timetoken,
            "presence_enabled": self._presence_enabled,
        }
        operation = SubscribeOperation(**operation_fields)

        self._subscription_manager.adapt_subscribe_builder(operation)


class UnsubscribeBuilder(PubSubBuilder):
    """Fluent builder that assembles an ``UnsubscribeOperation``."""

    def execute(self):
        """Build an ``UnsubscribeOperation`` and pass it to the manager.

        The operation carries the collected channels and channel groups and
        is handed to ``subscription_manager.adapt_unsubscribe_builder``; this
        method itself returns ``None``.
        """
        operation_fields = {
            "channels": self._channel_subscriptions,
            "channel_groups": self._channel_group_subscriptions,
        }
        operation = UnsubscribeOperation(**operation_fields)

        self._subscription_manager.adapt_unsubscribe_builder(operation)