1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of nbxmpp.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
from typing import TYPE_CHECKING
from nbxmpp.modules.base import BaseModule
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import Iq
from nbxmpp.protocol import NodeProcessed
from nbxmpp.protocol import Presence
from nbxmpp.structs import DiscoIdentity
from nbxmpp.structs import DiscoInfo
from nbxmpp.structs import EntityCapsData
from nbxmpp.structs import IqProperties
from nbxmpp.structs import PresenceProperties
from nbxmpp.structs import StanzaHandler
from nbxmpp.util import compute_caps_hash
if TYPE_CHECKING:
from nbxmpp.client import Client
class EntityCaps(BaseModule):
def __init__(self, client: Client) -> None:
BaseModule.__init__(self, client)
self._client = client
self.handlers = [
StanzaHandler(
name="presence",
callback=self._process_entity_caps,
ns=Namespace.CAPS,
priority=15,
),
StanzaHandler(
name="iq",
callback=self._process_disco_info,
typ="get",
ns=Namespace.DISCO_INFO,
priority=20,
),
]
self._identities = []
self._features = []
self._uri: str | None = None
self._node: str | None = None
self._caps: DiscoInfo | None = None
self._caps_hash: str | None = None
def _process_disco_info(
self, client: Client, stanza: Iq, _properties: IqProperties
) -> None:
if self._caps is None:
return
node = stanza.getQuerynode()
if node is not None:
if self._node != node:
return
iq = stanza.buildReply("result")
if node is not None:
iq.setQuerynode(node)
query = iq.getQuery()
for identity in self._caps.identities:
query.addChild(node=identity.get_node())
for feature in self._caps.features:
query.addChild("feature", attrs={"var": feature})
self._log.info("Respond with disco info")
client.send_stanza(iq)
raise NodeProcessed
def _process_entity_caps(
self, _client: Client, stanza: Presence, properties: PresenceProperties
) -> None:
caps = stanza.getTag("c", namespace=Namespace.CAPS)
if caps is None:
return
hash_algo = caps.getAttr("hash")
if hash_algo != "sha-1":
self._log.warning("Unsupported hashing algorithm used: %s", hash_algo)
self._log.warning(stanza)
return
node = caps.getAttr("node")
if not node:
self._log.warning("node attribute missing")
self._log.warning(stanza)
return
ver = caps.getAttr("ver")
if not ver:
self._log.warning("ver attribute missing")
self._log.warning(stanza)
return
properties.entity_caps = EntityCapsData(hash=hash_algo, node=node, ver=ver)
@property
def caps(self) -> EntityCapsData | None:
if self._caps is None:
return None
assert self._uri is not None
assert self._caps_hash is not None
return EntityCapsData(hash="sha-1", node=self._uri, ver=self._caps_hash)
def set_caps(
self, identities: list[DiscoIdentity], features: list[str], uri: str
) -> None:
self._uri = uri
self._caps = DiscoInfo(None, identities, features, [])
self._caps_hash = compute_caps_hash(self._caps, compare=False)
self._node = "%s#%s" % (uri, self._caps_hash)
|