import asyncio
from collections import defaultdict
import os
from typing import Optional, Iterable

from electrum.commands import Commands
from electrum.daemon import Daemon
from electrum.simple_config import SimpleConfig
from electrum.wallet import Abstract_Wallet
from electrum.lnworker import LNWallet, LNPeerManager
from electrum.lnwatcher import LNWatcher
from electrum import util
from electrum.utils.memory_leak import count_objects_in_memory

from . import ElectrumTestCase, as_testnet, restore_wallet_from_text__for_unittest


class DaemonTestCase(ElectrumTestCase):
    """Base class for tests that need a ``Daemon`` instance.

    ``setUp`` creates an offline ``SimpleConfig`` rooted at ``self.electrum_path``
    and records the wallet directory in ``self.wallet_dir``; ``asyncSetUp``
    creates ``self.daemon`` on top of that config, without a JSON-RPC listener.
    """
    config: 'SimpleConfig'

    def setUp(self):
        super().setUp()
        self.config = SimpleConfig({'electrum_path': self.electrum_path})
        self.config.NETWORK_OFFLINE = True

        self.wallet_dir = os.path.dirname(self.config.get_wallet_path())
        # sanity check; uses the TestCase assertion so it is not stripped under "python -O"
        self.assertEqual("wallets", os.path.basename(self.wallet_dir))

    async def asyncSetUp(self):
        await super().asyncSetUp()
        self.daemon = Daemon(config=self.config, listen_jsonrpc=False)
        # the config is offline, so the daemon must not have created a network object
        self.assertIsNone(self.daemon.network)

    async def asyncTearDown(self):
        # Always run the parent's teardown, even if stopping the daemon raises,
        # so that a failure here does not leave the base-class cleanup undone.
        try:
            await self.daemon.stop()
        finally:
            await super().asyncTearDown()

    def _restore_wallet_from_text(
            self,
            text,
            *,
            password: Optional[str],
            encrypt_file: Optional[bool] = None,
            **kwargs,
    ) -> str:
        """Restore a wallet from `text` into a new file inside self.wallet_dir.

        `password`, `encrypt_file` and any extra `kwargs` are forwarded unchanged
        to restore_wallet_from_text__for_unittest.

        Returns path for created wallet.
        """
        basename = util.get_new_wallet_name(self.wallet_dir)
        path = os.path.join(self.wallet_dir, basename)
        restore_wallet_from_text__for_unittest(
            text,
            path=path,
            password=password,
            encrypt_file=encrypt_file,
            config=self.config,
            **kwargs,
        )
        # We return the path instead of the wallet object, as extreme
        # care would be needed to use the wallet object directly:
        # Unless the daemon knows about it, daemon._load_wallet might create a conflicting wallet object
        # for the same fs path, and there would be two wallet objects contending for the same file.
        return path


