1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
import re
import base64
import sys
from braintree.exceptions.invalid_signature_error import InvalidSignatureError
from braintree.util.crypto import Crypto
from braintree.util.xml_util import XmlUtil
from braintree.webhook_notification import WebhookNotification
if sys.version_info[0] == 2:
text_type = unicode
else:
text_type = str
class WebhookNotificationGateway(object):
def __init__(self, gateway):
self.gateway = gateway
self.config = gateway.config
def parse(self, signature, payload):
if isinstance(payload, text_type):
payload = payload.encode('ascii')
if re.search(b"[^A-Za-z0-9+=/\n]", payload):
raise InvalidSignatureError("payload contains illegal characters")
self.__validate_signature(signature, payload)
attributes = XmlUtil.dict_from_xml(base64.decodestring(payload))
return WebhookNotification(self.gateway, attributes['notification'])
def verify(self, challenge):
digest = Crypto.sha1_hmac_hash(self.config.private_key, challenge)
return "%s|%s" % (self.config.public_key, digest)
def __matching_signature(self, signature_pairs):
for public_key, signature in signature_pairs:
if public_key == self.config.public_key:
return signature
return None
def __validate_signature(self, signature_string, payload):
signature_pairs = [pair.split("|") for pair in signature_string.split("&") if "|" in pair]
signature = self.__matching_signature(signature_pairs)
if not signature:
raise InvalidSignatureError("no matching public key")
if not any(self.__payload_matches(signature, p) for p in [payload, payload + b"\n"]):
raise InvalidSignatureError("signature does not match payload - one has been modified")
def __payload_matches(self, signature, payload):
payload_signature = Crypto.sha1_hmac_hash(self.config.private_key, payload)
return Crypto.secure_compare(payload_signature, signature)
|