File: webhook_notification_gateway.py

package info (click to toggle)
python-braintree 3.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 1,376 kB
  • ctags: 1,998
  • sloc: python: 13,634; makefile: 73; sh: 8
file content (48 lines) | stat: -rw-r--r-- 2,051 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import re
import base64
import sys
from braintree.exceptions.invalid_signature_error import InvalidSignatureError
from braintree.util.crypto import Crypto
from braintree.util.xml_util import XmlUtil
from braintree.webhook_notification import WebhookNotification

if sys.version_info[0] == 2:
    text_type = unicode
else:
    text_type = str

class WebhookNotificationGateway(object):
    def __init__(self, gateway):
        self.gateway = gateway
        self.config = gateway.config

    def parse(self, signature, payload):
        if isinstance(payload, text_type):
            payload = payload.encode('ascii')
        if re.search(b"[^A-Za-z0-9+=/\n]", payload):
            raise InvalidSignatureError("payload contains illegal characters")
        self.__validate_signature(signature, payload)
        attributes = XmlUtil.dict_from_xml(base64.decodestring(payload))
        return WebhookNotification(self.gateway, attributes['notification'])

    def verify(self, challenge):
        digest = Crypto.sha1_hmac_hash(self.config.private_key, challenge)
        return "%s|%s" % (self.config.public_key, digest)

    def __matching_signature(self, signature_pairs):
        for public_key, signature in signature_pairs:
            if public_key == self.config.public_key:
                return signature
        return None

    def __validate_signature(self, signature_string, payload):
        signature_pairs = [pair.split("|") for pair in signature_string.split("&") if "|" in pair]
        signature = self.__matching_signature(signature_pairs)
        if not signature:
            raise InvalidSignatureError("no matching public key")
        if not any(self.__payload_matches(signature, p) for p in [payload, payload + b"\n"]):
            raise InvalidSignatureError("signature does not match payload - one has been modified")

    def __payload_matches(self, signature, payload):
        payload_signature = Crypto.sha1_hmac_hash(self.config.private_key, payload)
        return Crypto.secure_compare(payload_signature, signature)