class TestUnifiedPassword(DaemonTestCase):
    """Tests for Daemon.check_password_for_directory / update_password_for_directory.

    All tests run with config.WALLET_SHOULD_USE_SINGLE_PASSWORD enabled.
    """

    def setUp(self):
        super().setUp()
        self.config.WALLET_SHOULD_USE_SINGLE_PASSWORD = True

    @staticmethod
    def _read_file_bytes(path: str) -> bytes:
        """Return the raw on-disk content of the file at `path`."""
        with open(path, "rb") as f:
            return f.read()

    def _run_post_unif_sanity_checks(self, paths: Iterable[str], *, password: str):
        """Check that every wallet in `paths` is usable with `password` after unification.

        Each wallet is loaded through the daemon and must have storage encryption
        (and keystore encryption, where the wallet supports it). Finally the wallet
        directory as a whole must be reported as unified, with exactly
        len(paths) wallets unlockable.
        """
        # Materialise once: we both iterate the paths and need their count,
        # and a generic iterable (e.g. a generator) supports neither len() nor re-iteration.
        paths = list(paths)
        for path in paths:
            w = self.daemon.load_wallet(path, password)
            self.assertIsNotNone(w)
            w.check_password(password)
            self.assertTrue(w.has_storage_encryption())
            if w.can_have_keystore_encryption():
                self.assertTrue(w.has_keystore_encryption())
            if w.has_seed():
                self.assertIsInstance(w.get_seed(password), str)
        can_be_unified, is_unified, wallet_paths_can_unlock = self.daemon.check_password_for_directory(
            old_password=password,
            wallet_dir=self.wallet_dir,
        )
        self.assertEqual((True, True, len(paths)), (can_be_unified, is_unified, len(wallet_paths_can_unlock)))

    # "cannot unify pw" tests --->

    async def test_cannot_unify_two_std_wallets_both_have_ks_and_sto_enc(self):
        """Two fully encrypted wallets with different passwords cannot be unified."""
        path1 = self._restore_wallet_from_text("9dk", password="123456", encrypt_file=True)
        path2 = self._restore_wallet_from_text("x8",  password="asdasd", encrypt_file=True)
        raw1_before = self._read_file_bytes(path1)
        raw2_before = self._read_file_bytes(path2)

        can_be_unified, is_unified, _ = self.daemon.check_password_for_directory(old_password="123456", wallet_dir=self.wallet_dir)
        self.assertEqual((False, False), (can_be_unified, is_unified))
        is_unified = self.daemon.update_password_for_directory(old_password="123456", new_password="123456")
        self.assertFalse(is_unified)
        # verify that files on disk haven't changed:
        self.assertEqual(raw1_before, self._read_file_bytes(path1))
        self.assertEqual(raw2_before, self._read_file_bytes(path2))

    async def test_cannot_unify_mixed_wallets(self):
        """A mix of encryption setups where one wallet uses a different password cannot be unified."""
        path1 = self._restore_wallet_from_text("9dk", password="123456", encrypt_file=True)
        path2 = self._restore_wallet_from_text("9dk",  password="asdasd", encrypt_file=False)
        path3 = self._restore_wallet_from_text("9dk",  password=None)
        raw1_before = self._read_file_bytes(path1)
        raw2_before = self._read_file_bytes(path2)
        raw3_before = self._read_file_bytes(path3)

        can_be_unified, is_unified, _ = self.daemon.check_password_for_directory(old_password="123456", wallet_dir=self.wallet_dir)
        self.assertEqual((False, False), (can_be_unified, is_unified))
        is_unified = self.daemon.update_password_for_directory(old_password="123456", new_password="123456")
        self.assertFalse(is_unified)
        # verify that files on disk haven't changed:
        self.assertEqual(raw1_before, self._read_file_bytes(path1))
        self.assertEqual(raw2_before, self._read_file_bytes(path2))
        self.assertEqual(raw3_before, self._read_file_bytes(path3))

    # "can unify pw" tests --->

    async def test_can_unify_two_std_wallets_both_have_ks_and_sto_enc(self):
        """Two fully encrypted wallets sharing one password are already unified."""
        path1 = self._restore_wallet_from_text("9dk", password="123456", encrypt_file=True)
        path2 = self._restore_wallet_from_text("x8",  password="123456", encrypt_file=True)
        can_be_unified, is_unified, _ = self.daemon.check_password_for_directory(old_password="123456", wallet_dir=self.wallet_dir)
        self.assertEqual((True, True), (can_be_unified, is_unified))
        is_unified = self.daemon.update_password_for_directory(old_password="123456", new_password="123456")
        self.assertTrue(is_unified)
        self._run_post_unif_sanity_checks([path1, path2], password="123456")

    async def test_can_unify_two_std_wallets_one_has_ks_enc_other_has_both_enc(self):
        """Same password, but one wallet file is not storage-encrypted yet: can be unified."""
        path1 = self._restore_wallet_from_text("9dk", password="123456", encrypt_file=True)
        path2 = self._restore_wallet_from_text("x8",  password="123456", encrypt_file=False)
        raw2_before = self._read_file_bytes(path2)

        can_be_unified, is_unified, _ = self.daemon.check_password_for_directory(old_password="123456", wallet_dir=self.wallet_dir)
        self.assertEqual((True, False), (can_be_unified, is_unified))
        is_unified = self.daemon.update_password_for_directory(old_password="123456", new_password="123456")
        self.assertTrue(is_unified)
        self._run_post_unif_sanity_checks([path1, path2], password="123456")
        # verify that file at path2 changed:
        self.assertNotEqual(raw2_before, self._read_file_bytes(path2))

    async def test_can_unify_two_std_wallets_one_without_password(self):
        """A password-less wallet next to an encrypted one can be unified."""
        path1 = self._restore_wallet_from_text("9dk", password=None)
        path2 = self._restore_wallet_from_text("x8",  password="123456", encrypt_file=True)
        can_be_unified, is_unified, _ = self.daemon.check_password_for_directory(old_password="123456", wallet_dir=self.wallet_dir)
        self.assertEqual((True, False), (can_be_unified, is_unified))
        is_unified = self.daemon.update_password_for_directory(old_password="123456", new_password="123456")
        self.assertTrue(is_unified)
        self._run_post_unif_sanity_checks([path1, path2], password="123456")

    @as_testnet
    async def test_can_unify_large_folder_yet_to_be_unified(self):
        """Many wallets of different kinds, each in three encryption setups, can be unified."""
        xpub = "vpub5UqWay427dCjkpE3gPKLnkBUqDRoBed1328uNrLDoTyKo6HFSs9agfDMy1VXbVtcuBVRiAZQsPPsPdu1Ge8m8qvNZPyzJ4ecPsf6U1ieW4x"
        xprv = "vprv9FrABTX8HFeSYL9aaMnLRcEkHBbJnBu9foDJaTvcF8SLvHx6uKqL8rtt7kTd66V4QPLfWPaCJMVZa3h9zuzLr7YFZd1uoEevqqyxp66oSbN"
        wifs = "p2wpkh:cRyfp9nJ8soK1bBUJAcWbMrsJZxKJpe7HBSxz5uXVbwydvUxz9zT p2wpkh:cV6J6T2AG4oXAXdYHAV6dbzR41QnGumDSVvWrmj2yYpos81RtyBK"
        addrs = "tb1qq2tmmcngng78nllq2pvrkchcdukemtj5s6l0zu tb1qm7ckcjsed98zhvhv3dr56a22w3fehlkxyh4wgd"
        paths = []
        # For each kind of wallet text (seed, xpub, xprv, WIFs, addrs) create three wallets:
        # storage+keystore pw, keystore-only pw, and no pw at all.
        for text in ("9dk", xpub, xprv, wifs, addrs):
            paths.append(self._restore_wallet_from_text(text, password="123456", encrypt_file=True))
            paths.append(self._restore_wallet_from_text(text, password="123456", encrypt_file=False))
            paths.append(self._restore_wallet_from_text(text, password=None))
        # do unification
        can_be_unified, is_unified, _ = self.daemon.check_password_for_directory(old_password="123456", wallet_dir=self.wallet_dir)
        self.assertEqual((True, False), (can_be_unified, is_unified))
        is_unified = self.daemon.update_password_for_directory(old_password="123456", new_password="123456")
        self.assertTrue(is_unified)
        self._run_post_unif_sanity_checks(paths, password="123456")

    # misc --->

    async def test_wallet_objects_are_properly_garbage_collected_after_check_pw_for_dir(self):
        """check_password_for_directory must not leave wallet objects or callbacks behind."""
        orig_cb_count = util.callback_mgr.count_all_callbacks()
        # GC sanity-check:
        mclasses = [Abstract_Wallet, LNWallet, LNWatcher, LNPeerManager]
        objmap = count_objects_in_memory(mclasses)
        for mcls in mclasses:
            self.assertEqual(len(objmap[mcls]), 0, msg=f"too many lingering objs of type={mcls}")
        # restore some wallets
        paths = []
        paths.append(self._restore_wallet_from_text("9dk", password="123456", encrypt_file=True))
        paths.append(self._restore_wallet_from_text("9dk", password="123456", encrypt_file=False))
        paths.append(self._restore_wallet_from_text("9dk", password=None))
        paths.append(self._restore_wallet_from_text("9dk", password="123456", encrypt_file=True, passphrase="hunter2"))
        paths.append(self._restore_wallet_from_text("9dk", password="999999", encrypt_file=False, passphrase="hunter2"))
        paths.append(self._restore_wallet_from_text("9dk", password=None, passphrase="hunter2"))
        # test unification
        can_be_unified, is_unified, paths_succeeded = self.daemon.check_password_for_directory(old_password="123456", wallet_dir=self.wallet_dir)
        # the wallet using password "999999" is the only one that cannot be unlocked
        self.assertEqual((False, False, 5), (can_be_unified, is_unified, len(paths_succeeded)))
        # gc: poll until no objects of the monitored classes remain, for at most 5 seconds
        try:
            async with util.async_timeout(5):
                while True:
                    objmap = count_objects_in_memory(mclasses)
                    if sum(len(lst) for lst in objmap.values()) == 0:
                        break  # all "mclasses"-type objects have been GC-ed
                    await asyncio.sleep(0.01)
        except asyncio.TimeoutError:
            # timed out: report which class still has live instances, using the last snapshot
            for mcls in mclasses:
                self.assertEqual(len(objmap[mcls]), 0, msg=f"too many lingering objs of type={mcls}")
        # also check callbacks have been cleaned up:
        self.assertEqual(orig_cb_count, util.callback_mgr.count_all_callbacks())


