File: client_legacy.py

package info (click to toggle)
python-ledger-bitcoin 0.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 716 kB
  • sloc: python: 9,357; makefile: 2
file content (356 lines) | stat: -rw-r--r-- 15,428 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
"""
This module provides a compatibility layer between the Python client of the Ledger Nano Bitcoin app v2 and
version 1.6.5 of the app, by translating client requests to the API of app v1.6.5.

The bulk of the code is taken from bitcoin-core/HWI, with the necessary adaptations:
https://github.com/bitcoin-core/HWI/tree/a109bcd53d24a52e72f26af3ecbabb64b292ff0c
"""

import struct
import re
import base64

from .client_base import PartialSignature
from .client import Client, TransportClient

from typing import List, Tuple, Optional, Union

from .common import AddressType, Chain, hash160
from .key import ExtendedKey, parse_path
from .psbt import PSBT, normalize_psbt
from .wallet import WalletPolicy

from ._script import is_p2sh, is_witness, is_p2wpkh, is_p2wsh

from .btchip.btchip import btchip
from .btchip.btchipUtils import compress_public_key
from .btchip.bitcoinTransaction import bitcoinTransaction


def get_address_type_for_policy(policy: WalletPolicy) -> AddressType:
    """Map a single-key wallet policy to the matching address type.

    Each script type is accepted in two spellings of its derivation suffix:
    the ``/**`` shorthand and the explicit ``/<0;1>/*`` multipath form.

    Args:
        policy: wallet policy whose ``descriptor_template`` is inspected.

    Returns:
        ``AddressType.LEGACY`` for ``pkh``, ``AddressType.WIT`` for ``wpkh``
        and ``AddressType.SH_WIT`` for ``sh(wpkh)`` templates.

    Raises:
        ValueError: if the template is not one of the supported forms.
    """
    template = policy.descriptor_template
    if template in ["pkh(@0/**)", "pkh(@0/<0;1>/*)"]:
        return AddressType.LEGACY
    # The multipath separator is ';' for every script type; the wpkh entry
    # must use the same spelling as the pkh and sh(wpkh) entries.
    elif template in ["wpkh(@0/**)", "wpkh(@0/<0;1>/*)"]:
        return AddressType.WIT
    elif template in ["sh(wpkh(@0/**))", "sh(wpkh(@0/<0;1>/*))"]:
        return AddressType.SH_WIT
    else:
        raise ValueError("Invalid or unsupported policy")


# minimal checking of string keypath
# taken from HWI
def check_keypath(key_path: str) -> bool:
    """Return whether *key_path* is a well-formed ``m/...`` derivation path.

    The path must start with the component ``m``. Every following component
    must consist of ASCII decimal digits, optionally followed by exactly one
    hardening marker (``h``, ``H`` or ``'``), and its numeric value must be
    strictly below 0x80000000.

    Args:
        key_path: the path to check, e.g. ``"m/44'/0'/0'/0/1"``.

    Returns:
        True if the path passes the checks above, False otherwise. Malformed
        input never raises.
    """
    root, *steps = key_path.split("/")
    if root != "m":
        return False
    for step in steps:
        # An explicit [0-9] class is used instead of str.isdigit(), which also
        # accepts characters such as superscript digits that int() rejects.
        # The marker is only allowed as the last character of the component.
        if re.fullmatch(r"[0-9]+[hH']?", step) is None:
            return False
        # 0x80000000 itself is already out of range for an index.
        if int(step.rstrip("hH'")) >= 0x80000000:
            return False
    return True


class DongleAdaptor:
    """Expose a raw-APDU ``exchange`` call on top of a transport client.

    The wrapped client must provide ``apdu_exchange(cla, ins, data, p1, p2)``.
    """

    # TODO: type for comm_client
    def __init__(self, comm_client):
        self.comm_client = comm_client

    def exchange(self, apdu: Union[bytes, bytearray]) -> bytearray:
        """Split a serialized APDU into its fields and forward it.

        Args:
            apdu: the command bytes: CLA, INS, P1, P2, Lc, then Lc data bytes.

        Returns:
            The transport client's response, as a ``bytearray``.
        """
        # Fixed five-byte header; everything after it is the payload.
        cla, ins, p1, p2, lc = apdu[0], apdu[1], apdu[2], apdu[3], apdu[4]
        payload = apdu[5:]
        # The declared length must agree with the bytes actually present.
        assert len(payload) == lc
        response = self.comm_client.apdu_exchange(cla, ins, payload, p1, p2)
        return bytearray(response)

