File: mds.py

package info (click to toggle)
python-nbxmpp 6.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,340 kB
  • sloc: python: 19,639; makefile: 4
file content (150 lines) | stat: -rw-r--r-- 4,317 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from __future__ import annotations

from typing import TYPE_CHECKING

from nbxmpp.errors import MalformedStanzaError
from nbxmpp.modules.base import BaseModule
from nbxmpp.modules.util import finalize
from nbxmpp.modules.util import raise_if_error
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import JID
from nbxmpp.protocol import Message
from nbxmpp.protocol import Node
from nbxmpp.protocol import NodeProcessed
from nbxmpp.structs import MDSData
from nbxmpp.structs import MessageProperties
from nbxmpp.structs import StanzaHandler
from nbxmpp.task import iq_request_task

if TYPE_CHECKING:
    from nbxmpp.client import Client

# Node options sent along with every publish to the MDS node: MDS.set_mds
# passes this dict as ``options`` together with ``force_node_options=True``.
# NOTE(review): the keys look like XEP-0060 node-configuration field names;
# confirm the values against the publish-options required by XEP-0490.
MDS_OPTIONS = {
    "pubsub#persist_items": "true",
    "pubsub#max_items": "max",
    "pubsub#send_last_published_item": "never",
    "pubsub#access_model": "whitelist",
}


class MDS(BaseModule):
    """
    XEP-0490

    Handles pubsub events for the ``Namespace.MDS`` node and offers tasks
    to publish (``set_mds``) and to request (``get_mds``) MDS items.
    """

    # Methods this module calls as ``self.<name>`` and the module expected
    # to provide them. ``get_mds`` uses ``self.request_items``, so it has to
    # be declared here next to ``publish``.
    _depends = {
        "publish": "PubSub",
        "request_items": "PubSub",
    }

    def __init__(self, client: Client) -> None:
        """Store the client and register the pubsub event handler."""
        BaseModule.__init__(self, client)

        self._client = client
        self.handlers = [
            StanzaHandler(
                name="message",
                callback=self._process_pubsub_mds,
                ns=Namespace.PUBSUB_EVENT,
                priority=16,
            ),
        ]

    def _process_pubsub_mds(
        self, _client: Client, stanza: Message, properties: MessageProperties
    ) -> None:
        """
        Parse the item of an MDS pubsub event and attach the result to it.

        Stanzas that are not pubsub events, that belong to a node other
        than ``Namespace.MDS``, or that carry no item are left untouched.
        On success ``properties.pubsub_event`` is replaced by a copy whose
        ``data`` field holds the parsed ``MDSData``.

        :raises NodeProcessed: if the item is malformed; the error and the
            stanza are logged as warnings first.
        """
        if not properties.is_pubsub_event:
            return

        if properties.pubsub_event.node != Namespace.MDS:
            return

        item = properties.pubsub_event.item
        if item is None:
            return

        try:
            data = self._parse_item(item)
        except MalformedStanzaError as error:
            self._log.warning(error)
            self._log.warning(stanza)
            raise NodeProcessed

        pubsub_event = properties.pubsub_event._replace(data=data)
        self._log.info("Received MDS: %s", data)

        properties.pubsub_event = pubsub_event

    @iq_request_task
    def set_mds(self, jid: JID, stanza_id: str, by: JID | None = None):
        """
        Publish a ``displayed`` element for ``jid`` to the MDS node.

        :param jid: its string form is used as the pubsub item ID.
        :param stanza_id: value of the ``id`` attribute of the published
            ``stanza-id`` element.
        :param by: value of the ``by`` attribute of the ``stanza-id``
            element. When omitted, the client's bound JID converted with
            ``new_as_bare()`` is used.
        """
        task = yield

        if by is None:
            own_jid = self._client.get_bound_jid()
            assert own_jid is not None
            by = own_jid.new_as_bare()

        displayed = Node("displayed", {"xmlns": Namespace.MDS})
        displayed.addChild(
            "stanza-id", namespace=Namespace.SID, attrs={"id": stanza_id, "by": str(by)}
        )

        result = yield self.publish(
            Namespace.MDS,
            displayed,
            id_=str(jid),
            options=MDS_OPTIONS,
            force_node_options=True,
        )

        yield finalize(task, result)

    @iq_request_task
    def get_mds(self):
        """
        Request the items of the MDS node and parse them.

        Items that cannot be parsed are logged as warnings and skipped.
        The task result is a list of ``MDSData``, or ``None`` when the
        request returned no items.
        """
        task = yield

        items = yield self.request_items(Namespace.MDS)

        raise_if_error(items)

        if not items:
            # NOTE(review): this relies on the task runner finishing once a
            # non-task value is yielded - confirm against nbxmpp.task.
            yield task.set_result(None)

        data: list[MDSData] = []
        for item in items:
            try:
                data.append(self._parse_item(item))
            except MalformedStanzaError as error:
                self._log.warning(error)
                self._log.warning(item)

        yield data

    def _parse_item(self, item: Node) -> MDSData:
        """
        Convert a pubsub item into ``MDSData``.

        The item ID and the ``by`` attribute of the ``stanza-id`` element
        must both parse as JIDs, and the ``stanza-id`` element must carry a
        non-empty ``id`` attribute.

        :raises MalformedStanzaError: if any of these parts is missing or
            invalid.
        """
        item_id = item.getAttr("id")
        try:
            jid = JID.from_string(item_id)
        except Exception as e:
            # Chain the cause so the original parsing error stays visible.
            raise MalformedStanzaError(
                f'MDS item ID "{item_id}" is not a valid JID: {e}', item
            ) from e

        displayed = item.getTag("displayed", namespace=Namespace.MDS)
        if displayed is None:
            raise MalformedStanzaError("Bad MDS event (no displayed tag)", item)

        sid = displayed.getTag("stanza-id", namespace=Namespace.SID)
        if sid is None:
            raise MalformedStanzaError("Bad MDS event (no stanza-id tag)", item)

        stanza_id = sid.getAttr("id")
        if not stanza_id:
            raise MalformedStanzaError("Bad MDS event (no stanza-id ID)", item)

        by = sid.getAttr("by")
        try:
            stanza_id_by = JID.from_string(by)
        except Exception as e:
            raise MalformedStanzaError(
                f'MDS stanza-id by "{by}" is not a valid JID: {e}', item
            ) from e

        return MDSData(jid=jid, stanza_id=stanza_id, stanza_id_by=stanza_id_by)