class TestCommandsWithDaemon(DaemonTestCase):
    """Exercise the `getseed` command with the different ways of selecting a wallet.

    The wallet is passed either as an in-memory wallet object, or as a wallet
    file that the daemon has loaded (addressed by full path, by basename, or
    by the wallet object).
    """
    TESTNET = True
    SEED = "bitter grass shiver impose acquire brush forget axis eager alone wine silver"

    async def test_wp_command_with_inmemory_wallet_has_password(self):
        """In-memory wallet (path=None) protected by a password."""
        commands = Commands(config=self.config, daemon=self.daemon)
        restored = restore_wallet_from_text__for_unittest(
            self.SEED,
            path=None,
            password="123456",
            config=self.config,
        )
        seed = await commands.getseed(wallet=restored['wallet'], password="123456")
        self.assertEqual(self.SEED, seed)

    async def test_wp_command_with_inmemory_wallet_no_password(self):
        """In-memory wallet (path=None) without any password."""
        commands = Commands(config=self.config, daemon=self.daemon)
        restored = restore_wallet_from_text__for_unittest(
            self.SEED,
            path=None,
            config=self.config,
        )
        seed = await commands.getseed(wallet=restored['wallet'])
        self.assertEqual(self.SEED, seed)

    async def test_wp_command_with_diskfile_wallet_has_password(self):
        """Encrypted wallet file, loaded into the daemon via the load_wallet command."""
        commands = Commands(config=self.config, daemon=self.daemon)
        wallet_path = self._restore_wallet_from_text(self.SEED, password="123456", encrypt_file=True)
        wallet_basename = os.path.basename(wallet_path)
        await commands.load_wallet(wallet_path=wallet_path, password="123456")
        loaded = self.daemon.get_wallet(wallet_path)
        self.assertIsInstance(loaded, Abstract_Wallet)
        # the wallet can be referenced by full path, by basename, or by object
        selectors = (
            {"wallet_path": wallet_path},
            {"wallet_path": wallet_basename},
            {"wallet": loaded},
        )
        for selector in selectors:
            seed = await commands.getseed(password="123456", **selector)
            self.assertEqual(self.SEED, seed)

    async def test_wp_command_with_diskfile_wallet_no_password(self):
        """Password-less wallet file, loaded into the daemon via the load_wallet command."""
        commands = Commands(config=self.config, daemon=self.daemon)
        wallet_path = self._restore_wallet_from_text(self.SEED, password=None)
        wallet_basename = os.path.basename(wallet_path)
        await commands.load_wallet(wallet_path=wallet_path, password=None)
        loaded = self.daemon.get_wallet(wallet_path)
        self.assertIsInstance(loaded, Abstract_Wallet)
        # the wallet can be referenced by full path, by basename, or by object
        selectors = (
            {"wallet_path": wallet_path},
            {"wallet_path": wallet_basename},
            {"wallet": loaded},
        )
        for selector in selectors:
            seed = await commands.getseed(**selector)
            self.assertEqual(self.SEED, seed)


