File: delay.py

package info (click to toggle)
python-nbxmpp 6.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,340 kB
  • sloc: python: 19,639; makefile: 4
file content (126 lines) | stat: -rw-r--r-- 3,937 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
#
# This file is part of nbxmpp.
#
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import annotations

from typing import TYPE_CHECKING

import datetime as dt
import logging

from nbxmpp.modules.base import BaseModule
from nbxmpp.modules.date_and_time import parse_datetime
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import Message
from nbxmpp.protocol import Presence
from nbxmpp.structs import MessageProperties
from nbxmpp.structs import PresenceProperties
from nbxmpp.structs import StanzaHandler

if TYPE_CHECKING:
    from nbxmpp.client import Client

log = logging.getLogger("nbxmpp.m.delay")


class Delay(BaseModule):
    def __init__(self, client: Client) -> None:
        BaseModule.__init__(self, client)

        self._client = client
        self.handlers = [
            StanzaHandler(
                name="message",
                callback=self._process_message_delay,
                ns=Namespace.DELAY2,
                priority=15,
            ),
            StanzaHandler(
                name="presence",
                callback=self._process_presence_delay,
                ns=Namespace.DELAY2,
                priority=15,
            ),
        ]

    def _process_message_delay(
        self, _client: Client, stanza: Message, properties: MessageProperties
    ) -> None:
        # Determine if delay is from the server
        # Some servers use the bare jid, others the domain
        our_jid = self._client.get_bound_jid()
        jids = [our_jid.bare, our_jid.domain]

        if properties.from_muc:
            muc_jid = properties.jid
            jids += [muc_jid.bare, muc_jid.domain]

        if properties.is_muc_subject:
            # MUC Subjects can have a delay timestamp
            # to indicate when the user has set the subject,
            # the 'from' attr on these delays is the MUC server
            # but we treat it as user timestamp
            properties.user_timestamp = parse_delay(stanza, from_=jids)

        else:
            server_delay = parse_delay(stanza, from_=jids)
            if server_delay is not None:
                properties.has_server_delay = True
                properties.timestamp = server_delay

            properties.user_timestamp = parse_delay(stanza, not_from=jids)

    @staticmethod
    def _process_presence_delay(
        _client: Client, stanza: Presence, properties: PresenceProperties
    ) -> None:
        properties.user_timestamp = parse_delay(stanza)


def parse_delay(
    stanza: Message | Presence,
    epoch: bool = True,
    convert: str = "utc",
    from_: list[str] | None = None,
    not_from: list[str] | None = None,
) -> dt.datetime | float | None:
    """
    Returns the first valid delay timestamp that matches

    :param epoch:      Returns the timestamp as epoch

    :param convert:    Converts the timestamp to either utc or local

    :param from_:      Matches only delays that have the according
                       from attr set

    :param not_from:   Matches only delays that have the according
                       from attr not set
    """
    delays = stanza.getTags("delay", namespace=Namespace.DELAY2)

    for delay in delays:
        stamp = delay.getAttr("stamp")
        if stamp is None:
            log.warning("Invalid timestamp received: %s", stamp)
            log.warning(stanza)
            continue

        delay_from = delay.getAttr("from")
        if from_ is not None:
            if delay_from not in from_:
                continue
        if not_from is not None:
            if delay_from in not_from:
                continue

        timestamp = parse_datetime(stamp, check_utc=True, epoch=epoch, convert=convert)
        if timestamp is None:
            log.warning("Invalid timestamp received: %s", stamp)
            log.warning(stanza)
            continue

        return timestamp