File: script.py

package info (click to toggle)
python-ledger-bitcoin 0.4.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 720 kB
  • sloc: python: 9,357; makefile: 2
file content (212 lines) | stat: -rw-r--r-- 6,354 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
from .networks import NETWORKS
from . import base58
from . import bech32
from . import hashes
from . import compact
from .base import EmbitBase, EmbitError

SIGHASH_ALL = 1


class Script(EmbitBase):
    def __init__(self, data=b""):
        self.data = data

    def address(self, network=NETWORKS["main"]):
        script_type = self.script_type()
        data = self.data

        if script_type is None:
            raise ValueError("This type of script doesn't have address representation")

        if script_type == "p2pkh":
            d = network["p2pkh"] + data[3:23]
            return base58.encode_check(d)

        if script_type == "p2sh":
            d = network["p2sh"] + data[2:22]
            return base58.encode_check(d)

        if script_type in ["p2wpkh", "p2wsh", "p2tr"]:
            ver = data[0]
            # FIXME: should be one of OP_N
            if ver > 0:
                ver = ver % 0x50
            return bech32.encode(network["bech32"], ver, data[2:])

        # we should never get here
        raise ValueError("Unsupported script type")

    def push(self, data):
        self.data += compact.to_bytes(len(data)) + data

    def script_type(self):
        data = self.data
        # OP_DUP OP_HASH160 <20:hash160(pubkey)> OP_EQUALVERIFY OP_CHECKSIG
        if len(data) == 25 and data[:3] == b"\x76\xa9\x14" and data[-2:] == b"\x88\xac":
            return "p2pkh"
        # OP_HASH160 <20:hash160(script)> OP_EQUAL
        if len(data) == 23 and data[:2] == b"\xa9\x14" and data[-1] == 0x87:
            return "p2sh"
        # 0 <20:hash160(pubkey)>
        if len(data) == 22 and data[:2] == b"\x00\x14":
            return "p2wpkh"
        # 0 <32:sha256(script)>
        if len(data) == 34 and data[:2] == b"\x00\x20":
            return "p2wsh"
        # OP_1 <x-only-pubkey>
        if len(data) == 34 and data[:2] == b"\x51\x20":
            return "p2tr"
        # unknown type
        return None

    def write_to(self, stream):
        res = stream.write(compact.to_bytes(len(self.data)))
        res += stream.write(self.data)
        return res

    @classmethod
    def read_from(cls, stream):
        l = compact.read_from(stream)
        data = stream.read(l)
        if len(data) != l:
            raise ValueError("Cant read %d bytes" % l)
        return cls(data)

    @classmethod
    def from_address(cls, addr: str):
        """
        Decodes a bitcoin address and returns corresponding scriptpubkey.
        """
        return address_to_scriptpubkey(addr)

    def __eq__(self, other):
        return self.data == other.data

    def __ne__(self, other):
        return self.data != other.data

    def __hash__(self):
        return hash(self.data)

    def __len__(self):
        return len(self.data)


class Witness(EmbitBase):
    def __init__(self, items=[]):
        self.items = items[:]

    def write_to(self, stream):
        res = stream.write(compact.to_bytes(len(self.items)))
        for item in self.items:
            res += stream.write(compact.to_bytes(len(item)))
            res += stream.write(item)
        return res

    @classmethod
    def read_from(cls, stream):
        num = compact.read_from(stream)
        items = []
        for i in range(num):
            l = compact.read_from(stream)
            data = stream.read(l)
            items.append(data)
        return cls(items)

    def __hash__(self):
        return hash(self.items)

    def __len__(self):
        return len(self.items)


def p2pkh(pubkey):
    """Return Pay-To-Pubkey-Hash ScriptPubkey"""
    return Script(b"\x76\xa9\x14" + hashes.hash160(pubkey.sec()) + b"\x88\xac")


def p2sh(script):
    """Return Pay-To-Script-Hash ScriptPubkey"""
    return Script(b"\xa9\x14" + hashes.hash160(script.data) + b"\x87")


def p2wpkh(pubkey):
    """Return Pay-To-Witness-Pubkey-Hash ScriptPubkey"""
    return Script(b"\x00\x14" + hashes.hash160(pubkey.sec()))


def p2wsh(script):
    """Return Pay-To-Witness-Pubkey-Hash ScriptPubkey"""
    return Script(b"\x00\x20" + hashes.sha256(script.data))


def p2tr(pubkey, script_tree=None):
    """Return Pay-To-Taproot ScriptPubkey"""
    if script_tree is None:
        h = b""
    else:
        h = script_tree.tweak()
    output_pubkey = pubkey.taproot_tweak(h)
    return Script(b"\x51\x20" + output_pubkey.xonly())


def p2pkh_from_p2wpkh(script):
    """Convert p2wpkh to p2pkh script"""
    return Script(b"\x76\xa9" + script.serialize()[2:] + b"\x88\xac")


def multisig(m: int, pubkeys):
    if m <= 0 or m > 16:
        raise ValueError("m must be between 1 and 16")
    n = len(pubkeys)
    if n < m or n > 16:
        raise ValueError("Number of pubkeys must be between %d and 16" % m)
    data = bytes([80 + m])
    for pubkey in pubkeys:
        sec = pubkey.sec()
        data += bytes([len(sec)]) + sec
    # OP_m <len:pubkey> ... <len:pubkey> OP_n OP_CHECKMULTISIG
    data += bytes([80 + n, 0xAE])
    return Script(data)


def address_to_scriptpubkey(addr):
    # try with base58 address
    try:
        data = base58.decode_check(addr)
        prefix = data[:1]
        for net in NETWORKS.values():
            if prefix == net["p2pkh"]:
                return Script(b"\x76\xa9\x14" + data[1:] + b"\x88\xac")
            elif prefix == net["p2sh"]:
                return Script(b"\xa9\x14" + data[1:] + b"\x87")
    except:
        # fail - then it's bech32 address
        hrp = addr.split("1")[0]
        ver, data = bech32.decode(hrp, addr)
        if ver not in [0, 1] or len(data) not in [20, 32]:
            raise EmbitError("Invalid bech32 address")
        if ver == 1 and len(data) != 32:
            raise EmbitError("Invalid bech32 address")
        # OP_1..OP_N
        if ver > 0:
            ver += 0x50
        return Script(bytes([ver, len(data)] + data))


def script_sig_p2pkh(signature, pubkey, sighash=SIGHASH_ALL):
    sec = pubkey.sec()
    der = signature.serialize() + bytes([sighash])
    data = compact.to_bytes(len(der)) + der + compact.to_bytes(len(sec)) + sec
    return Script(data)


def script_sig_p2sh(redeem_script):
    """Creates scriptsig for p2sh"""
    # FIXME: implement for legacy p2sh as well
    return Script(redeem_script.serialize())


def witness_p2wpkh(signature, pubkey, sighash=SIGHASH_ALL):
    return Witness([signature.serialize() + bytes([sighash]), pubkey.sec()])