# Copyright (C) 2016-2018 The OpenTimestamps developers
#
# This file is part of python-opentimestamps.
#
# It is subject to the license terms in the LICENSE file found in the top-level
# directory of this distribution.
#
# No part of python-opentimestamps including this file, may be copied,
# modified, propagated, or distributed except according to the terms contained
# in the LICENSE file.

import binascii
import fnmatch
import urllib.error
import urllib.parse
import urllib.request

from urllib.parse import urljoin

from opentimestamps.core.timestamp import Timestamp
from opentimestamps.core.serialize import BytesDeserializationContext


def get_sanitised_resp_msg(exp):
    """Get the sanitised response message from a calendar response

    Returns the sanitised message, with any character not in the whitelist
    replaced by '_'.
    """
    # Note how newlines are _not_ allowed: this is important, as otherwise the
    # message could include a second line pretending to be something else.
    WHITELIST = b'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789#-.,; '

    # Two lines of text
    raw_msg = bytearray(exp.read(160))

    for i in range(len(raw_msg)):
        if raw_msg[i] not in WHITELIST:
            raw_msg[i] = ord('_')

    return raw_msg.decode()


class CommitmentNotFoundError(KeyError):
    def __init__(self, reason):
        super().__init__(reason)
        self.reason = reason


class RemoteCalendar:
    """Remote calendar server interface"""

    def __init__(self, url, user_agent="python-opentimestamps"):
        if not isinstance(url, str):
            raise TypeError("URL must be a string")
        self.url = url
        self.request_headers = {"Accept": "application/vnd.opentimestamps.v1",
                                "User-Agent": user_agent}

    def submit(self, digest, timeout=None):
        """Submit a digest to the calendar

        Returns a Timestamp committing to that digest.
        """
        req = urllib.request.Request(urljoin(self.url, 'digest'), data=digest, headers=self.request_headers)
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            if resp.status != 200:
                raise Exception("Unknown response from calendar: %d" % resp.status)

            # FIXME: Not a particularly nice way of handling this, but it'll do
            # the job for now. Read one byte past the limit so the size check
            # below can actually trigger on oversized responses.
            resp_bytes = resp.read(10001)

            if len(resp_bytes) > 10000:
                raise Exception("Calendar response exceeded size limit")

            ctx = BytesDeserializationContext(resp_bytes)
            return Timestamp.deserialize(ctx, digest)

    def get_timestamp(self, commitment, timeout=None):
        """Get a timestamp for a given commitment

        Raises KeyError if the calendar doesn't have that commitment.
        """
        req = urllib.request.Request(
                urljoin(self.url, 'timestamp/' + binascii.hexlify(commitment).decode('utf8')),
                headers=self.request_headers)
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                if resp.status == 200:
                    # FIXME: Not a particularly nice way of handling this, but
                    # it'll do the job for now. Read one byte past the limit so
                    # the size check below can actually trigger.
                    resp_bytes = resp.read(10001)

                    if len(resp_bytes) > 10000:
                        raise Exception("Calendar response exceeded size limit")

                    ctx = BytesDeserializationContext(resp_bytes)
                    return Timestamp.deserialize(ctx, commitment)

                else:
                    raise Exception("Unknown response from calendar: %d" % resp.status)

        except urllib.error.HTTPError as exp:
            if exp.code == 404:
                raise CommitmentNotFoundError(get_sanitised_resp_msg(exp))
            else:
                raise exp
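
# Usage sketch (illustrative only, not part of the upstream module): submit the
# SHA256 digest of some data to a public aggregator and get back a pending
# Timestamp. The literals below are examples; network access is assumed.
#
#   import hashlib
#   calendar = RemoteCalendar('https://a.pool.opentimestamps.org')
#   digest = hashlib.sha256(b'Hello World!').digest()
#   timestamp = calendar.submit(digest, timeout=10)
#   assert timestamp.msg == digest  # the returned Timestamp commits to our digest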


class UrlWhitelist(set):
    """Glob-matching whitelist for URLs"""

    def __init__(self, urls=()):
        for url in urls:
            self.add(url)

    def add(self, url):
        if not isinstance(url, str):
            raise TypeError("URL must be a string")

        if url.startswith('http://') or url.startswith('https://'):
            parsed_url = urllib.parse.urlparse(url)

            if parsed_url.params or parsed_url.query or parsed_url.fragment:
                raise ValueError("Whitelisted URLs must not have params, query, or fragment parts")

            set.add(self, parsed_url)

        else:
            self.add('http://' + url)
            self.add('https://' + url)

    def __contains__(self, url):
        parsed_url = urllib.parse.urlparse(url)

        # FIXME: probably should tell user why...
        if parsed_url.params or parsed_url.query or parsed_url.fragment:
            return False

        for pattern in self:
            if (parsed_url.scheme == pattern.scheme and
                    parsed_url.path == pattern.path and
                    fnmatch.fnmatch(parsed_url.netloc, pattern.netloc)):
                return True
        else:
            return False

    def __repr__(self):
        return 'UrlWhitelist([%s])' % ','.join("'%s'" % url.geturl() for url in self)


DEFAULT_CALENDAR_WHITELIST = \
    UrlWhitelist(['https://*.calendar.opentimestamps.org',  # Run by Peter Todd
                  'https://*.calendar.eternitywall.com',  # Run by Riccardo Casatta of Eternity Wall
                  'https://*.calendar.catallaxy.com',  # Run by Vincent Cloutier of Catallaxy
                  ])
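
# Usage sketch (illustrative only): membership tests glob-match the host part
# and require scheme and path to match exactly, while any params, query string,
# or fragment causes rejection. The URLs below are examples, not extra defaults.
#
#   'https://a.calendar.opentimestamps.org' in DEFAULT_CALENDAR_WHITELIST       # True
#   'https://evil.example.com' in DEFAULT_CALENDAR_WHITELIST                    # False
#   'https://a.calendar.opentimestamps.org/?x=1' in DEFAULT_CALENDAR_WHITELIST  # False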

DEFAULT_AGGREGATORS = \
    ('https://a.pool.opentimestamps.org',
     'https://b.pool.opentimestamps.org',
     'https://a.pool.eternitywall.com',
     'https://ots.btc.catallaxy.com')
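

# Minimal end-to-end sketch, not part of the upstream module: hash a message
# and submit the digest to each default aggregator, reporting whether a pending
# timestamp came back. Requires network access; the message text is arbitrary.
if __name__ == "__main__":
    import hashlib

    digest = hashlib.sha256(b'python-opentimestamps calendar demo').digest()

    for url in DEFAULT_AGGREGATORS:
        calendar = RemoteCalendar(url)
        try:
            timestamp = calendar.submit(digest, timeout=10)
        except Exception as exp:
            print("%s: submission failed: %r" % (url, exp))
        else:
            print("%s: timestamp commits to %s" % (url, binascii.hexlify(timestamp.msg).decode()))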