1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
|
import enum
from typing import List, Tuple, Mapping, Union, Iterator, Optional
from .common import bip32_path_from_string, write_varint
from .merkle import get_merkleized_map_commitment, MerkleTree, element_hash
from .wallet import WalletPolicy
# p2 encodes the protocol version implemented
CURRENT_PROTOCOL_VERSION = 1
def chunkify(data: bytes, chunk_len: int) -> Iterator[Tuple[bool, bytes]]:
size: int = len(data)
if size <= chunk_len:
yield True, data
return
chunk: int = size // chunk_len
remaining: int = size % chunk_len
offset: int = 0
for i in range(chunk):
yield False, data[offset: offset + chunk_len]
offset += chunk_len
if remaining:
yield True, data[offset:]
class DefaultInsType(enum.IntEnum):
GET_VERSION = 0x01
class BitcoinInsType(enum.IntEnum):
GET_EXTENDED_PUBKEY = 0x00
REGISTER_WALLET = 0x02
GET_WALLET_ADDRESS = 0x03
SIGN_PSBT = 0x04
GET_MASTER_FINGERPRINT = 0x05
SIGN_MESSAGE = 0x10
class FrameworkInsType(enum.IntEnum):
CONTINUE_INTERRUPTED = 0x01
class BitcoinCommandBuilder:
"""APDU command builder for the Bitcoin application."""
CLA_DEFAULT: int = 0xB0
CLA_BITCOIN: int = 0xE1
CLA_FRAMEWORK: int = 0xF8
def serialize(
self,
cla: int,
ins: Union[int, enum.IntEnum],
p1: int = 0,
p2: int = CURRENT_PROTOCOL_VERSION,
cdata: bytes = b"",
) -> dict:
"""Serialize the whole APDU command (header + data).
Parameters
----------
cla : int
Instruction class: CLA (1 byte)
ins : Union[int, IntEnum]
Instruction code: INS (1 byte)
p1 : int
Instruction parameter 1: P1 (1 byte).
p2 : int
Instruction parameter 2: P2 (1 byte).
cdata : bytes
Bytes of command data.
Returns
-------
dict
Dictionary representing the APDU message.
"""
return {"cla": cla, "ins": ins, "p1": p1, "p2": p2, "data": cdata}
def get_extended_pubkey(self, bip32_path: str, display: bool = False):
bip32_path: List[bytes] = bip32_path_from_string(bip32_path)
cdata: bytes = b"".join([
b'\1' if display else b'\0',
len(bip32_path).to_bytes(1, byteorder="big"),
*bip32_path
])
return self.serialize(
cla=self.CLA_BITCOIN,
ins=BitcoinInsType.GET_EXTENDED_PUBKEY,
cdata=cdata,
)
def register_wallet(self, wallet: WalletPolicy):
wallet_bytes = wallet.serialize()
return self.serialize(
cla=self.CLA_BITCOIN,
ins=BitcoinInsType.REGISTER_WALLET,
cdata=write_varint(len(wallet_bytes)) + wallet_bytes,
)
def get_wallet_address(
self,
wallet: WalletPolicy,
wallet_hmac: Optional[bytes],
address_index: int,
change: bool,
display: bool,
):
cdata: bytes = b"".join(
[
b'\1' if display else b'\0', # 1 byte
wallet.id, # 32 bytes
wallet_hmac if wallet_hmac is not None else b'\0' * 32, # 32 bytes
b"\1" if change else b"\0", # 1 byte
address_index.to_bytes(4, byteorder="big"), # 4 bytes
]
)
return self.serialize(
cla=self.CLA_BITCOIN,
ins=BitcoinInsType.GET_WALLET_ADDRESS,
cdata=cdata,
)
def sign_psbt(
self,
global_mapping: Mapping[bytes, bytes],
input_mappings: List[Mapping[bytes, bytes]],
output_mappings: List[Mapping[bytes, bytes]],
wallet: WalletPolicy,
wallet_hmac: Optional[bytes],
):
cdata = bytearray()
cdata += get_merkleized_map_commitment(global_mapping)
cdata += write_varint(len(input_mappings))
cdata += MerkleTree(
[
element_hash(get_merkleized_map_commitment(m_in))
for m_in in input_mappings
]
).root
cdata += write_varint(len(output_mappings))
cdata += MerkleTree(
[
element_hash(get_merkleized_map_commitment(m_out))
for m_out in output_mappings
]
).root
cdata += wallet.id
cdata += wallet_hmac if wallet_hmac is not None else b'\0' * 32
return self.serialize(
cla=self.CLA_BITCOIN, ins=BitcoinInsType.SIGN_PSBT, cdata=bytes(cdata)
)
def get_master_fingerprint(self):
return self.serialize(
cla=self.CLA_BITCOIN,
ins=BitcoinInsType.GET_MASTER_FINGERPRINT
)
def sign_message(self, message: bytes, bip32_path: str):
cdata = bytearray()
bip32_path: List[bytes] = bip32_path_from_string(bip32_path)
# split message in 64-byte chunks (last chunk can be smaller)
n_chunks = (len(message) + 63) // 64
chunks = [message[64 * i: 64 * i + 64] for i in range(n_chunks)]
cdata += len(bip32_path).to_bytes(1, byteorder="big")
cdata += b''.join(bip32_path)
cdata += write_varint(len(message))
cdata += MerkleTree(element_hash(c) for c in chunks).root
return self.serialize(
cla=self.CLA_BITCOIN,
ins=BitcoinInsType.SIGN_MESSAGE,
cdata=bytes(cdata)
)
def continue_interrupted(self, cdata: bytes):
"""Command builder for CONTINUE.
Returns
-------
bytes
APDU command for CONTINUE.
"""
return self.serialize(
cla=self.CLA_FRAMEWORK,
ins=FrameworkInsType.CONTINUE_INTERRUPTED,
cdata=cdata,
)
|