File: __init__.py

package info (click to toggle)
electrum 4.7.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 28,528 kB
  • sloc: python: 109,677; sh: 1,487; java: 335; xml: 51; makefile: 33
file content (145 lines) | stat: -rw-r--r-- 5,049 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import asyncio
import os
import unittest
import threading
import tempfile
import shutil
import functools
import inspect
from typing import TYPE_CHECKING, List

import electrum
import electrum.logging
from electrum import constants
from electrum import util
from electrum.util import OldTaskGroup
from electrum.logging import Logger
from electrum.wallet import restore_wallet_from_text

if TYPE_CHECKING:
    from .test_lnpeer import MockLNWallet


# Set this locally to make the test suite run faster.
# If set, unit tests that would normally test functions with multiple implementations,
# will only be run once, using the fastest implementation.
# e.g. libsecp256k1 vs python-ecdsa. pycryptodomex vs pyaes.
FAST_TESTS = False


# Import-time side effect: set up electrum's logging to stderr for the whole test run.
# NOTE(review): verbosity="*" presumably means "log everything" -- confirm in electrum.logging.
electrum.logging._configure_stderr_logging(verbosity="*")

# NOTE(review): judging by its name, this flag tells electrum.util that the embedding code
# manages the asyncio event loop itself (here the loop comes from IsolatedAsyncioTestCase,
# see ElectrumTestCase.asyncSetUp) -- confirm the exact semantics in electrum.util.
electrum.util.AS_LIB_USER_I_WANT_TO_MANAGE_MY_OWN_ASYNCIO_LOOP = True


class ElectrumTestCase(unittest.IsolatedAsyncioTestCase, Logger):
    """Base class for our unit tests.

    Per test, this class:
      - takes a class-wide lock, as tests mutate globals and hence must run sequentially,
      - creates a temporary directory (``self.unittest_base_path``) containing
        an ``electrum`` sub-directory (``self.electrum_path``),
      - publishes the event loop of the test as ``util._asyncio_event_loop``,
      - stops all mock lnwallets created via ``create_mock_lnwallet`` on teardown.

    Subclasses can set ``TESTNET`` or ``REGTEST`` (mutually exclusive) to have the
    corresponding network selected for the lifetime of the test class.
    """

    TESTNET = False
    REGTEST = False
    TEST_ANCHOR_CHANNELS = False
    # maxDiff = None  # for debugging

    # some unit tests are modifying globals... so we run sequentially:
    _test_lock = threading.Lock()

    def __init__(self, *args, **kwargs):
        Logger.__init__(self)
        unittest.IsolatedAsyncioTestCase.__init__(self, *args, **kwargs)

    @classmethod
    def setUpClass(cls):
        """Select the network requested via the ``REGTEST``/``TESTNET`` class attributes."""
        super().setUpClass()
        assert not (cls.REGTEST and cls.TESTNET), "regtest and testnet are mutually exclusive"
        if cls.REGTEST:
            constants.BitcoinRegtest.set_as_network()
        elif cls.TESTNET:
            constants.BitcoinTestnet.set_as_network()

    @classmethod
    def tearDownClass(cls):
        """Switch back to mainnet if ``setUpClass`` selected another network."""
        super().tearDownClass()
        if cls.TESTNET or cls.REGTEST:
            constants.BitcoinMainnet.set_as_network()

    def setUp(self):
        """Acquire the test lock and create the per-test temporary directories.

        Everything acquired here is released through ``addCleanup``: unittest skips
        ``tearDown`` when ``setUp``/``asyncSetUp`` raise, but it still runs the
        registered cleanups. This way a single failing setup cannot leave the lock
        held (or the global loop set) and make every following test fail.

        Raises:
            Exception: if the test lock could not be acquired within 0.1 seconds.
        """
        have_lock = self._test_lock.acquire(timeout=0.1)
        if not have_lock:
            # This can happen when trying to run the tests in parallel.
            raise Exception("timed out waiting for test_lock")
        # Registered first => runs last (cleanups are LIFO), i.e. after all other cleanup.
        self.addCleanup(self._test_lock.release)
        super().setUp()
        assert util._asyncio_event_loop is None, "global event loop already set?!"
        # Safety net for the case tearDown does not run (or raises before resetting the loop).
        # Only registered once we know that no one else had set the global loop.
        self.addCleanup(setattr, util, "_asyncio_event_loop", None)
        self.unittest_base_path = tempfile.mkdtemp(prefix="electrum-unittest-base-")
        # Safety net as well: in the normal case tearDown has already removed the directory.
        self.addCleanup(shutil.rmtree, self.unittest_base_path, ignore_errors=True)
        self.electrum_path = os.path.join(self.unittest_base_path, "electrum")
        util.make_dir(self.electrum_path)
        self._lnworkers_created: List['MockLNWallet'] = []

    async def asyncSetUp(self):
        """Publish the event loop of this test as the global electrum event loop."""
        await super().asyncSetUp()
        loop = util.get_asyncio_loop()
        # IsolatedAsyncioTestCase creates event loops with debug=True, which makes the tests take ~4x time
        if not (os.environ.get("PYTHONASYNCIODEBUG") or os.environ.get("PYTHONDEVMODE")):
            loop.set_debug(False)
        util._asyncio_event_loop = loop

    async def asyncTearDown(self):
        """Stop all mock lnwallets that were created through ``create_mock_lnwallet``."""
        try:
            # clean up lnworkers
            async with OldTaskGroup() as group:
                for lnworker in self._lnworkers_created:
                    await group.spawn(lnworker.stop())
        finally:
            # even if stopping a worker failed, forget the workers and let the base class clean up
            self._lnworkers_created.clear()
            await super().asyncTearDown()

    def tearDown(self):
        """Clear global callbacks, remove the temporary directory and reset the global loop.

        The test lock is not released here but by the cleanup registered in ``setUp``,
        which unittest runs after ``tearDown`` (and also if ``tearDown`` raises).
        """
        util.callback_mgr.clear_all_callbacks()
        shutil.rmtree(self.unittest_base_path)
        super().tearDown()
        util._asyncio_event_loop = None  # cleared here, at the ~last possible moment. asyncTearDown is too early.

    def create_mock_lnwallet(
        self,
        *,
        name: str,
        has_anchors: bool,
    ) -> 'MockLNWallet':
        """Create a mock lnwallet that gets stopped automatically in ``asyncTearDown``.

        The wallet gets its own fresh data directory below ``self.unittest_base_path``.
        """
        from .test_lnpeer import _create_mock_lnwallet
        data_dir = tempfile.mkdtemp(prefix="lnwallet-", dir=self.unittest_base_path)
        lnwallet = _create_mock_lnwallet(name=name, has_anchors=has_anchors, data_dir=data_dir)
        self._lnworkers_created.append(lnwallet)
        return lnwallet


