1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
|
# Copyright (C) 2016 The OpenTimestamps developers
#
# This file is part of python-opentimestamps.
#
# It is subject to the license terms in the LICENSE file found in the top-level
# directory of this distribution.
#
# No part of python-opentimestamps including this file, may be copied,
# modified, propagated, or distributed except according to the terms contained
# in the LICENSE file.
"""Timestamp signature verification"""
import opentimestamps.core.serialize
class VerificationError(Exception):
    """Raised when an attestation fails verification"""
class TimeAttestation:
    """Base class for time-attesting signatures

    Concrete attestation types override ``TAG`` and ``_serialize_payload()``,
    and implement the comparison operators for instances of their own class.
    """

    # Identifier written ahead of the payload; set by each subclass.
    TAG = None

    # Number of tag bytes read back by deserialize().
    TAG_SIZE = 8

    # FIXME: What should this be?
    # Maximum size of an attestation payload.
    MAX_PAYLOAD_SIZE = 8192

    def _serialize_payload(self, ctx):
        """Write the type-specific payload to ctx; subclasses must override"""
        raise NotImplementedError

    def serialize(self, ctx):
        """Write the tag, then the payload as a single varbytes field"""
        ctx.write_bytes(self.TAG)

        # The payload is rendered into its own buffer first so that it can be
        # handed to write_varbytes() in one piece.
        buf = opentimestamps.core.serialize.BytesSerializationContext()
        self._serialize_payload(buf)
        ctx.write_varbytes(buf.getbytes())

    def __eq__(self, other):
        """Fallback equality between attestations of different classes

        Returns False for any other TimeAttestation and NotImplemented for
        everything else.

        WARNING: The exact behavior of this isn't yet well-defined enough to be
        used for consensus-critical applications.
        """
        if not isinstance(other, TimeAttestation):
            return NotImplemented

        # Comparing two instances of the same class should be implemented by
        # the subclass, so it must never reach this fallback.
        assert self.__class__ is not other.__class__
        return False

    def __lt__(self, other):
        """Fallback ordering between attestations of different classes

        Orders by TAG for any other TimeAttestation; returns NotImplemented
        for everything else.

        WARNING: The exact behavior of this isn't yet well-defined enough to be
        used for consensus-critical applications.
        """
        if not isinstance(other, TimeAttestation):
            return NotImplemented

        # Comparing two instances of the same class should be implemented by
        # the subclass, so it must never reach this fallback.
        assert self.__class__ is not other.__class__
        return self.TAG < other.TAG

    @classmethod
    def deserialize(cls, ctx):
        """Read one attestation from ctx

        Reads the tag and the varbytes payload, then hands the payload to the
        attestation class whose TAG matches. A tag that matches none of the
        known classes produces an UnknownAttestation holding the raw tag and
        payload.
        """
        tag = ctx.read_bytes(cls.TAG_SIZE)
        serialized_attestation = ctx.read_varbytes(cls.MAX_PAYLOAD_SIZE)

        # The imports in this function make `opentimestamps` a local name, so
        # this one has to run before the name is first used below.
        import opentimestamps.core.serialize
        payload_ctx = opentimestamps.core.serialize.BytesDeserializationContext(serialized_attestation)

        # FIXME: probably a better way to do this...
        import opentimestamps.core.dubious.notary
        notary = opentimestamps.core.dubious.notary

        if tag == PendingAttestation.TAG:
            attestation_cls = PendingAttestation
        elif tag == BitcoinBlockHeaderAttestation.TAG:
            attestation_cls = BitcoinBlockHeaderAttestation
        elif tag == LitecoinBlockHeaderAttestation.TAG:
            attestation_cls = LitecoinBlockHeaderAttestation
        elif tag == notary.EthereumBlockHeaderAttestation.TAG:
            attestation_cls = notary.EthereumBlockHeaderAttestation
        else:
            return UnknownAttestation(tag, serialized_attestation)

        r = attestation_cls.deserialize(payload_ctx)

        # If attestations want to have unspecified fields for future
        # upgradability they should do so explicitly.
        payload_ctx.assert_eof()
        return r
