File: workers.py

package info (click to toggle)
freenub 0.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,168 kB
  • sloc: python: 10,664; makefile: 7; sh: 6
file content (187 lines) | stat: -rw-r--r-- 7,675 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import logging
from abc import abstractmethod

from .endpoints.file_operations.get_file_url import GetFileDownloadUrl
from .enums import PNOperationType, PNStatusCategory
from .models.consumer.common import PNStatus
from .models.consumer.objects_v2.channel import PNChannelMetadataResult
from .models.consumer.objects_v2.memberships import PNMembershipResult
from .models.consumer.objects_v2.uuid import PNUUIDMetadataResult
from .models.consumer.pn_error_data import PNErrorData
from .models.consumer.pubsub import (
    PNFileMessageResult,
    PNMessageActionResult,
    PNMessageResult,
    PNPresenceEventResult,
    PNSignalMessageResult,
)
from .models.server.subscribe import PresenceEnvelope, SubscribeMessage
from .utils import strip_right

# Module-level logger registered under the fixed name "pubnub" (not this
# module's ``__name__``); the worker below logs through it.
logger = logging.getLogger("pubnub")


class SubscribeMessageWorker:
    """Base worker that turns subscribe messages into listener announcements.

    ``run`` delegates to ``_take_message``, which concrete workers are expected
    to provide. ``_process_incoming_payload`` inspects one ``SubscribeMessage``
    and announces the matching result object through the listener manager.
    """

    # Values compared against ``SubscribeMessage.type`` when dispatching.
    TYPE_MESSAGE = 0
    TYPE_SIGNAL = 1
    TYPE_OBJECT = 2
    TYPE_MESSAGE_ACTION = 3
    TYPE_FILE_MESSAGE = 4

    def __init__(
        self, pubnub_instance, listener_manager_instance, queue_instance, event
    ):
        """Store the collaborators used by the worker.

        Args:
            pubnub_instance: Client object; its ``config`` is read for
                ``cipher_key`` / ``crypto`` and it is handed to
                ``GetFileDownloadUrl``. Presumably a ``PubNubCore`` - not
                enforced here.
            listener_manager_instance: Object exposing the ``announce_*``
                methods used to publish results. Presumably a
                ``ListenerManager`` - not enforced here.
            queue_instance: Stored as ``_queue``; not used by the methods
                defined in this class (presumably consumed by subclasses).
            event: Stored as ``_event``; not used by the methods defined in
                this class (presumably consumed by subclasses).
        """
        self._pubnub = pubnub_instance
        self._listener_manager = listener_manager_instance
        self._queue = queue_instance
        self._is_running = None
        self._event = event

    def run(self):
        """Start the worker by delegating to ``_take_message``."""
        self._take_message()

    # NOTE: the class does not use ``ABCMeta``, so this decorator is advisory
    # only; instantiating the base class is not prevented.
    @abstractmethod
    def _take_message(self):
        """Hook for concrete workers; the base implementation does nothing."""
        pass

    def _get_url_for_file_event_message(self, channel, extracted_message):
        """Build the download URL for the file described by a file message.

        Args:
            channel: Channel the file message arrived on.
            extracted_message: Mapping with a ``"file"`` entry that holds
                ``"name"`` and ``"id"``.

        Returns:
            Whatever ``GetFileDownloadUrl.get_complete_url()`` returns.
        """
        file_info = extracted_message["file"]
        return (
            GetFileDownloadUrl(self._pubnub)
            .channel(channel)
            .file_name(file_info["name"])
            .file_id(file_info["id"])
            .get_complete_url()
        )

    def _process_message(self, message_input):
        """Decrypt ``message_input`` when a cipher key is configured.

        Without a cipher key the input is returned untouched. If decryption
        raises, a warning is logged, an error ``PNStatus`` with category
        ``PNDecryptionErrorCategory`` is announced, and the original input is
        returned so that processing can continue.

        Args:
            message_input: Payload as received from the subscribe response.

        Returns:
            The decrypted payload, or ``message_input`` itself.
        """
        config = self._pubnub.config
        if config.cipher_key is None:
            return message_input

        try:
            return config.crypto.decrypt(config.cipher_key, message_input)
        except Exception as exception:
            # Deliberately broad: any decryption failure is reported to the
            # listeners instead of being raised to the caller.
            # Logger arguments are passed lazily; the rendered text is
            # unchanged.
            logger.warning(
                'could not decrypt message: "%s", due to error %s',
                message_input,
                exception,
            )
            pn_status = PNStatus()
            pn_status.category = PNStatusCategory.PNDecryptionErrorCategory
            pn_status.error_data = PNErrorData(str(exception), exception)
            pn_status.error = True
            pn_status.operation = PNOperationType.PNSubscribeOperation
            self._listener_manager.announce_status(pn_status)
            return message_input

    def _process_incoming_payload(self, message):
        """Classify one subscribe message and announce it to the listeners.

        Dispatch order: presence (channel name contains ``"-pnpres"``), object
        events, file messages, then signals / message actions / plain
        messages.

        Args:
            message: The ``SubscribeMessage`` to process.
        """
        assert isinstance(message, SubscribeMessage)

        channel = message.channel
        subscription_match = message.subscription_match

        # A subscription equal to the channel carries no extra information.
        if channel is not None and channel == subscription_match:
            subscription_match = None

        # Guard against a missing channel: ``in`` on ``None`` would raise
        # ``TypeError`` before any type-based dispatch could happen.
        if channel is not None and "-pnpres" in channel:
            self._announce_presence_event(message, channel, subscription_match)
        elif message.type == SubscribeMessageWorker.TYPE_OBJECT:
            self._announce_object_event(message)
        elif message.type == SubscribeMessageWorker.TYPE_FILE_MESSAGE:
            self._announce_file_message(message, channel, subscription_match)
        else:
            self._announce_published_message(message, channel, subscription_match)

    def _announce_presence_event(self, message, channel, subscription_match):
        """Announce a presence event; ``channel`` must not be ``None``."""
        presence_payload = PresenceEnvelope.from_json_payload(message.payload)

        # strip_right presumably drops the trailing "-pnpres" marker so that
        # listeners see the plain channel name - confirm in ``utils``.
        stripped_presence_channel = strip_right(channel, "-pnpres")

        stripped_presence_subscription = None
        if subscription_match is not None:
            stripped_presence_subscription = strip_right(
                subscription_match, "-pnpres"
            )

        pn_presence_event_result = PNPresenceEventResult(
            event=presence_payload.action,
            channel=stripped_presence_channel,
            subscription=stripped_presence_subscription,
            timetoken=message.publish_metadata.publish_timetoken,
            occupancy=presence_payload.occupancy,
            uuid=presence_payload.uuid,
            timestamp=presence_payload.timestamp,
            state=presence_payload.data,
            join=message.payload.get("join", None),
            leave=message.payload.get("leave", None),
            timeout=message.payload.get("timeout", None),
        )
        self._listener_manager.announce_presence(pn_presence_event_result)

    def _announce_object_event(self, message):
        """Announce a channel / uuid / membership metadata event."""
        payload = message.payload
        object_type = payload["type"]

        if object_type == "channel":
            channel_result = PNChannelMetadataResult(
                event=payload["event"], data=payload["data"]
            )
            self._listener_manager.announce_channel(channel_result)
        elif object_type == "uuid":
            uuid_result = PNUUIDMetadataResult(
                event=payload["event"], data=payload["data"]
            )
            self._listener_manager.announce_uuid(uuid_result)
        elif object_type == "membership":
            membership_result = PNMembershipResult(
                event=payload["event"], data=payload["data"]
            )
            self._listener_manager.announce_membership(membership_result)
        else:
            # Unknown object types are still ignored, but no longer silently.
            logger.debug("ignoring object event of unknown type %r", object_type)

    def _announce_file_message(self, message, channel, subscription_match):
        """Announce a file message together with its download URL."""
        extracted_message = self._process_message(message.payload)
        download_url = self._get_url_for_file_event_message(
            channel, extracted_message
        )

        pn_file_result = PNFileMessageResult(
            message=extracted_message.get("message"),
            channel=channel,
            subscription=subscription_match,
            timetoken=message.publish_metadata.publish_timetoken,
            publisher=message.issuing_client_id,
            file_url=download_url,
            file_id=extracted_message["file"]["id"],
            file_name=extracted_message["file"]["name"],
        )
        self._listener_manager.announce_file_message(pn_file_result)

    def _announce_published_message(self, message, channel, subscription_match):
        """Announce a signal, a message action, or a regular message."""
        extracted_message = self._process_message(message.payload)
        publisher = message.issuing_client_id

        if extracted_message is None:
            # Only logged: the announcement below still goes ahead.
            logger.debug("unable to parse payload on #processIncomingMessages")

        if message.type == SubscribeMessageWorker.TYPE_SIGNAL:
            pn_signal_result = PNSignalMessageResult(
                message=extracted_message,
                channel=channel,
                subscription=subscription_match,
                timetoken=message.publish_metadata.publish_timetoken,
                publisher=publisher,
            )
            self._listener_manager.announce_signal(pn_signal_result)
        elif message.type == SubscribeMessageWorker.TYPE_MESSAGE_ACTION:
            message_action = extracted_message["data"]
            # Fall back to the issuing client when the action names no uuid.
            if "uuid" not in message_action:
                message_action["uuid"] = publisher

            message_action_result = PNMessageActionResult(message_action)
            self._listener_manager.announce_message_action(message_action_result)
        else:
            pn_message_result = PNMessageResult(
                message=extracted_message,
                channel=channel,
                subscription=subscription_match,
                timetoken=message.publish_metadata.publish_timetoken,
                publisher=publisher,
            )
            self._listener_manager.announce_message(pn_message_result)