File: httplib2_intercept.py

package info (click to toggle)
python-wsgi-intercept 1.13.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 560 kB
  • sloc: python: 1,390; makefile: 56; sh: 5
file content (54 lines) | stat: -rw-r--r-- 1,962 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
"""Intercept HTTP connections that use
`httplib2 <https://github.com/jcgregorio/httplib2>`_.
"""

import sys

from httplib2 import (SCHEME_TO_CONNECTION, HTTPConnectionWithTimeout,
                      HTTPSConnectionWithTimeout)

from . import (HTTPConnection, HTTPSConnection, WSGI_HTTPConnection,
               WSGI_HTTPSConnection)

HTTPInterceptorMixin = WSGI_HTTPConnection
HTTPSInterceptorMixin = WSGI_HTTPSConnection


class HTTP_WSGIInterceptorWithTimeout(HTTPInterceptorMixin,
        HTTPConnectionWithTimeout):
    def __init__(self, host, port=None, strict=None, timeout=None,
            proxy_info=None, source_address=None):

        # In Python3 strict is deprecated
        if sys.version_info[0] < 3:
            HTTPConnection.__init__(self, host, port=port, strict=strict,
                    timeout=timeout, source_address=source_address)
        else:
            HTTPConnection.__init__(self, host, port=port,
                    timeout=timeout, source_address=source_address)


class HTTPS_WSGIInterceptorWithTimeout(HTTPSInterceptorMixin,
        HTTPSConnectionWithTimeout):
    def __init__(self, host, port=None, strict=None, timeout=None,
            proxy_info=None, ca_certs=None, source_address=None,
            **kwargs):

        # ignore proxy_info and ca_certs
        # In Python3 strict is deprecated
        if sys.version_info[0] < 3:
            HTTPSConnection.__init__(self, host, port=port, strict=strict,
                    timeout=timeout, source_address=source_address)
        else:
            HTTPSConnection.__init__(self, host, port=port,
                    timeout=timeout, source_address=source_address)


def install():
    SCHEME_TO_CONNECTION['http'] = HTTP_WSGIInterceptorWithTimeout
    SCHEME_TO_CONNECTION['https'] = HTTPS_WSGIInterceptorWithTimeout


def uninstall():
    SCHEME_TO_CONNECTION['http'] = HTTPConnectionWithTimeout
    SCHEME_TO_CONNECTION['https'] = HTTPSConnectionWithTimeout