class UnknownAttestation(TimeAttestation):
    """Placeholder for attestations whose type we don't support

    Holds the raw tag and payload bytes, and writes the payload back out
    unchanged when serialized.
    """

    def __init__(self, tag, payload):
        """Create a placeholder from a raw tag and payload

        Raises TypeError if tag or payload is not exactly a bytes instance,
        and ValueError if tag is not TAG_SIZE bytes long or payload is longer
        than MAX_PAYLOAD_SIZE.
        """
        if tag.__class__ != bytes:
            raise TypeError("tag must be bytes instance; got %r" % tag.__class__)
        elif len(tag) != self.TAG_SIZE:
            raise ValueError("tag must be exactly %d bytes long; got %d" % (self.TAG_SIZE, len(tag)))

        if payload.__class__ != bytes:
            # Report the offending payload's type, not the (already validated)
            # tag's type.
            raise TypeError("payload must be bytes instance; got %r" % payload.__class__)
        elif len(payload) > self.MAX_PAYLOAD_SIZE:
            raise ValueError("payload must be <= %d bytes long; got %d" % (self.MAX_PAYLOAD_SIZE, len(payload)))

        # FIXME: we should check that tag != one of the tags that we do know
        # about; if it does the operators < and =, and hash() will likely act
        # strangely
        self.TAG = tag
        self.payload = payload

    def __repr__(self):
        return 'UnknownAttestation(%r, %r)' % (self.TAG, self.payload)

    def __eq__(self, other):
        """Equal to another UnknownAttestation with the same tag and payload"""
        if other.__class__ is UnknownAttestation:
            return self.TAG == other.TAG and self.payload == other.payload
        else:
            return super().__eq__(other)

    def __lt__(self, other):
        """Order UnknownAttestations by (tag, payload)"""
        if other.__class__ is UnknownAttestation:
            return (self.TAG, self.payload) < (other.TAG, other.payload)
        else:
            return super().__lt__(other)

    def __hash__(self):
        return hash((self.TAG, self.payload))

    def _serialize_payload(self, ctx):
        """Write the stored payload bytes to ctx verbatim"""
        # Notice how this is write_bytes, not write_varbytes - the latter would
        # incorrectly add a length header to the actual payload.
        ctx.write_bytes(self.payload)
# Note how neither of these signatures actually has the time...
class PendingAttestation(TimeAttestation):
    """Pending attestation

    The commitment has been recorded in a remote calendar for future
    attestation, and the URI tells us where a more complete timestamp can be
    found later on.

    Only the URI is stored, and no provision is made for adding other metadata
    in future upgrades. The rationale is that remote calendars promise to keep
    commitments indefinitely, so a commitment should be findable in the
    calendar from the moment it is created. If the local verifiability of a
    timestamp isn't satisfactory, the correct thing to do is to ask the remote
    calendar whether additional attestations are available and/or when they
    will be.

    We could add metadata such as the types of attestation the remote calendar
    expects to be able to provide, but that can easily change in the future
    too. Since timestamps aren't expected to normally carry more than a small
    number of remote calendar attestations, it is better for verifiers to get
    the most recent status of such information (possibly with appropriate
    negative response caching).
    """

    TAG = bytes.fromhex('83dfe30d2ef90c8e')

    # Maximum legal URI length, in bytes.
    MAX_URI_LENGTH = 1000

    # Characters allowed in URIs.
    #
    # Note how we've left out the characters necessary for parameters,
    # queries, or fragments, as well as IPv6 [] notation, percent-encoding
    # special characters, and @ login notation. Hopefully this keeps us out of
    # trouble!
    ALLOWED_URI_CHARS = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._/:"

    @classmethod
    def check_uri(cls, uri):
        """Validate an encoded URI

        Raises ValueError if the URI is longer than MAX_URI_LENGTH or contains
        a byte that is not in ALLOWED_URI_CHARS.
        """
        if len(uri) > cls.MAX_URI_LENGTH:
            raise ValueError("URI exceeds maximum length")

        # Only the first offending byte is reported.
        bad = next((c for c in uri if c not in cls.ALLOWED_URI_CHARS), None)
        if bad is not None:
            raise ValueError("URI contains invalid character %r" % bytes([bad]))

    def __init__(self, uri):
        """Create a pending attestation for uri, which must be a valid str"""
        if not isinstance(uri, str):
            raise TypeError("URI must be a string")

        # Validation works on the encoded form, as the length limit is in bytes.
        self.check_uri(uri.encode())
        self.uri = uri

    def __repr__(self):
        return 'PendingAttestation(%r)' % self.uri

    def __eq__(self, other):
        """Equal to another PendingAttestation with the same URI"""
        if other.__class__ is not PendingAttestation:
            return super().__eq__(other)
        return self.uri == other.uri

    def __lt__(self, other):
        """Order PendingAttestations by URI"""
        if other.__class__ is not PendingAttestation:
            return super().__lt__(other)
        return self.uri < other.uri

    def __hash__(self):
        return hash(self.uri)

    def _serialize_payload(self, ctx):
        """Write the encoded URI to ctx as a varbytes field"""
        ctx.write_varbytes(self.uri.encode())

    @classmethod
    def deserialize(cls, ctx):
        """Read a URI from ctx and return a PendingAttestation for it

        A URI that fails check_uri() is reported as a DeserializationError.
        """
        raw_uri = ctx.read_varbytes(cls.MAX_URI_LENGTH)

        try:
            cls.check_uri(raw_uri)
        except ValueError as err:
            raise opentimestamps.core.serialize.DeserializationError("Invalid URI: %r" % err)

        return PendingAttestation(raw_uri.decode())
