File: http_request.py

package info (click to toggle)
chargebee-python 1.6.6-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 484 kB
  • sloc: python: 1,008; makefile: 6; sh: 3
file content (78 lines) | stat: -rw-r--r-- 2,751 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import base64

from chargebee import APIError,PaymentError,InvalidRequestError,OperationFailedError, compat
from chargebee.main import ChargeBee
from chargebee.main import Environment
from chargebee.version import VERSION

def _basic_auth_str(username):
    return 'Basic ' + base64.b64encode(('%s:' % username).encode('latin1')).strip().decode('latin1')


def request(method, url, env, params=None, headers=None):
    """Send one HTTP request to the API and return the parsed JSON response.

    Args:
        method: HTTP verb name, case-insensitive. For ``get``, ``head`` and
            ``delete`` the parameters are appended to the URL as a query
            string; for any other verb they are sent as a form-encoded body.
        url: Value handed to ``env.api_url`` to build the full request URL.
        env: Environment object; must provide ``api_url()`` and ``api_key``.
        params: Parameters passed to ``compat.urlencode``. ``None`` is
            treated as "no parameters".
        headers: Optional extra request headers. The mapping is copied, so
            the caller's object is never modified. ``User-Agent``, ``Accept``
            and ``Authorization`` are always overwritten.

    Returns:
        Whatever ``process_response`` returns for the response body and
        HTTP status.

    Raises:
        Exception: If ``env`` is falsy. Exceptions raised by
            ``process_response`` or by the connection propagate unchanged.
    """
    if not env:
        raise Exception('No environment configured.')

    # Copy so the Authorization/User-Agent headers added below do not leak
    # into a dict owned (and possibly reused) by the caller.
    headers = {} if headers is None else dict(headers)
    # The signature allows params to be omitted; encode that as an empty
    # parameter set instead of handing None to the encoder.
    if params is None:
        params = {}

    url = env.api_url(url)
    if method.lower() in ('get', 'head', 'delete'):
        url = '%s?%s' % (url, compat.urlencode(params))
        payload = None
    else:
        payload = compat.urlencode(params)
        headers['Content-type'] = 'application/x-www-form-urlencoded'

    headers.update({
        'User-Agent': 'ChargeBee-Python-Client v%s' % VERSION,
        'Accept': 'application/json',
        'Authorization': _basic_auth_str(env.api_key),
    })

    meta = compat.urlparse(url)
    if ChargeBee.verify_ca_certs:
        connection = compat.VerifiedHTTPSConnection(meta.netloc)
    elif Environment.protocol == "https":
        connection = compat.HTTPSConnection(meta.netloc)
    else:
        connection = compat.HTTPConnection(meta.netloc)

    # Everything that can touch the connection lives inside the try block so
    # that the connection is closed even when sending the request fails.
    try:
        if ChargeBee.verify_ca_certs:
            connection.set_cert(ca_certs=ChargeBee.ca_cert_path)

        connection.request(method.upper(), meta.path + '?' + meta.query, payload, headers)
        response = connection.getresponse()
        data = response.read()
        if compat.py_major_v >= 3:
            # On Python 3 the body is decoded as UTF-8 before JSON parsing.
            data = data.decode('utf-8')

        return process_response(url, data, response.status)
    finally:
        connection.close()


def process_response(url,response, http_code):
    """Parse a response body as JSON and raise for non-2xx status codes.

    Args:
        url: Request URL, used only in error messages.
        response: Response body text to parse.
        http_code: HTTP status code of the response.

    Returns:
        The parsed JSON value when ``http_code`` is within 200-299.

    Raises:
        Exception: If the body cannot be parsed as JSON. For status codes
            outside 200-299, ``handle_api_resp_error`` is called with the
            parsed body.
    """
    try:
        parsed = compat.json.loads(response)
    except Exception:
        message = ''.join((
            "Response not in JSON format. Probably not a chargebee error. \n URL is ",
            url,
            "\n Content is \n",
            response,
        ))
        raise Exception(message)

    is_error_status = http_code < 200 or http_code > 299
    if is_error_status:
        handle_api_resp_error(url, http_code, parsed)

    return parsed


def handle_api_resp_error(url,http_code, resp_json):
    """Raise the exception that matches an error response body.

    Args:
        url: Request URL, used only in the error message.
        http_code: HTTP status code passed to the raised error.
        resp_json: Parsed JSON error body.

    Raises:
        Exception: If ``resp_json`` has no ``api_error_code`` entry.
        PaymentError: If ``type`` is ``payment``.
        OperationFailedError: If ``type`` is ``operation_failed``.
        InvalidRequestError: If ``type`` is ``invalid_request``.
        APIError: For any other ``type`` value.
    """
    if 'api_error_code' not in resp_json:
        message = ''.join((
            "The api_error_code is not present. Probably not a chargebee error. \n URL is ",
            url,
            "\nContent is \n ",
            str(resp_json),
        ))
        raise Exception(message)

    error_type = resp_json.get('type')
    # Ordered pairs compared with ==, not a dict lookup, so an unhashable
    # ``type`` value still falls through to APIError.
    known_errors = (
        ('payment', PaymentError),
        ('operation_failed', OperationFailedError),
        ('invalid_request', InvalidRequestError),
    )
    for type_name, error_cls in known_errors:
        if type_name == error_type:
            raise error_cls(http_code, resp_json)

    raise APIError(http_code, resp_json)