File: nickname.py

package info (click to toggle)
python-nbxmpp 6.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,340 kB
  • sloc: python: 19,639; makefile: 4
file content (142 lines) | stat: -rw-r--r-- 4,330 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of nbxmpp.
#
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import annotations

from typing import TYPE_CHECKING

from nbxmpp.const import PresenceType
from nbxmpp.modules.base import BaseModule
from nbxmpp.modules.util import finalize
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import Message
from nbxmpp.protocol import Node
from nbxmpp.protocol import Presence
from nbxmpp.structs import MessageProperties
from nbxmpp.structs import PresenceProperties
from nbxmpp.structs import StanzaHandler
from nbxmpp.task import iq_request_task

if TYPE_CHECKING:
    from nbxmpp.client import Client


class Nickname(BaseModule):
    """Module that reads nicknames from stanzas and publishes the own one.

    Nicknames are picked up from PubSub event messages on the
    ``Namespace.NICK`` node and from ``nick`` elements carried directly in
    messages and presences. The own nickname is published via PubSub.
    """

    # NOTE(review): presumably BaseModule resolves this mapping so that
    # ``self.publish`` (used in set_nickname) forwards to the PubSub
    # module's ``publish`` - confirm in BaseModule.
    _depends = {"publish": "PubSub"}

    def __init__(self, client: Client) -> None:
        BaseModule.__init__(self, client)

        self._client = client

        pubsub_handler = StanzaHandler(
            name="message",
            callback=self._process_pubsub_nickname,
            ns=Namespace.PUBSUB_EVENT,
            priority=16,
        )
        # Messages and presences carrying a nick element share one callback
        # and one priority, they only differ in the stanza name.
        nick_handlers = [
            StanzaHandler(
                name=stanza_name,
                callback=self._process_nickname,
                ns=Namespace.NICK,
                priority=40,
            )
            for stanza_name in ("message", "presence")
        ]
        self.handlers = [pubsub_handler, *nick_handlers]

    def _process_nickname(
        self,
        _client: Client,
        stanza: Message | Presence,
        properties: MessageProperties | PresenceProperties,
    ) -> None:
        """Store the nickname found in a message or presence.

        The parsed value (or None if there is no usable nick element) is
        written to ``properties.nickname``. Stanzas that are neither a
        message nor a presence are left untouched.
        """
        stanza_name = stanza.getName()
        if stanza_name == "message":
            properties.nickname = self._parse_nickname(stanza)
            return

        if stanza_name != "presence":
            return

        # The nickname MUST NOT be included in presence broadcasts
        # (i.e., <presence/> stanzas with no 'type' attribute or
        # of type "unavailable"), so it is ignored there.
        # Usage is not recommended in MUC, but it is a workaround
        # to allow code points forbidden in resource parts in nicknames,
        # hence MUC presences are always parsed.
        if properties.from_muc or properties.type not in (
            PresenceType.AVAILABLE,
            PresenceType.UNAVAILABLE,
        ):
            properties.nickname = self._parse_nickname(stanza)

    def _process_pubsub_nickname(
        self, _client: Client, _stanza: Message, properties: MessageProperties
    ) -> None:
        """Attach the nickname of a PubSub nick event to the event data.

        Only PubSub events on the ``Namespace.NICK`` node that carry an item
        are considered. If the item holds a nickname, the event in
        ``properties.pubsub_event`` is replaced by a copy whose ``data`` is
        that nickname; otherwise the event is left as it is.
        """
        if not properties.is_pubsub_event:
            return

        event = properties.pubsub_event
        if event.node != Namespace.NICK:
            return

        nick_item = event.item
        if nick_item is None:
            # Retract, Deleted or Purged
            return

        nick = self._parse_nickname(nick_item)
        if nick is not None:
            self._log.info("Received nickname: %s - %s", properties.jid, nick)
            properties.pubsub_event = event._replace(data=nick)
            return

        self._log.info("Received nickname: %s - nickname removed", properties.jid)

    @staticmethod
    def _parse_nickname(stanza: Node) -> str | None:
        """Return the text of the ``nick`` child of *stanza*.

        None is returned when the child is missing or its data is empty.
        """
        nick_tag = stanza.getTag("nick", namespace=Namespace.NICK)
        return None if nick_tag is None else (nick_tag.getData() or None)

    @iq_request_task
    def set_nickname(self, nickname: str | None, public: bool = False):
        """Publish *nickname* to the ``Namespace.NICK`` node.

        Passing None publishes a ``nick`` element without data. The node's
        access model is set to "open" if *public* is true, otherwise to
        "presence".
        """
        task = yield

        nick_node = Node("nick", {"xmlns": Namespace.NICK})
        if nickname is not None:
            nick_node.addData(nickname)

        publish_options = {
            "pubsub#persist_items": "true",
            "pubsub#access_model": "open" if public else "presence",
        }

        result = yield self.publish(
            Namespace.NICK,
            nick_node,
            id_="current",
            options=publish_options,
            force_node_options=True,
        )

        yield finalize(task, result)

    @iq_request_task
    def set_access_model(self, public: bool):
        """Set the access model of the ``Namespace.NICK`` node.

        The model is "open" if *public* is true, otherwise "presence".
        """
        task = yield

        pubsub = self._client.get_module("PubSub")
        result = yield pubsub.set_access_model(
            Namespace.NICK, "open" if public else "presence"
        )

        yield finalize(task, result)