class BitcoinBlockHeaderAttestation(TimeAttestation):
    """Signed by the Bitcoin blockchain

    The commitment digest will be the merkleroot of the blockheader.

    The block height is stored so that the correct header can be looked up in
    an external block header database without having to keep every header
    locally (33MB and counting). (A memory-constrained local client can save
    an MMR that commits to all blocks, and rely on an external service to fill
    in pruned details.)

    No other redundant data about the block header is recorded, and that is
    very intentional: because the attestation holds (nearly) the absolute bare
    minimum amount of data, implementations are encouraged to do the correct
    thing - fetch the block header from a by-height index, check that the
    merkleroots match, and then calculate the time from the header
    information. Providing more data would encourage implementations to cheat.

    The only thing that would invalidate the block height is a reorg, but in
    the event of a reorg the merkleroot will be invalid anyway, so there is no
    point in recording data such as the header itself in the attestation. At
    best that would give extra confirmation that a reorg made the attestation
    invalid; reorgs deep enough to invalidate timestamps are exceptionally
    rare events anyway, so it is better to just tell the user the timestamp
    can't be verified than to add almost-never tested code that handles the
    case more gracefully.
    """

    TAG = bytes.fromhex('0588960d73d71901')

    def __init__(self, height):
        """Create an attestation for the block at the given height"""
        self.height = height

    def __eq__(self, other):
        """Equal to another BitcoinBlockHeaderAttestation of the same height"""
        if other.__class__ is not BitcoinBlockHeaderAttestation:
            return super().__eq__(other)
        return self.height == other.height

    def __lt__(self, other):
        """Order BitcoinBlockHeaderAttestations by height"""
        if other.__class__ is not BitcoinBlockHeaderAttestation:
            return super().__lt__(other)
        return self.height < other.height

    def __hash__(self):
        return hash(self.height)

    def verify_against_blockheader(self, digest, block_header):
        """Verify attestation against a block header

        Returns the block time (block_header.nTime) on success; raises
        VerificationError if the digest is not 32 bytes long or differs from
        block_header.hashMerkleRoot.
        """
        if len(digest) != 32:
            raise VerificationError("Expected digest with length 32 bytes; got %d bytes" % len(digest))

        if digest != block_header.hashMerkleRoot:
            raise VerificationError("Digest does not match merkleroot")

        return block_header.nTime

    def __repr__(self):
        return 'BitcoinBlockHeaderAttestation(%r)' % self.height

    def _serialize_payload(self, ctx):
        """Write the block height to ctx as a varuint"""
        ctx.write_varuint(self.height)

    @classmethod
    def deserialize(cls, ctx):
        """Read a varuint block height from ctx and wrap it in an attestation"""
        return BitcoinBlockHeaderAttestation(ctx.read_varuint())
class LitecoinBlockHeaderAttestation(TimeAttestation):
    """Signed by the Litecoin blockchain

    Identical in design to the BitcoinBlockHeaderAttestation.
    """

    TAG = bytes.fromhex('06869a0d73d71b45')

    def __init__(self, height):
        """Create an attestation for the block at the given height"""
        self.height = height

    def __eq__(self, other):
        """Equal to another LitecoinBlockHeaderAttestation of the same height"""
        if other.__class__ is not LitecoinBlockHeaderAttestation:
            return super().__eq__(other)
        return self.height == other.height

    def __lt__(self, other):
        """Order LitecoinBlockHeaderAttestations by height"""
        if other.__class__ is not LitecoinBlockHeaderAttestation:
            return super().__lt__(other)
        return self.height < other.height

    def __hash__(self):
        return hash(self.height)

    def verify_against_blockheader(self, digest, block_header):
        """Verify attestation against a block header

        Always raises NotImplementedError: verification is not implemented
        here until there is a well-maintained Litecoin python library.
        """
        raise NotImplementedError()

    def __repr__(self):
        return 'LitecoinBlockHeaderAttestation(%r)' % self.height

    def _serialize_payload(self, ctx):
        """Write the block height to ctx as a varuint"""
        ctx.write_varuint(self.height)

    @classmethod
    def deserialize(cls, ctx):
        """Read a varuint block height from ctx and wrap it in an attestation"""
        return LitecoinBlockHeaderAttestation(ctx.read_varuint())
|