File: thinevent_webhook_handler.py

package info (click to toggle)
python-stripe 12.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,864 kB
  • sloc: python: 157,573; makefile: 13; sh: 9
file content (53 lines) | stat: -rw-r--r-- 1,660 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
"""
thinevent_webhook_handler.py - receive and process thin events like the
v1.billing.meter.error_report_triggered event.

In this example, we:
    - create a StripeClient called client
    - use client.parse_thin_event to parse the received thin event webhook body
    - call client.v2.core.events.retrieve to retrieve the full event object
    - if it is a V1BillingMeterErrorReportTriggeredEvent event type, call
      event.fetchRelatedObject to retrieve the Billing Meter object associated
      with the event.
"""

import os
from stripe import StripeClient
from stripe.events import V1BillingMeterErrorReportTriggeredEvent

from flask import Flask, request, jsonify

app = Flask(__name__)
api_key = os.environ.get("STRIPE_API_KEY")
webhook_secret = os.environ.get("WEBHOOK_SECRET")

client = StripeClient(api_key)


@app.route("/webhook", methods=["POST"])
def webhook():
    webhook_body = request.data
    sig_header = request.headers.get("Stripe-Signature")

    try:
        thin_event = client.parse_thin_event(
            webhook_body, sig_header, webhook_secret
        )

        # Fetch the event data to understand the failure
        event = client.v2.core.events.retrieve(thin_event.id)
        if isinstance(event, V1BillingMeterErrorReportTriggeredEvent):
            meter = event.fetch_related_object()
            meter_id = meter.id
            print("Success! " + str(meter_id))

            # Record the failures and alert your team
            # Add your logic here

        return jsonify(success=True), 200
    except Exception as e:
        return jsonify(error=str(e)), 400


if __name__ == "__main__":
    app.run(port=4242)