File: test_telegramobject.py

package info (click to toggle)
python-telegram-bot 22.3-1
  • links: PTS
  • area: main
  • in suites: sid
  • size: 11,060 kB
  • sloc: python: 90,298; makefile: 176; sh: 4
file content (589 lines) | stat: -rw-r--r-- 23,036 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2025
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program.  If not, see [http://www.gnu.org/licenses/].
import datetime as dtm
import inspect
import pickle
import re
from copy import deepcopy
from pathlib import Path
from types import MappingProxyType

import pytest

from telegram import Bot, BotCommand, Chat, Message, PhotoSize, TelegramObject, User
from telegram._utils.defaultvalue import DEFAULT_FALSE, DEFAULT_NONE, DefaultValue
from telegram.ext import PicklePersistence
from telegram.warnings import PTBUserWarning
from tests.auxil.files import data_file
from tests.auxil.slots import mro_slots


def all_subclasses(cls):
    """Collect ``cls`` together with all of its direct and indirect subclasses.

    Adapted from https://stackoverflow.com/a/3862957/9706202

    Args:
        cls: The class whose subclass tree is walked.

    Returns:
        set: ``cls`` itself plus every class that (transitively) inherits from it.
    """
    found = {cls}
    for child in cls.__subclasses__():
        # Each recursive call yields the child itself plus everything below it
        found |= all_subclasses(child)
    return found


# `all_subclasses` returns an unordered set, so the classes are sorted by name to get a
# stable order. The list is used to parametrize tests further down in this file.
TO_SUBCLASSES = sorted(all_subclasses(TelegramObject), key=lambda cls: cls.__name__)


class TestTelegramObject:
    class Sub(TelegramObject):
        """Small subclass holding a private attribute, a public attribute and a bot."""

        def __init__(self, private, normal, b):
            super().__init__()
            self._bot = b
            self.normal = normal
            self._private = private

    class ChangingTO(TelegramObject):
        """Only meant for testing the pickle behaviour.

        Don't use this class in any other tests, as it is altered during the test procedure.
        """

    def test_to_json(self, monkeypatch):
        """``to_json`` contains the set attributes, omits the empty tuple and rejects data
        that can't be represented as JSON.
        """

        class Subclass(TelegramObject):
            def __init__(self):
                super().__init__()
                self.arg = "arg"
                self.arg2 = ["arg2", "arg2"]
                self.arg3 = {"arg3": "arg3"}
                self.empty_tuple = ()

        serialized = Subclass().to_json()
        # Order isn't guaranteed, so every fragment is looked up on its own
        expected_fragments = (
            '"arg": "arg"',
            '"arg2": ["arg2", "arg2"]',
            '"arg3": {"arg3": "arg3"}',
        )
        for fragment in expected_fragments:
            assert fragment in serialized
        assert "empty_tuple" not in serialized

        # Now make sure that it doesn't work with not json stuff and that it fails loudly
        # Tuples aren't allowed as keys in json
        invalid_payload = {("str", "str"): "str"}
        monkeypatch.setattr("telegram.TelegramObject.to_dict", lambda _: invalid_payload)
        with pytest.raises(TypeError):
            TelegramObject().to_json()

    def test_de_json_api_kwargs(self, bot):
        """Keys passed to ``TelegramObject.de_json`` end up in ``api_kwargs``; the bot is kept."""
        tg_object = TelegramObject.de_json(data={"foo": "bar"}, bot=bot)
        assert tg_object.get_bot() is bot
        assert tg_object.api_kwargs == {"foo": "bar"}

    def test_de_json_optional_bot(self):
        """``de_json`` works without a bot, but ``get_bot`` raises afterwards."""
        botless = TelegramObject.de_json(data={})
        with pytest.raises(RuntimeError, match="no bot associated with it"):
            botless.get_bot()

    def test_de_list(self, bot):
        """``de_list`` returns a tuple holding one object per input dictionary."""

        class SubClass(TelegramObject):
            def __init__(self, arg: int, **kwargs):
                super().__init__(**kwargs)
                self.arg = arg

                self._id_attrs = (self.arg,)

        raw_items = [{"arg": 1}, {"arg": 2}]
        expected = (SubClass(1), SubClass(2))
        assert SubClass.de_list(raw_items, bot) == expected

    def test_api_kwargs_read_only(self):
        """After freezing, ``api_kwargs`` can neither be replaced nor modified."""
        frozen_object = TelegramObject(api_kwargs={"foo": "bar"})
        frozen_object._freeze()
        assert isinstance(frozen_object.api_kwargs, MappingProxyType)
        # Replacing the whole mapping is rejected ...
        with pytest.raises(AttributeError, match="can't be set"):
            frozen_object.api_kwargs = {"foo": "baz"}
        # ... and so is item assignment on the mapping proxy
        with pytest.raises(TypeError):
            frozen_object.api_kwargs["foo"] = "baz"

    @pytest.mark.parametrize("cls", TO_SUBCLASSES, ids=[cls.__name__ for cls in TO_SUBCLASSES])
    def test_subclasses_have_api_kwargs(self, cls):
        """Checks that all subclasses of TelegramObject have an api_kwargs argument that is
        kw-only. Also, tries to check that this argument is passed to super - by checking that
        the `__init__` contains `api_kwargs=api_kwargs`

        The check returns early for ``Bot`` subclasses, for classes whose ``__init__`` is not
        defined in the same source file as the class, and for classes in the test directory.
        """
        if issubclass(cls, Bot):
            # Bot doesn't have api_kwargs, because it's not defined by TG
            return

        # only relevant for subclasses that have their own init
        if inspect.getsourcefile(cls.__init__) != inspect.getsourcefile(cls):
            return

        # Ignore classes in the test directory
        source_file = Path(inspect.getsourcefile(cls))
        if Path(__file__).parent.resolve() in source_file.parents:
            return

        # check the signature first. The parameter is looked up explicitly so that a missing
        # `api_kwargs` fails with a clear message instead of an AttributeError on `None`
        api_kwargs_param = inspect.signature(cls).parameters.get("api_kwargs")
        assert (
            api_kwargs_param is not None
        ), f"{cls.__name__} doesn't have an `api_kwargs` argument"
        assert (
            api_kwargs_param.kind == inspect.Parameter.KEYWORD_ONLY
        ), f"`api_kwargs` of {cls.__name__} is not keyword-only"

        # Now check for `api_kwargs=api_kwargs` in the source code of `__init__`
        if cls is TelegramObject:
            # TelegramObject doesn't have a super class
            return
        assert "api_kwargs=api_kwargs" in inspect.getsource(
            cls.__init__
        ), f"{cls.__name__} doesn't seem to pass `api_kwargs` to `super().__init__`"

    def test_de_json_arbitrary_exceptions(self, bot):
        """A ``TypeError`` raised inside ``__init__`` surfaces from ``de_json``."""
        failure_message = "This is a test"

        class SubClass(TelegramObject):
            def __init__(self, **kwargs):
                super().__init__(**kwargs)
                raise TypeError(failure_message)

        with pytest.raises(TypeError, match=failure_message):
            SubClass.de_json({}, bot)

    def test_to_dict_private_attribute(self):
        """Attributes with a leading underscore are left out of ``to_dict``."""

        class TelegramObjectSubclass(TelegramObject):
            __slots__ = ("_b", "a")  # Added slots so that the attrs are converted to dict

            def __init__(self):
                super().__init__()
                self.a = 1
                self._b = 2

        assert TelegramObjectSubclass().to_dict() == {"a": 1}

    def test_to_dict_api_kwargs(self):
        """Entries of ``api_kwargs`` appear as top-level keys in ``to_dict``."""
        assert TelegramObject(api_kwargs={"foo": "bar"}).to_dict() == {"foo": "bar"}

    def test_to_dict_missing_attribute(self):
        """A deleted attribute is absent from the recursive ``to_dict`` output and ``None``
        in the non-recursive one.
        """
        sender = User(1, "", False)
        message = Message(1, dtm.datetime.now(), Chat(1, "private"), from_user=sender)
        message._unfreeze()
        del message.chat

        assert "chat" not in message.to_dict()
        assert message.to_dict(recursive=False)["chat"] is None

    def test_to_dict_recursion(self):
        """Nested objects are only converted to dictionaries when ``recursive=True``."""

        class Recursive(TelegramObject):
            __slots__ = ("recursive",)

            def __init__(self):
                super().__init__()
                self.recursive = "recursive"

        class SubClass(TelegramObject):
            """This class doesn't have `__slots__`, so has `__dict__` instead."""

            def __init__(self):
                super().__init__()
                self.subclass = Recursive()

        outer = SubClass()

        # Without recursion the nested object is handed out as is
        shallow = outer.to_dict(recursive=False)
        assert shallow
        assert isinstance(shallow["subclass"], Recursive)

        # With recursion the nested object becomes a dictionary as well
        nested = outer.to_dict(recursive=True)
        assert nested
        assert isinstance(nested["subclass"], dict)
        assert nested["subclass"]["recursive"] == "recursive"

    def test_to_dict_default_value(self):
        """``DEFAULT_NONE`` is omitted from ``to_dict`` while ``DEFAULT_FALSE`` becomes
        plain ``False``.
        """

        class SubClass(TelegramObject):
            def __init__(self):
                super().__init__()
                self.default_none = DEFAULT_NONE
                self.default_false = DEFAULT_FALSE

        result = SubClass().to_dict()
        assert result["default_false"] is False
        assert "default_none" not in result

    def test_slot_behaviour(self):
        """Every declared slot is populated and no slot name appears twice in the MRO."""
        instance = TelegramObject()
        missing = "err"
        for slot in instance.__slots__:
            assert getattr(instance, slot, missing) != missing, f"got extra slot '{slot}'"
        slots = mro_slots(instance)
        assert len(slots) == len(set(slots)), "duplicate slot"

    def test_meaningless_comparison(self, recwarn):
        """Comparing objects without ``_id_attrs`` records one ``PTBUserWarning`` that
        points at this file.
        """

        class TGO(TelegramObject):
            pass

        first, second = TGO(), TGO()
        assert first == second

        assert len(recwarn) == 1
        recorded = recwarn[0]
        assert str(recorded.message) == (
            "Objects of type TGO can not be meaningfully tested for equivalence."
        )
        assert recorded.category is PTBUserWarning
        assert recorded.filename == __file__, "wrong stacklevel"

    def test_meaningful_comparison(self, recwarn):
        """Objects that define ``_id_attrs`` compare equal without any warning."""

        class TGO(TelegramObject):
            def __init__(self):
                self._id_attrs = (1,)

        left, right = TGO(), TGO()
        # Compare in both directions, neither of them may warn
        for lhs, rhs in ((left, right), (right, left)):
            assert lhs == rhs
            assert len(recwarn) == 0

    def test_bot_instance_none(self):
        """``get_bot`` raises for a fresh object on which no bot was set."""
        without_bot = TelegramObject()
        with pytest.raises(RuntimeError):
            without_bot.get_bot()

    @pytest.mark.parametrize("bot_inst", ["bot", None])
    def test_bot_instance_states(self, bot_inst):
        """``get_bot`` returns whatever was passed to ``set_bot`` and raises if that was
        ``None``.

        Args:
            bot_inst: The value handed to ``set_bot``; either a stand-in string or ``None``.
        """
        tg_object = TelegramObject()
        # The parameter is passed on unchanged - the former conditional expression always
        # evaluated to `bot_inst` anyway
        tg_object.set_bot(bot_inst)
        if bot_inst is None:
            with pytest.raises(RuntimeError):
                tg_object.get_bot()
        else:
            assert tg_object.get_bot() == bot_inst

    def test_subscription(self):
        """Attributes are available via ``obj[key]`` and unknown keys raise ``KeyError``."""
        # We test with Message because that gives us everything we want to test - easier than
        # implementing a custom subclass just for this test
        sender = User(3, "first_name", False)
        private_chat = Chat(2, Chat.PRIVATE)
        message = Message(1, None, chat=private_chat, from_user=sender, text="foobar")

        assert message["text"] == "foobar"
        assert message["chat"] is private_chat
        assert message["chat_id"] == 2
        for key in ("from", "from_user"):
            assert message[key] is sender

        with pytest.raises(KeyError, match="Message don't have an attribute called `no_key`"):
            message["no_key"]

    def test_pickle(self, bot):
        """A pickle round trip keeps the data of a message but not its bot."""
        date = dtm.datetime.now()
        user = User(3, "first_name", False)
        chat = Chat(2, Chat.PRIVATE)
        photo = PhotoSize("file_id", "unique", 21, 21)
        photo.set_bot(bot)
        # Test pickling of TGObjects, we choose Message since it contains the most subclasses.
        msg = Message(
            1,
            date,
            chat,
            from_user=user,
            text="foobar",
            photo=[photo],
            animation=DEFAULT_NONE,
            api_kwargs={"api": "kwargs"},
        )
        msg.set_bot(bot)
        assert msg.get_bot()

        restored = pickle.loads(pickle.dumps(msg))

        # There should be no bot when we pickle TGObjects
        with pytest.raises(RuntimeError):
            restored.get_bot()

        assert restored.chat == chat, f"{restored.chat._id_attrs} != {chat._id_attrs}"
        assert restored.from_user == user
        assert restored.date == date, f"{restored.date} != {date}"
        assert restored.photo[0] == photo

        restored_animation = restored.animation
        assert isinstance(restored_animation, DefaultValue)
        assert restored_animation.value is None

        restored_api_kwargs = restored.api_kwargs
        assert isinstance(restored_api_kwargs, MappingProxyType)
        assert restored_api_kwargs == {"api": "kwargs"}

    def test_pickle_apply_api_kwargs(self):
        """Makes sure that when a class gets new attributes, the api_kwargs are moved to the
        new attributes on unpickling."""
        before_change = self.ChangingTO(api_kwargs={"foo": "bar"})
        payload = pickle.dumps(before_change)

        # Give the class a `foo` attribute only after the object was pickled
        self.ChangingTO.foo = None
        after_change = pickle.loads(payload)

        assert after_change.api_kwargs == {}
        assert after_change.foo == "bar"

    async def test_pickle_backwards_compatibility(self):
        """Test when newer versions of the library remove or add attributes from classes (which
        the old pickled versions still/don't have).
        """
        # We use a modified version of the 20.0a5 Chat class, which
        # * has an `all_members_are_administrators` attribute,
        # * a non-empty `api_kwargs` dict
        # * does not have the `is_forum` attribute
        # This specific version was pickled
        # using PicklePersistence.update_chat_data and that's what we use here to test if
        # * the (now) removed attribute `all_members_are_administrators` was added to api_kwargs
        # * the (now) added attribute `is_forum` does not affect the unpickling
        persistence = PicklePersistence(data_file("20a5_modified_chat.pickle"))
        chat_data = await persistence.get_chat_data()
        chat = chat_data[1]
        assert chat.type == Chat.PRIVATE
        assert chat.id == 1

        # There are older attrs in Chat's api_kwargs which are present but we don't care about them
        expected_api_kwargs = {
            "all_members_are_administrators": True,
            "something": "Manually inserted",
        }
        for key, value in expected_api_kwargs.items():
            assert chat.api_kwargs[key] == value

        # removed attribute should not be available as attribute, only through api_kwargs
        with pytest.raises(AttributeError):
            chat.all_members_are_administrators
        # New attribute should not be available either as is always the case for pickle
        with pytest.raises(AttributeError):
            chat.is_forum

        # Ensure that loading objects that were pickled before attributes were made immutable
        # are still mutable
        chat.id = 7
        assert chat.id == 7

    def test_pickle_handle_properties(self):
        """Simulates three successive versions of one class via ``exec`` and checks that an
        object pickled with an older version can be unpickled with the next one.
        """
        # Very hard to properly test, can't use a pickle file since newer versions of the library
        # will stop having the property.
        # The code below uses exec statements to simulate library changes. There is no other way
        # to test this.
        # Note that only the fixed source strings defined in this test are passed to `exec`.
        # Original class:
        v1 = """
class PicklePropertyTest(TelegramObject):
    __slots__ = ("forward_from", "to_be_removed", "forward_date")
    def __init__(self, forward_from=None, forward_date=None, api_kwargs=None):
        super().__init__(api_kwargs=api_kwargs)
        self.forward_from = forward_from
        self.forward_date = forward_date
        self.to_be_removed = "to_be_removed"
"""
        # Executed in the module globals, so the class is defined at module level
        exec(v1, globals(), None)
        old = PicklePropertyTest("old_val", "date", api_kwargs={"new_attr": 1})  # noqa: F821
        pickled_v1 = pickle.dumps(old)

        # After some API changes:
        v2 = """
class PicklePropertyTest(TelegramObject):
    __slots__ = ("_forward_from", "_date", "_new_attr")
    def __init__(self, forward_from=None, f_date=None, new_attr=None, api_kwargs=None):
        super().__init__(api_kwargs=api_kwargs)
        self._forward_from = forward_from
        self.f_date = f_date
        self._new_attr = new_attr
    @property
    def forward_from(self):
        return self._forward_from
    @property
    def forward_date(self):
        return self.f_date
    @property
    def new_attr(self):
        return self._new_attr
        """
        # Replaces the v1 class in the module globals before the v1 pickle is loaded
        exec(v2, globals(), None)
        v2_unpickle = pickle.loads(pickled_v1)
        assert v2_unpickle.forward_from == "old_val" == v2_unpickle._forward_from
        with pytest.raises(AttributeError):
            # New attribute should not be available either as is always the case for pickle
            # NOTE(review): `forward_date` is now a property reading `f_date`, which is
            # presumably never set during unpickling - confirm
            v2_unpickle.forward_date
        # `new_attr` was only part of `api_kwargs` when the v1 object was pickled
        assert v2_unpickle.new_attr == 1 == v2_unpickle._new_attr
        assert not hasattr(v2_unpickle, "to_be_removed")
        assert v2_unpickle.api_kwargs == {"to_be_removed": "to_be_removed"}
        pickled_v2 = pickle.dumps(v2_unpickle)

        # After PTB removes the property and the attribute:
        v3 = """
class PicklePropertyTest(TelegramObject):
    __slots__ = ()
    def __init__(self, api_kwargs=None):
        super().__init__(api_kwargs=api_kwargs)
"""
        exec(v3, globals(), None)
        v3_unpickle = pickle.loads(pickled_v2)
        assert v3_unpickle.api_kwargs == {"to_be_removed": "to_be_removed"}
        assert not hasattr(v3_unpickle, "_forward_from")
        assert not hasattr(v3_unpickle, "_new_attr")

    def test_deepcopy_telegram_obj(self, bot):
        """``deepcopy`` yields equal but distinct objects, keeps the very same bot and
        preserves the frozen state.
        """
        date = dtm.datetime.now()
        user = User(3, "first_name", False)
        chat = Chat(2, Chat.PRIVATE)
        photo = PhotoSize("file_id", "unique", 21, 21)
        photo.set_bot(bot)
        msg = Message(
            1, date, chat, from_user=user, text="foobar", photo=[photo], api_kwargs={"foo": "bar"}
        )
        msg.set_bot(bot)

        clone = deepcopy(msg)

        assert clone is not msg
        assert clone == msg

        # The same bot should be present when deepcopying.
        assert clone.get_bot() == bot
        assert clone.get_bot() is bot

        # Each nested value has to be equal to its source without being the same object
        copied_and_source = (
            (clone.date, date),
            (clone.chat, chat),
            (clone.from_user, user),
            (clone.photo[0], photo),
        )
        for copied, source in copied_and_source:
            assert copied == source
            assert copied is not source
        assert clone.api_kwargs == {"foo": "bar"}
        assert clone.api_kwargs is not msg.api_kwargs

        # check that deepcopy preserves the freezing status
        with pytest.raises(
            AttributeError, match="Attribute `text` of class `Message` can't be set!"
        ):
            clone.text = "new text"

        msg._unfreeze()
        unfrozen_clone = deepcopy(msg)
        unfrozen_clone.text = "new text"
        assert unfrozen_clone.text == "new text"

    def test_deepcopy_subclass_telegram_obj(self, bot):
        """Deep-copying a custom subclass keeps its attribute values and the very same bot."""
        source = self.Sub("private", "normal", bot)
        clone = deepcopy(source)
        assert clone is not source
        # Can't test for identity here, since two equal strings may be the same object
        assert clone._private == source._private
        assert clone.normal == source.normal
        assert clone._bot == source._bot
        assert clone._bot is source._bot

    def test_string_representation(self):
        """``str`` and ``repr`` give the same output, with and without ``api_kwargs``."""

        class TGO(TelegramObject):
            def __init__(self, api_kwargs=None):
                super().__init__(api_kwargs=api_kwargs)
                self.string_attr = "string"
                self.int_attr = 42
                self.to_attr = BotCommand("command", "description")
                self.list_attr = [
                    BotCommand("command_1", "description_1"),
                    BotCommand("command_2", "description_2"),
                ]
                self.dict_attr = {
                    BotCommand("command_1", "description_1"): BotCommand(
                        "command_2", "description_2"
                    )
                }
                self.empty_tuple_attrs = ()
                self.empty_str_attribute = ""
                # Should not be included in string representation
                self.none_attr = None

        expected_without_api_kwargs = (
            "TGO(dict_attr={BotCommand(command='command_1', description='description_1'): "
            "BotCommand(command='command_2', description='description_2')}, int_attr=42, "
            "list_attr=[BotCommand(command='command_1', description='description_1'), "
            "BotCommand(command='command_2', description='description_2')], "
            "string_attr='string', to_attr=BotCommand(command='command', "
            "description='description'))"
        )
        expected_with_api_kwargs = (
            "TGO(api_kwargs={'foo': 'bar'}, dict_attr={BotCommand(command='command_1', "
            "description='description_1'): BotCommand(command='command_2', "
            "description='description_2')}, int_attr=42, "
            "list_attr=[BotCommand(command='command_1', description='description_1'), "
            "BotCommand(command='command_2', description='description_2')], "
            "string_attr='string', to_attr=BotCommand(command='command', "
            "description='description'))"
        )

        # Both conversions are expected to produce the very same text
        for render in (str, repr):
            assert render(TGO()) == expected_without_api_kwargs
            assert render(TGO(api_kwargs={"foo": "bar"})) == expected_with_api_kwargs

    @pytest.mark.parametrize("cls", TO_SUBCLASSES, ids=[cls.__name__ for cls in TO_SUBCLASSES])
    def test_subclasses_are_frozen(self, cls):
        """Checks, by inspecting the source of ``__init__``, that a subclass either ends its
        ``__init__`` with ``self._freeze()`` or uses ``with self._unfrozen():``.
        """
        # Protected classes don't need to be frozen and neither does the base class
        if cls.__name__.startswith("_") or cls is TelegramObject:
            return

        # instantiating each subclass would be tedious as some attributes require special init
        # args. So we inspect the code instead.
        init_file = inspect.getsourcefile(cls.__init__)

        # If the class is defined in a test file, we don't want to test it.
        if Path(__file__).parent.resolve() in Path(init_file).parents:
            return

        if init_file.endswith("telegramobject.py"):
            pytest.fail(
                f"{cls.__name__} does not have its own `__init__` "
                "and can therefore not be frozen correctly"
            )

        init_lines, _ = inspect.getsourcelines(cls.__init__)
        init_source = inspect.getsource(cls.__init__)

        # We use regex matching since a simple "if self._freeze() in source_lines[-1]" would also
        # allow commented lines.
        ends_with_freeze = re.match(r"\s*self\.\_freeze\(\)", init_lines[-1])
        has_unfrozen_block = re.search(r"\n\s*with self\.\_unfrozen\(\)\:", init_source)

        assert ends_with_freeze or has_unfrozen_block, f"{cls.__name__} is not frozen correctly"

    def test_freeze_unfreeze(self):
        """A frozen object rejects setting and deleting public attributes, while protected
        ones stay writable; after unfreezing, both kinds can be changed and deleted.
        """

        class TestSub(TelegramObject):
            def __init__(self):
                super().__init__()
                self._protected = True
                self.public = True
                self._freeze()

        obj = TestSub()

        # Protected attributes can be changed even though the object is frozen
        obj._protected = False
        assert obj._protected is False

        with pytest.raises(
            AttributeError, match="Attribute `public` of class `TestSub` can't be set!"
        ):
            obj.public = False

        with pytest.raises(
            AttributeError, match="Attribute `public` of class `TestSub` can't be deleted!"
        ):
            del obj.public

        obj._unfreeze()
        obj._protected = True
        assert obj._protected is True
        obj.public = False
        assert obj.public is False

        # Once unfrozen, both attributes can be removed
        for name in ("public", "_protected"):
            delattr(obj, name)
            assert not hasattr(obj, name)