File: meter_event_stream.py

package info (click to toggle)
python-stripe 12.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,864 kB
  • sloc: python: 157,573; makefile: 13; sh: 9
file content (56 lines) | stat: -rw-r--r-- 1,876 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""
meter_event_stream.py - use the high-throughput meter event stream to report billing meter events.

In this example, we:
  - create a meter event session and store the session's authentication token
  - define an event with a payload
  - use the meter_event_stream service accessor in StripeClient to create an event stream that reports this event

This example expects a billing meter with an event_name of 'alpaca_ai_tokens'.  If you have
a different meter event name, you can change it before running this example.
"""

from datetime import datetime, timezone
import stripe

# Module-level holder for the current meter event session. It starts as None
# and is replaced by refresh_meter_event_session() whenever that function
# creates a new session; send_meter_event() reads the authentication token
# from it.
meter_event_session = None


def _parse_expiry(expires_at):
    """Parse an ISO 8601 ``expires_at`` string into a timezone-aware datetime."""
    # datetime.fromisoformat() accepts a trailing "Z" (UTC designator) only on
    # Python 3.11+; rewrite it as an explicit offset so older versions work too.
    if isinstance(expires_at, str) and expires_at.endswith("Z"):
        expires_at = expires_at[:-1] + "+00:00"
    expiry = datetime.fromisoformat(expires_at)
    # Comparing a naive datetime with an aware one raises TypeError, so a
    # timestamp that carries no offset is assumed to be UTC.
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)
    return expiry


def refresh_meter_event_session(api_key):
    """Make sure the module-level ``meter_event_session`` is usable.

    A new session is created through ``stripe.StripeClient(api_key)`` when no
    session has been stored yet, or when the stored session's ``expires_at``
    is not later than the current UTC time. Otherwise the stored session is
    left untouched.

    Args:
        api_key: API key handed to ``stripe.StripeClient`` when a new session
            has to be created.

    Raises:
        ValueError: If the stored ``expires_at`` is not a parseable ISO 8601
            timestamp.
    """
    global meter_event_session

    if meter_event_session is not None:
        expiry = _parse_expiry(meter_event_session["expires_at"])
        if expiry > datetime.now(timezone.utc):
            # The stored session has not expired yet; keep using it.
            return

    # No session yet, or the stored one has expired: create a fresh one.
    client = stripe.StripeClient(api_key)
    meter_event_session = client.v2.billing.meter_event_session.create()


def send_meter_event(meter_event, api_key):
    """Report a single meter event through the meter event stream.

    The module-level session is refreshed first (using ``api_key``), and the
    session's authentication token is then used to build the client that
    submits ``meter_event``.

    Args:
        meter_event: The event to report; it is sent as the only entry of the
            ``events`` list.
        api_key: API key forwarded to ``refresh_meter_event_session``.

    Raises:
        RuntimeError: If no usable session is available after the refresh.
    """
    refresh_meter_event_session(api_key)

    session = meter_event_session
    if not session:
        raise RuntimeError("Unable to refresh meter event session")

    # The stream client authenticates with the session token, not the API key.
    stream_client = stripe.StripeClient(session["authentication_token"])
    stream_service = stream_client.v2.billing.meter_event_stream
    stream_service.create(params={"events": [meter_event]})


# Replace these placeholders with your own API key and customer ID
api_key = "{{API_KEY}}"
customer_id = "{{CUSTOMER_ID}}"

# Report one 'alpaca_ai_tokens' event with a value of "25" for that customer
send_meter_event(
    meter_event={
        "event_name": "alpaca_ai_tokens",
        "payload": {"stripe_customer_id": customer_id, "value": "25"},
    },
    api_key=api_key,
)