def as_testnet(func):
    """Function decorator to run a single unit test in testnet mode.

    Supports both regular and ``async def`` test functions. The network that is
    active when the test gets *called* is restored once the test has finished,
    regardless of whether it passed or raised. The wrapper keeps the name and
    the docstring of the decorated test.

    NOTE: this is inherently sequential; tests running in parallel would break things
    """
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def run_test(*args, **kwargs):
            # Captured at call time, not at decoration (import) time: the active network
            # might have been changed in between, e.g. by setUpClass of the test class.
            old_net = constants.net
            try:
                constants.BitcoinTestnet.set_as_network()
                return await func(*args, **kwargs)
            finally:
                constants.net = old_net
    else:
        @functools.wraps(func)
        def run_test(*args, **kwargs):
            # see the async variant for why this is captured at call time
            old_net = constants.net
            try:
                constants.BitcoinTestnet.set_as_network()
                return func(*args, **kwargs)
            finally:
                constants.net = old_net
    return run_test


@functools.wraps(restore_wallet_from_text)
def restore_wallet_from_text__for_unittest(*args, gap_limit=2, gap_limit_for_change=1, **kwargs):
    """Call restore_wallet_from_text, but with small default gap limits.

    The defaults used here (gap_limit=2, gap_limit_for_change=1) save compute time
    in unit tests. All other arguments are forwarded unchanged.
    (Note: functools.wraps replaces this docstring at runtime with the wrapped one.)
    """
    gap_limits = {
        "gap_limit": gap_limit,
        "gap_limit_for_change": gap_limit_for_change,
    }
    return restore_wallet_from_text(*args, **gap_limits, **kwargs)