class TestLoadWallet(DaemonTestCase):
    """Tests for Daemon.load_wallet, focusing on when the password argument is validated."""

    async def _assert_real_password_loads(self, path: str, *, real_password: Optional[str]) -> None:
        """Load the wallet at `path` with its real password, without and with
        force_check_password, stopping the wallet again after each load.
        """
        self.daemon.load_wallet(path, password=real_password)
        await self.daemon._stop_wallet(path)

        self.daemon.load_wallet(path, password=real_password, force_check_password=True)
        await self.daemon._stop_wallet(path)

    def _assert_password_checks_when_already_loaded(self, path: str, *, real_password: Optional[str]) -> None:
        """Load the wallet at `path` and leave it loaded, then check that a wrong
        password is only rejected when force_check_password is set.
        """
        # load_wallet will not validate the password arg if wallet is already loaded, unless force_check_password
        self.daemon.load_wallet(path, password=real_password)
        self.daemon.load_wallet(path, password="garbage")
        with self.assertRaises(util.InvalidPassword):
            self.daemon.load_wallet(path, password="garbage", force_check_password=True)

    async def test_simple_load(self):
        """A password-less wallet can be loaded and stopped again."""
        path1 = self._restore_wallet_from_text("9dk", password=None)
        wallet1 = self.daemon.load_wallet(path1, password=None)
        self.assertIsNotNone(wallet1)
        await self.daemon._stop_wallet(path1)

    async def test_password_checks_for_no_password(self):
        """Wallet without any password."""
        real_password = None
        path1 = self._restore_wallet_from_text("9dk", password=real_password)
        # load_wallet will not validate the password arg unless needed for storage.decrypt():
        self.daemon.load_wallet(path1, password="garbage")
        await self.daemon._stop_wallet(path1)
        # unless force_check_password is set:
        with self.assertRaises(util.InvalidPassword):
            self.daemon.load_wallet(path1, password="garbage", force_check_password=True)

        await self._assert_real_password_loads(path1, real_password=real_password)
        self._assert_password_checks_when_already_loaded(path1, real_password=real_password)

    async def test_password_checks_for_ks_enc(self):
        """Wallet with a password but without file (storage) encryption."""
        real_password = "1234"
        path1 = self._restore_wallet_from_text("9dk", password=real_password, encrypt_file=False)
        # load_wallet will not validate the password arg unless needed for storage.decrypt():
        self.daemon.load_wallet(path1, password="garbage")
        await self.daemon._stop_wallet(path1)
        # unless force_check_password is set:
        with self.assertRaises(util.InvalidPassword):
            self.daemon.load_wallet(path1, password="garbage", force_check_password=True)

        await self._assert_real_password_loads(path1, real_password=real_password)
        self._assert_password_checks_when_already_loaded(path1, real_password=real_password)

    async def test_password_checks_for_sto_enc(self):
        """Wallet with a password and file (storage) encryption."""
        real_password = "1234"
        path1 = self._restore_wallet_from_text("9dk", password=real_password, encrypt_file=True)

        # a wrong password is rejected here both without and with force_check_password:
        with self.assertRaises(util.InvalidPassword):
            self.daemon.load_wallet(path1, password="garbage")

        with self.assertRaises(util.InvalidPassword):
            self.daemon.load_wallet(path1, password="garbage", force_check_password=True)

        await self._assert_real_password_loads(path1, real_password=real_password)
        self._assert_password_checks_when_already_loaded(path1, real_password=real_password)