class LegacyClient(Client):
    """Wrapper for Ledger Bitcoin app before version 2.0.0.

    Requests made through the ``Client`` interface are translated into calls
    on a ``btchip`` instance that reaches the device through ``DongleAdaptor``.
    """

    def __init__(self, comm_client: TransportClient, chain: Chain = Chain.MAIN, debug: bool = False):
        """Bind to the transport and check that a Bitcoin app is running.

        Raises:
            ValueError: if the app name reported by the device is not one of
                the accepted Bitcoin app names.
        """
        super().__init__(comm_client, chain, debug)

        self.app = btchip(DongleAdaptor(comm_client))

        if self.app.getAppName() not in ["Bitcoin", "Bitcoin Legacy", "Bitcoin Test", "Bitcoin Test Legacy", "app"]:
            raise ValueError("Ledger is not in either the Bitcoin or Bitcoin Testnet app")

    def get_extended_pubkey(self, path: str, display: bool = False) -> str:
        """Return the serialized extended public key at *path*.

        Args:
            path: derivation path. Its first two characters are dropped
                (presumably the leading "m/"), and ``h``/``H`` hardening
                markers are rewritten to ``'`` before querying the app.
            display: forwarded to the app's ``getWalletPublicKey`` call.

        Returns:
            The string form of an ``ExtendedKey`` built from the public key
            and chain code returned by the app. The version is the mainnet one
            when ``self.chain`` is ``Chain.MAIN`` and the testnet one otherwise.
        """
        # mostly taken from HWI

        path = path[2:].replace('h', '\'').replace('H', '\'')

        # This call returns raw uncompressed pubkey, chaincode
        pubkey = self.app.getWalletPublicKey(path, display)
        int_path = parse_path(path)
        if len(path) > 0:
            # The parent path is the path without its last derivation step
            parent_path = "/".join(path.split("/")[:-1])

            # Get parent key fingerprint
            parent = self.app.getWalletPublicKey(parent_path)
            fpr = hash160(compress_public_key(parent["publicKey"]))[:4]

            child = int_path[-1]
        # Special case for m
        else:
            child = 0
            fpr = b"\x00\x00\x00\x00"

        xpub = ExtendedKey(
            version=ExtendedKey.MAINNET_PUBLIC if self.chain == Chain.MAIN else ExtendedKey.TESTNET_PUBLIC,
            depth=len(path.split("/")) if len(path) > 0 else 0,
            parent_fingerprint=fpr,
            child_num=child,
            chaincode=pubkey["chainCode"],
            privkey=None,
            pubkey=compress_public_key(pubkey["publicKey"]),
        )
        return xpub.to_string()

    def register_wallet(self, wallet: WalletPolicy) -> Tuple[bytes, bytes]:
        """Not available on the legacy app; always raises ``NotImplementedError``."""
        raise NotImplementedError  # legacy app does not have this functionality

    def get_wallet_address(
        self,
        wallet: WalletPolicy,
        wallet_hmac: Optional[bytes],
        change: int,
        address_index: int,
        display: bool,
    ) -> str:
        """Return an address of a single-key wallet policy.

        Args:
            wallet: a ``WalletPolicy`` with exactly one key, whose key info
                starts with a ``[fingerprint/path]`` key origin.
            wallet_hmac: must be ``None``; registered policies are unsupported.
            change: appended to the key origin path as the second-to-last step.
            address_index: appended to the key origin path as the last step.
            display: forwarded to the app's ``getWalletPublicKey`` call.

        Returns:
            The address reported by the app.

        Raises:
            ValueError: if *wallet* is not a ``WalletPolicy``, if the key
                origin cannot be extracted, or if the descriptor template is
                not supported.
            NotImplementedError: if *wallet_hmac* is given or the policy does
                not have exactly one key.
        """
        # TODO: check keypath

        # Validate the type first, so that a wrong argument yields the intended
        # ValueError rather than an AttributeError on `wallet.n_keys`.
        if not isinstance(wallet, WalletPolicy):
            raise ValueError("Invalid wallet policy type, it must be WalletPolicy")

        if wallet_hmac is not None or wallet.n_keys != 1:
            raise NotImplementedError("Policy wallets are only supported from version 2.0.0. Please update your Ledger hardware wallet")

        key_info = wallet.keys_info[0]
        try:
            first_slash_pos = key_info.index("/")
            key_origin_end = key_info.index("]")
        except ValueError as err:
            raise ValueError("Could not extract key origin information") from err

        if key_info[0] != '[':
            raise ValueError("Key must have key origin information")

        # Text between the first "/" and the closing "]": the origin path
        # without the fingerprint.
        key_origin_path = key_info[first_slash_pos + 1: key_origin_end]

        addr_type = get_address_type_for_policy(wallet)

        p2sh_p2wpkh = addr_type == AddressType.SH_WIT
        bech32 = addr_type == AddressType.WIT
        output = self.app.getWalletPublicKey(f"{key_origin_path}/{change}/{address_index}", display, p2sh_p2wpkh or bech32, bech32)
        assert isinstance(output["address"], str)
        return output['address'][12:-2]  # HACK: A bug in getWalletPublicKey results in the address being returned as the string "bytearray(b'<address>')". This extracts the actual address to work around this.

    def sign_psbt(self, psbt: Union[PSBT, bytes, str], wallet: WalletPolicy, wallet_hmac: Optional[bytes]) -> List[Tuple[int, PartialSignature]]:
        """Sign the inputs of *psbt* whose keys derive from this device.

        Args:
            psbt: a ``PSBT`` or any form accepted by ``normalize_psbt``.
            wallet: a single-key ``WalletPolicy`` with one of the supported
                ``pkh``/``wpkh``/``sh(wpkh)`` templates. Apart from these
                checks the policy is ignored.
            wallet_hmac: must be ``None``; registered policies are unsupported.

        Returns:
            A list of ``(index, PartialSignature)`` tuples, one per signature
            produced.

        Raises:
            ValueError: if *wallet* is not a ``WalletPolicy``, or an input's
                ``non_witness_utxo`` does not hash to the referenced prevout.
            NotImplementedError: if *wallet_hmac* is given, the policy does
                not have exactly one key, or the template is unsupported.
            Exception: if an input carries no utxo information at all.
        """
        # Validate the type first, so that a wrong argument yields the intended
        # ValueError rather than an AttributeError on `wallet.n_keys`.
        if not isinstance(wallet, WalletPolicy):
            raise ValueError("Invalid wallet policy type, it must be WalletPolicy")

        if wallet_hmac is not None or wallet.n_keys != 1:
            raise NotImplementedError("Policy wallets are only supported from version 2.0.0. Please update your Ledger hardware wallet")

        if wallet.descriptor_template not in ["pkh(@0/**)", "pkh(@0/<0;1>/*)", "wpkh(@0/**)", "wpkh(@0/<0;1>/*)", "sh(wpkh(@0/**))", "sh(wpkh(@0/<0;1>/*))"]:
            raise NotImplementedError("Unsupported policy")

        psbt = normalize_psbt(psbt)

        # the rest of the code is basically the HWI code, and it ignores wallet

        tx = psbt

        c_tx = tx.tx
        tx_bytes = c_tx.serialize_with_witness()

        # Master key fingerprint
        master_fpr = hash160(compress_public_key(self.app.getWalletPublicKey('')["publicKey"]))[:4]
        # An entry per input, each with 0 to many keys to sign with
        # (one distinct list per input, not n references to the same list)
        all_signature_attempts: List[List[Tuple[str, bytes]]] = [[] for _ in range(len(c_tx.vin))]

        # Get the app version to determine whether to use Trusted Input for segwit
        version = self.app.getFirmwareVersion()
        use_trusted_segwit = (version['major_version'] == 1 and version['minor_version'] >= 4) or version['major_version'] > 1

        # NOTE: We only support signing Segwit inputs, where we can skip over non-segwit
        # inputs, or non-segwit inputs, where *all* inputs are non-segwit. This is due
        # to Ledger's mutually exclusive signing steps for each type.
        segwit_inputs = []
        # Legacy style inputs
        legacy_inputs = []

        has_segwit = False
        has_legacy = False

        script_codes: List[bytes] = [b""] * len(c_tx.vin)

        # Detect changepath, (p2sh-)p2(w)pkh only
        change_path = ''
        for i_num, txout in enumerate(c_tx.vout):
            # Find which wallet key could be change based on hdsplit: m/.../1/k
            # Wallets shouldn't be sending to change address as user action
            # otherwise this will get confused
            for pubkey, origin in tx.outputs[i_num].hd_keypaths.items():
                if origin.fingerprint == master_fpr and len(origin.path) > 1 and origin.path[-2] == 1:
                    # For possible matches, check if pubkey matches possible template
                    if hash160(pubkey) in txout.scriptPubKey or hash160(bytearray.fromhex("0014") + hash160(pubkey)) in txout.scriptPubKey:
                        change_path = "/".join(str(index) for index in origin.path)

        for i_num, (txin, psbt_in) in enumerate(zip(c_tx.vin, tx.inputs)):

            seq_hex = txin.nSequence.to_bytes(4, byteorder="little").hex()

            # Prefer the full previous transaction when both utxo forms exist
            utxo = None
            if psbt_in.witness_utxo:
                utxo = psbt_in.witness_utxo
            if psbt_in.non_witness_utxo:
                if txin.prevout.hash != psbt_in.non_witness_utxo.sha256:
                    raise ValueError('Input {} has a non_witness_utxo with the wrong hash'.format(i_num))
                utxo = psbt_in.non_witness_utxo.vout[txin.prevout.n]
            if utxo is None:
                raise Exception("PSBT is missing input utxo information, cannot sign")
            scriptcode = utxo.scriptPubKey

            if is_p2sh(scriptcode):
                if len(psbt_in.redeem_script) == 0:
                    continue
                scriptcode = psbt_in.redeem_script

            is_wit, _, _ = is_witness(scriptcode)

            # Every input that reaches this point gets a segwit-style entry,
            # whether or not it turns out to be a witness input.
            segwit_inputs.append({"value": txin.prevout.serialize() + struct.pack("<Q", utxo.nValue), "witness": True, "sequence": seq_hex})
            if is_wit:
                if is_p2wsh(scriptcode):
                    if len(psbt_in.witness_script) == 0:
                        continue
                    scriptcode = psbt_in.witness_script
                elif is_p2wpkh(scriptcode):
                    _, _, wit_prog = is_witness(scriptcode)
                    # Script code built around the 20-byte witness program
                    scriptcode = b"\x76\xa9\x14" + wit_prog + b"\x88\xac"
                else:
                    continue
                has_segwit = True
            else:
                # We only need legacy inputs in the case where all inputs are legacy, we check
                # later
                assert psbt_in.non_witness_utxo is not None
                ledger_prevtx = bitcoinTransaction(psbt_in.non_witness_utxo.serialize())
                legacy_inputs.append(self.app.getTrustedInput(ledger_prevtx, txin.prevout.n))
                legacy_inputs[-1]["sequence"] = seq_hex
                has_legacy = True

            if psbt_in.non_witness_utxo and use_trusted_segwit:
                ledger_prevtx = bitcoinTransaction(psbt_in.non_witness_utxo.serialize())
                segwit_inputs[-1].update(self.app.getTrustedInput(ledger_prevtx, txin.prevout.n))

            pubkeys = []
            signature_attempts = []

            # Save scriptcode for later signing
            script_codes[i_num] = scriptcode

            # Find which pubkeys could sign this input (should be all?)
            for pubkey in psbt_in.hd_keypaths.keys():
                if hash160(pubkey) in scriptcode or pubkey in scriptcode:
                    pubkeys.append(pubkey)

            # Figure out which keys in inputs are from our wallet
            for pubkey in pubkeys:
                keypath = psbt_in.hd_keypaths[pubkey]
                if master_fpr == keypath.fingerprint:
                    # Add the keypath strings
                    keypath_str = keypath.get_derivation_path()[2:]  # Drop the leading m/
                    signature_attempts.append((keypath_str, pubkey))

            all_signature_attempts[i_num] = signature_attempts

        result: List[Tuple[int, PartialSignature]] = []

        # NOTE(review): the loops below index `script_codes` and
        # `all_signature_attempts` (both keyed by PSBT input index) with the
        # position in `segwit_inputs` / `legacy_inputs`. These agree only if no
        # earlier input was skipped before being appended - confirm.

        # Sign any segwit inputs
        if has_segwit:
            # Process them up front with all scriptcodes blank
            blank_script_code = bytearray()
            for i in range(len(segwit_inputs)):
                self.app.startUntrustedTransaction(i == 0, i, segwit_inputs, script_codes[i] if use_trusted_segwit else blank_script_code, c_tx.nVersion)

            # Number of unused fields for Nano S, only changepath and transaction in bytes req
            self.app.finalizeInput(b"DUMMY", -1, -1, change_path, tx_bytes)

            # For each input we control do segwit signature
            for i in range(len(segwit_inputs)):
                for signature_attempt in all_signature_attempts[i]:
                    self.app.startUntrustedTransaction(False, 0, [segwit_inputs[i]], script_codes[i], c_tx.nVersion)

                    partial_sig = PartialSignature(
                        signature=self.app.untrustedHashSign(signature_attempt[0], "", c_tx.nLockTime, 0x01),
                        pubkey=signature_attempt[1]
                    )
                    result.append((i, partial_sig))
        elif has_legacy:
            first_input = True
            # Legacy signing if all inputs are legacy
            for i in range(len(legacy_inputs)):
                for signature_attempt in all_signature_attempts[i]:
                    assert (tx.inputs[i].non_witness_utxo is not None)
                    self.app.startUntrustedTransaction(first_input, i, legacy_inputs, script_codes[i], c_tx.nVersion)
                    self.app.finalizeInput(b"DUMMY", -1, -1, change_path, tx_bytes)

                    partial_sig = PartialSignature(
                        signature=self.app.untrustedHashSign(signature_attempt[0], "", c_tx.nLockTime, 0x01),
                        pubkey=signature_attempt[1]
                    )
                    result.append((i, partial_sig))

                    first_input = False

        # Send list of input signatures
        return result

    def get_master_fingerprint(self) -> bytes:
        """Return the first 4 bytes of the hash160 of the compressed master public key."""
        master_pubkey = self.app.getWalletPublicKey("")
        return hash160(compress_public_key(master_pubkey["publicKey"]))[:4]

    def sign_message(self, message: Union[str, bytes], keypath: str) -> str:
        """Sign *message* with the key at *keypath*.

        Args:
            message: text (encoded as UTF-8) or raw bytes to sign.
            keypath: derivation path starting with ``m``; must pass
                ``check_keypath``.

        Returns:
            The base64 encoding of 65 bytes: a header byte followed by the
            32-byte big-endian ``r`` and ``s`` values.

        Raises:
            ValueError: if *keypath* is rejected by ``check_keypath``.
        """
        # mostly taken from HWI

        if not check_keypath(keypath):
            raise ValueError("Invalid keypath")
        if isinstance(message, str):
            message = bytearray(message, 'utf-8')
        else:
            message = bytearray(message)
        # check_keypath accepts 'h' and 'H' as hardening markers, so rewrite
        # them to "'" before talking to the app, as get_extended_pubkey does.
        keypath = keypath[2:].replace('h', '\'').replace('H', '\'')
        # First display on screen what address you're signing for
        self.app.getWalletPublicKey(keypath, True)
        self.app.signMessagePrepare(keypath, message)
        signature = self.app.signMessageSign()

        # Make signature into standard bitcoin format
        # Byte 3 holds the length of r; r starts at offset 4 and s starts two
        # bytes after the end of r.
        rLength = signature[3]
        r = int.from_bytes(signature[4: 4 + rLength], byteorder="big", signed=True)
        s = int.from_bytes(signature[4 + rLength + 2:], byteorder="big", signed=True)

        # Header byte is 31 or 32, depending on the low bit of the first response byte
        sig = bytearray(chr(27 + 4 + (signature[0] & 0x01)), 'utf8') + r.to_bytes(32, byteorder="big", signed=False) + s.to_bytes(32, byteorder="big", signed=False)

        return base64.b64encode(sig).decode('utf-8')