#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2025
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program.  If not, see [http://www.gnu.org/licenses/].
import datetime as dtm
import inspect
import pickle
import re
from copy import deepcopy
from pathlib import Path
from types import MappingProxyType

import pytest

from telegram import Bot, BotCommand, Chat, Message, PhotoSize, TelegramObject, User
from telegram._utils.defaultvalue import DEFAULT_FALSE, DEFAULT_NONE, DefaultValue
from telegram.ext import PicklePersistence
from telegram.warnings import PTBUserWarning
from tests.auxil.files import data_file
from tests.auxil.slots import mro_slots


def all_subclasses(cls):
    # Gets all subclasses of the specified object, recursively. from
    # https://stackoverflow.com/a/3862957/9706202
    # also includes the class itself
    return (
        set(cls.__subclasses__())
        .union([s for c in cls.__subclasses__() for s in all_subclasses(c)])
        .union({cls})
    )


TO_SUBCLASSES = sorted(all_subclasses(TelegramObject), key=lambda cls: cls.__name__)


class TestTelegramObject:
    class Sub(TelegramObject):
        def __init__(self, private, normal, b):
            super().__init__()
            self._private = private
            self.normal = normal
            self._bot = b

    class ChangingTO(TelegramObject):
        # Don't use in any tests, this is just for testing the pickle behaviour and the
        # class is altered during the test procedure
        pass

    def test_to_json(self, monkeypatch):
        class Subclass(TelegramObject):
            def __init__(self):
                super().__init__()
                self.arg = "arg"
                self.arg2 = ["arg2", "arg2"]
                self.arg3 = {"arg3": "arg3"}
                self.empty_tuple = ()

        json = Subclass().to_json()
        # Order isn't guarantied
        assert '"arg": "arg"' in json
        assert '"arg2": ["arg2", "arg2"]' in json
        assert '"arg3": {"arg3": "arg3"}' in json
        assert "empty_tuple" not in json

        # Now make sure that it doesn't work with not json stuff and that it fails loudly
        # Tuples aren't allowed as keys in json
        d = {("str", "str"): "str"}

        monkeypatch.setattr("telegram.TelegramObject.to_dict", lambda _: d)
        with pytest.raises(TypeError):
            TelegramObject().to_json()

    def test_de_json_api_kwargs(self, bot):
        to = TelegramObject.de_json(data={"foo": "bar"}, bot=bot)
        assert to.api_kwargs == {"foo": "bar"}
        assert to.get_bot() is bot

    def test_de_json_optional_bot(self):
        to = TelegramObject.de_json(data={})
        with pytest.raises(RuntimeError, match="no bot associated with it"):
            to.get_bot()

    def test_de_list(self, bot):
        class SubClass(TelegramObject):
            def __init__(self, arg: int, **kwargs):
                super().__init__(**kwargs)
                self.arg = arg

                self._id_attrs = (self.arg,)

        assert SubClass.de_list([{"arg": 1}, {"arg": 2}], bot) == (
            SubClass(1),
            SubClass(2),
        )

    def test_api_kwargs_read_only(self):
        tg_object = TelegramObject(api_kwargs={"foo": "bar"})
        tg_object._freeze()
        assert isinstance(tg_object.api_kwargs, MappingProxyType)
        with pytest.raises(TypeError):
            tg_object.api_kwargs["foo"] = "baz"
        with pytest.raises(AttributeError, match="can't be set"):
            tg_object.api_kwargs = {"foo": "baz"}

    @pytest.mark.parametrize("cls", TO_SUBCLASSES, ids=[cls.__name__ for cls in TO_SUBCLASSES])
    def test_subclasses_have_api_kwargs(self, cls):
        """Checks that all subclasses of TelegramObject have an api_kwargs argument that is
        kw-only. Also, tries to check that this argument is passed to super - by checking that
        the `__init__` contains `api_kwargs=api_kwargs`
        """
        if issubclass(cls, Bot):
            # Bot doesn't have api_kwargs, because it's not defined by TG
            return

        # only relevant for subclasses that have their own init
        if inspect.getsourcefile(cls.__init__) != inspect.getsourcefile(cls):
            return

        # Ignore classes in the test directory
        source_file = Path(inspect.getsourcefile(cls))
        parents = source_file.parents
        is_test_file = Path(__file__).parent.resolve() in parents
        if is_test_file:
            return

        # check the signature first
        signature = inspect.signature(cls)
        assert signature.parameters.get("api_kwargs").kind == inspect.Parameter.KEYWORD_ONLY

        # Now check for `api_kwargs=api_kwargs` in the source code of `__init__`
        if cls is TelegramObject:
            # TelegramObject doesn't have a super class
            return
        assert "api_kwargs=api_kwargs" in inspect.getsource(
            cls.__init__
        ), f"{cls.__name__} doesn't seem to pass `api_kwargs` to `super().__init__`"

    def test_de_json_arbitrary_exceptions(self, bot):
        class SubClass(TelegramObject):
            def __init__(self, **kwargs):
                super().__init__(**kwargs)
                raise TypeError("This is a test")

        with pytest.raises(TypeError, match="This is a test"):
            SubClass.de_json({}, bot)

    def test_to_dict_private_attribute(self):
        class TelegramObjectSubclass(TelegramObject):
            __slots__ = ("_b", "a")  # Added slots so that the attrs are converted to dict

            def __init__(self):
                super().__init__()
                self.a = 1
                self._b = 2

        subclass_instance = TelegramObjectSubclass()
        assert subclass_instance.to_dict() == {"a": 1}

    def test_to_dict_api_kwargs(self):
        to = TelegramObject(api_kwargs={"foo": "bar"})
        assert to.to_dict() == {"foo": "bar"}

    def test_to_dict_missing_attribute(self):
        message = Message(1, dtm.datetime.now(), Chat(1, "private"), from_user=User(1, "", False))
        message._unfreeze()
        del message.chat

        message_dict = message.to_dict()
        assert "chat" not in message_dict

        message_dict = message.to_dict(recursive=False)
        assert message_dict["chat"] is None

    def test_to_dict_recursion(self):
        class Recursive(TelegramObject):
            __slots__ = ("recursive",)

            def __init__(self):
                super().__init__()
                self.recursive = "recursive"

        class SubClass(TelegramObject):
            """This class doesn't have `__slots__`, so has `__dict__` instead."""

            def __init__(self):
                super().__init__()
                self.subclass = Recursive()

        to = SubClass()
        to_dict_no_recurse = to.to_dict(recursive=False)
        assert to_dict_no_recurse
        assert isinstance(to_dict_no_recurse["subclass"], Recursive)
        to_dict_recurse = to.to_dict(recursive=True)
        assert to_dict_recurse
        assert isinstance(to_dict_recurse["subclass"], dict)
        assert to_dict_recurse["subclass"]["recursive"] == "recursive"

    def test_to_dict_default_value(self):
        class SubClass(TelegramObject):
            def __init__(self):
                super().__init__()
                self.default_none = DEFAULT_NONE
                self.default_false = DEFAULT_FALSE

        to = SubClass()
        to_dict = to.to_dict()
        assert "default_none" not in to_dict
        assert to_dict["default_false"] is False

    def test_slot_behaviour(self):
        inst = TelegramObject()
        for attr in inst.__slots__:
            assert getattr(inst, attr, "err") != "err", f"got extra slot '{attr}'"
        assert len(mro_slots(inst)) == len(set(mro_slots(inst))), "duplicate slot"

    def test_meaningless_comparison(self, recwarn):
        expected_warning = "Objects of type TGO can not be meaningfully tested for equivalence."

        class TGO(TelegramObject):
            pass

        a = TGO()
        b = TGO()
        assert a == b
        assert len(recwarn) == 1
        assert str(recwarn[0].message) == expected_warning
        assert recwarn[0].category is PTBUserWarning
        assert recwarn[0].filename == __file__, "wrong stacklevel"

    def test_meaningful_comparison(self, recwarn):
        class TGO(TelegramObject):
            def __init__(self):
                self._id_attrs = (1,)

        a = TGO()
        b = TGO()
        assert a == b
        assert len(recwarn) == 0
        assert b == a
        assert len(recwarn) == 0

    def test_bot_instance_none(self):
        tg_object = TelegramObject()
        with pytest.raises(RuntimeError):
            tg_object.get_bot()

    @pytest.mark.parametrize("bot_inst", ["bot", None])
    def test_bot_instance_states(self, bot_inst):
        tg_object = TelegramObject()
        tg_object.set_bot("bot" if bot_inst == "bot" else bot_inst)
        if bot_inst == "bot":
            assert tg_object.get_bot() == "bot"
        elif bot_inst is None:
            with pytest.raises(RuntimeError):
                tg_object.get_bot()

    def test_subscription(self):
        # We test with Message because that gives us everything we want to test - easier than
        # implementing a custom subclass just for this test
        chat = Chat(2, Chat.PRIVATE)
        user = User(3, "first_name", False)
        message = Message(1, None, chat=chat, from_user=user, text="foobar")
        assert message["text"] == "foobar"
        assert message["chat"] is chat
        assert message["chat_id"] == 2
        assert message["from"] is user
        assert message["from_user"] is user
        with pytest.raises(KeyError, match="Message don't have an attribute called `no_key`"):
            message["no_key"]

    def test_pickle(self, bot):
        chat = Chat(2, Chat.PRIVATE)
        user = User(3, "first_name", False)
        date = dtm.datetime.now()
        photo = PhotoSize("file_id", "unique", 21, 21)
        photo.set_bot(bot)
        msg = Message(
            1,
            date,
            chat,
            from_user=user,
            text="foobar",
            photo=[photo],
            animation=DEFAULT_NONE,
            api_kwargs={"api": "kwargs"},
        )
        msg.set_bot(bot)

        # Test pickling of TGObjects, we choose Message since it's contains the most subclasses.
        assert msg.get_bot()
        unpickled = pickle.loads(pickle.dumps(msg))

        with pytest.raises(RuntimeError):
            unpickled.get_bot()  # There should be no bot when we pickle TGObjects

        assert unpickled.chat == chat, f"{unpickled.chat._id_attrs} != {chat._id_attrs}"
        assert unpickled.from_user == user
        assert unpickled.date == date, f"{unpickled.date} != {date}"
        assert unpickled.photo[0] == photo
        assert isinstance(unpickled.animation, DefaultValue)
        assert unpickled.animation.value is None
        assert isinstance(unpickled.api_kwargs, MappingProxyType)
        assert unpickled.api_kwargs == {"api": "kwargs"}

    def test_pickle_apply_api_kwargs(self):
        """Makes sure that when a class gets new attributes, the api_kwargs are moved to the
        new attributes on unpickling."""
        obj = self.ChangingTO(api_kwargs={"foo": "bar"})
        pickled = pickle.dumps(obj)

        self.ChangingTO.foo = None
        obj = pickle.loads(pickled)

        assert obj.foo == "bar"
        assert obj.api_kwargs == {}

    async def test_pickle_backwards_compatibility(self):
        """Test when newer versions of the library remove or add attributes from classes (which
        the old pickled versions still/don't have).
        """
        # We use a modified version of the 20.0a5 Chat class, which
        # * has an `all_members_are_admins` attribute,
        # * a non-empty `api_kwargs` dict
        # * does not have the `is_forum` attribute
        # This specific version was pickled
        # using PicklePersistence.update_chat_data and that's what we use here to test if
        # * the (now) removed attribute `all_members_are_admins` was added to api_kwargs
        # * the (now) added attribute `is_forum` does not affect the unpickling
        pp = PicklePersistence(data_file("20a5_modified_chat.pickle"))
        chat = (await pp.get_chat_data())[1]
        assert chat.id == 1
        assert chat.type == Chat.PRIVATE
        api_kwargs_expected = {
            "all_members_are_administrators": True,
            "something": "Manually inserted",
        }
        # There are older attrs in Chat's api_kwargs which are present but we don't care about them
        for k, v in api_kwargs_expected.items():
            assert chat.api_kwargs[k] == v

        with pytest.raises(AttributeError):
            # removed attribute should not be available as attribute, only though api_kwargs
            chat.all_members_are_administrators
        with pytest.raises(AttributeError):
            # New attribute should not be available either as is always the case for pickle
            chat.is_forum

        # Ensure that loading objects that were pickled before attributes were made immutable
        # are still mutable
        chat.id = 7
        assert chat.id == 7

    def test_pickle_handle_properties(self):
        # Very hard to properly test, can't use a pickle file since newer versions of the library
        # will stop having the property.
        # The code below uses exec statements to simulate library changes. There is no other way
        # to test this.
        # Original class:
        v1 = """
class PicklePropertyTest(TelegramObject):
    __slots__ = ("forward_from", "to_be_removed", "forward_date")
    def __init__(self, forward_from=None, forward_date=None, api_kwargs=None):
        super().__init__(api_kwargs=api_kwargs)
        self.forward_from = forward_from
        self.forward_date = forward_date
        self.to_be_removed = "to_be_removed"
"""
        exec(v1, globals(), None)
        old = PicklePropertyTest("old_val", "date", api_kwargs={"new_attr": 1})  # noqa: F821
        pickled_v1 = pickle.dumps(old)

        # After some API changes:
        v2 = """
class PicklePropertyTest(TelegramObject):
    __slots__ = ("_forward_from", "_date", "_new_attr")
    def __init__(self, forward_from=None, f_date=None, new_attr=None, api_kwargs=None):
        super().__init__(api_kwargs=api_kwargs)
        self._forward_from = forward_from
        self.f_date = f_date
        self._new_attr = new_attr
    @property
    def forward_from(self):
        return self._forward_from
    @property
    def forward_date(self):
        return self.f_date
    @property
    def new_attr(self):
        return self._new_attr
        """
        exec(v2, globals(), None)
        v2_unpickle = pickle.loads(pickled_v1)
        assert v2_unpickle.forward_from == "old_val" == v2_unpickle._forward_from
        with pytest.raises(AttributeError):
            # New attribute should not be available either as is always the case for pickle
            v2_unpickle.forward_date
        assert v2_unpickle.new_attr == 1 == v2_unpickle._new_attr
        assert not hasattr(v2_unpickle, "to_be_removed")
        assert v2_unpickle.api_kwargs == {"to_be_removed": "to_be_removed"}
        pickled_v2 = pickle.dumps(v2_unpickle)

        # After PTB removes the property and the attribute:
        v3 = """
class PicklePropertyTest(TelegramObject):
    __slots__ = ()
    def __init__(self, api_kwargs=None):
        super().__init__(api_kwargs=api_kwargs)
"""
        exec(v3, globals(), None)
        v3_unpickle = pickle.loads(pickled_v2)
        assert v3_unpickle.api_kwargs == {"to_be_removed": "to_be_removed"}
        assert not hasattr(v3_unpickle, "_forward_from")
        assert not hasattr(v3_unpickle, "_new_attr")

    def test_deepcopy_telegram_obj(self, bot):
        chat = Chat(2, Chat.PRIVATE)
        user = User(3, "first_name", False)
        date = dtm.datetime.now()
        photo = PhotoSize("file_id", "unique", 21, 21)
        photo.set_bot(bot)
        msg = Message(
            1, date, chat, from_user=user, text="foobar", photo=[photo], api_kwargs={"foo": "bar"}
        )
        msg.set_bot(bot)

        new_msg = deepcopy(msg)

        assert new_msg == msg
        assert new_msg is not msg

        # The same bot should be present when deepcopying.
        assert new_msg.get_bot() == bot
        assert new_msg.get_bot() is bot

        assert new_msg.date == date
        assert new_msg.date is not date
        assert new_msg.chat == chat
        assert new_msg.chat is not chat
        assert new_msg.from_user == user
        assert new_msg.from_user is not user
        assert new_msg.photo[0] == photo
        assert new_msg.photo[0] is not photo
        assert new_msg.api_kwargs == {"foo": "bar"}
        assert new_msg.api_kwargs is not msg.api_kwargs

        # check that deepcopy preserves the freezing status
        with pytest.raises(
            AttributeError, match="Attribute `text` of class `Message` can't be set!"
        ):
            new_msg.text = "new text"

        msg._unfreeze()
        new_message = deepcopy(msg)
        new_message.text = "new text"
        assert new_message.text == "new text"

    def test_deepcopy_subclass_telegram_obj(self, bot):
        s = self.Sub("private", "normal", bot)
        d = deepcopy(s)
        assert d is not s
        assert d._private == s._private  # Can't test for identity since two equal strings is True
        assert d._bot == s._bot
        assert d._bot is s._bot
        assert d.normal == s.normal

    def test_string_representation(self):
        class TGO(TelegramObject):
            def __init__(self, api_kwargs=None):
                super().__init__(api_kwargs=api_kwargs)
                self.string_attr = "string"
                self.int_attr = 42
                self.to_attr = BotCommand("command", "description")
                self.list_attr = [
                    BotCommand("command_1", "description_1"),
                    BotCommand("command_2", "description_2"),
                ]
                self.dict_attr = {
                    BotCommand("command_1", "description_1"): BotCommand(
                        "command_2", "description_2"
                    )
                }
                self.empty_tuple_attrs = ()
                self.empty_str_attribute = ""
                # Should not be included in string representation
                self.none_attr = None

        expected_without_api_kwargs = (
            "TGO(dict_attr={BotCommand(command='command_1', description='description_1'): "
            "BotCommand(command='command_2', description='description_2')}, int_attr=42, "
            "list_attr=[BotCommand(command='command_1', description='description_1'), "
            "BotCommand(command='command_2', description='description_2')], "
            "string_attr='string', to_attr=BotCommand(command='command', "
            "description='description'))"
        )
        assert str(TGO()) == expected_without_api_kwargs
        assert repr(TGO()) == expected_without_api_kwargs

        expected_with_api_kwargs = (
            "TGO(api_kwargs={'foo': 'bar'}, dict_attr={BotCommand(command='command_1', "
            "description='description_1'): BotCommand(command='command_2', "
            "description='description_2')}, int_attr=42, "
            "list_attr=[BotCommand(command='command_1', description='description_1'), "
            "BotCommand(command='command_2', description='description_2')], "
            "string_attr='string', to_attr=BotCommand(command='command', "
            "description='description'))"
        )
        assert str(TGO(api_kwargs={"foo": "bar"})) == expected_with_api_kwargs
        assert repr(TGO(api_kwargs={"foo": "bar"})) == expected_with_api_kwargs

    @pytest.mark.parametrize("cls", TO_SUBCLASSES, ids=[cls.__name__ for cls in TO_SUBCLASSES])
    def test_subclasses_are_frozen(self, cls):
        if cls is TelegramObject or cls.__name__.startswith("_"):
            # Protected classes don't need to be frozen and neither does the base class
            return

        # instantiating each subclass would be tedious as some attributes require special init
        # args. So we inspect the code instead.

        source_file = inspect.getsourcefile(cls.__init__)
        parents = Path(source_file).parents
        is_test_file = Path(__file__).parent.resolve() in parents

        if is_test_file:
            # If the class is defined in a test file, we don't want to test it.
            return

        if source_file.endswith("telegramobject.py"):
            pytest.fail(
                f"{cls.__name__} does not have its own `__init__` "
                "and can therefore not be frozen correctly"
            )

        source_lines, _ = inspect.getsourcelines(cls.__init__)

        # We use regex matching since a simple "if self._freeze() in source_lines[-1]" would also
        # allo commented lines.
        last_line_freezes = re.match(r"\s*self\.\_freeze\(\)", source_lines[-1])
        uses_with_unfrozen = re.search(
            r"\n\s*with self\.\_unfrozen\(\)\:", inspect.getsource(cls.__init__)
        )

        assert last_line_freezes or uses_with_unfrozen, f"{cls.__name__} is not frozen correctly"

    def test_freeze_unfreeze(self):
        class TestSub(TelegramObject):
            def __init__(self):
                super().__init__()
                self._protected = True
                self.public = True
                self._freeze()

        foo = TestSub()
        foo._protected = False
        assert foo._protected is False

        with pytest.raises(
            AttributeError, match="Attribute `public` of class `TestSub` can't be set!"
        ):
            foo.public = False

        with pytest.raises(
            AttributeError, match="Attribute `public` of class `TestSub` can't be deleted!"
        ):
            del foo.public

        foo._unfreeze()
        foo._protected = True
        assert foo._protected is True
        foo.public = False
        assert foo.public is False
        del foo.public
        del foo._protected
        assert not hasattr(foo, "public")
        assert not hasattr(foo, "_protected")
