1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
import hmac
import json
import time
from collections import OrderedDict
from hashlib import sha256
# Used for global variables
import stripe # noqa: IMP101
from stripe._event import Event
from stripe import _util
from stripe._error import SignatureVerificationError
from stripe._api_requestor import _APIRequestor
class Webhook(object):
DEFAULT_TOLERANCE = 300
@staticmethod
def construct_event(
payload, sig_header, secret, tolerance=DEFAULT_TOLERANCE, api_key=None
):
if hasattr(payload, "decode"):
payload = payload.decode("utf-8")
WebhookSignature.verify_header(payload, sig_header, secret, tolerance)
data = json.loads(payload, object_pairs_hook=OrderedDict)
event = Event._construct_from(
values=data,
requestor=_APIRequestor._global_with_options(
api_key=api_key or stripe.api_key
),
api_mode="V1",
)
return event
class WebhookSignature(object):
EXPECTED_SCHEME = "v1"
@staticmethod
def _compute_signature(payload, secret):
mac = hmac.new(
secret.encode("utf-8"),
msg=payload.encode("utf-8"),
digestmod=sha256,
)
return mac.hexdigest()
@staticmethod
def _get_timestamp_and_signatures(header, scheme):
list_items = [i.split("=", 2) for i in header.split(",")]
timestamp = int([i[1] for i in list_items if i[0] == "t"][0])
signatures = [i[1] for i in list_items if i[0] == scheme]
return timestamp, signatures
@classmethod
def verify_header(cls, payload, header, secret, tolerance=None):
try:
timestamp, signatures = cls._get_timestamp_and_signatures(
header, cls.EXPECTED_SCHEME
)
except Exception:
raise SignatureVerificationError(
"Unable to extract timestamp and signatures from header",
header,
payload,
)
if not signatures:
raise SignatureVerificationError(
"No signatures found with expected scheme "
"%s" % cls.EXPECTED_SCHEME,
header,
payload,
)
signed_payload = "%d.%s" % (timestamp, payload)
expected_sig = cls._compute_signature(signed_payload, secret)
if not any(_util.secure_compare(expected_sig, s) for s in signatures):
raise SignatureVerificationError(
"No signatures found matching the expected signature for "
"payload",
header,
payload,
)
if tolerance and timestamp < time.time() - tolerance:
raise SignatureVerificationError(
"Timestamp outside the tolerance zone (%d)" % timestamp,
header,
payload,
)
return True
|