File: __init__.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (363 lines) | stat: -rw-r--r-- 12,559 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
"""Stubs for patching HTTP and HTTPS requests"""

import logging
import six
from six.moves.http_client import HTTPConnection, HTTPSConnection, HTTPResponse
from six import BytesIO
from vcr.request import Request
from vcr.errors import CannotOverwriteExistingCassetteException
from . import compat

log = logging.getLogger(__name__)


class VCRFakeSocket(object):
    """
    Do-nothing stand-in for a socket.

    Handed out while cassettes are being played back, when no real
    socket has been opened.
    """

    def close(self):
        """Nothing is open, so there is nothing to close."""
        return None

    def settimeout(self, *args, **kwargs):
        """Accept any timeout arguments and ignore them."""
        return None

    def fileno(self):
        """
        Report a placeholder file descriptor.

        This is not very good: requests will watch this descriptor and
        make sure it's not closed, so descriptor 0 (stdin) is returned.
        """
        stdin_descriptor = 0  # wonder how bad this is....
        return stdin_descriptor


def parse_headers(header_list):
    """
    Build an HTTPMessage from our serialized headers.

    ``header_list`` is a dict mapping each header name to a list of its
    values; every (name, value) pair becomes one ``name:value\\r\\n`` line,
    UTF-8 encoded, and the joined bytes are handed to
    ``compat.get_httpmessage``.
    """
    encoded_lines = [
        name.encode("utf-8") + b":" + value.encode("utf-8") + b"\r\n"
        for name, values in header_list.items()
        for value in values
    ]
    return compat.get_httpmessage(b"".join(encoded_lines))


def serialize_headers(response):
    """
    Collect the headers of ``response`` into a dict of lists.

    Iterates the (key, values) pairs produced by
    ``compat.get_headers(response.msg)`` and accumulates the values of
    repeated keys into a single list per key.
    """
    serialized = {}
    for name, header_values in compat.get_headers(response.msg):
        serialized.setdefault(name, []).extend(header_values)
    return serialized


class VCRHTTPResponse(HTTPResponse):
    """
    Stub response class that gets returned instead of a HTTPResponse.

    The body is served from an in-memory ``BytesIO`` built from the recorded
    response, so the ``HTTPResponse`` initializer (which expects a socket) is
    deliberately not called.
    """

    def __init__(self, recorded_response):
        """
        Build the stub from a recorded response dict.

        ``recorded_response`` is read through the keys
        ``["status"]["message"]``, ``["status"]["code"]``,
        ``["body"]["string"]`` and ``["headers"]``.

        Note that any transfer-encoding header is deleted from
        ``recorded_response["headers"]`` in place, so the header accessors
        below (which re-parse that dict) do not report it either.
        """
        self.fp = None
        self.recorded_response = recorded_response
        self.reason = recorded_response["status"]["message"]
        self.status = self.code = recorded_response["status"]["code"]
        self.version = None
        self._content = BytesIO(self.recorded_response["body"]["string"])
        self._closed = False

        headers = self.recorded_response["headers"]
        # Since we are loading a response that has already been serialized, our
        # response is no longer chunked.  That means we don't want any
        # libraries trying to process a chunked response.  By removing the
        # transfer-encoding: chunked header, this should cause the downstream
        # libraries to process this as a non-chunked response.
        # Every case variant of the key is removed, not just the first one.
        te_keys = [h for h in headers.keys() if h.upper() == "TRANSFER-ENCODING"]
        for te_key in te_keys:
            del headers[te_key]
        self.headers = self.msg = parse_headers(headers)

        # Falls back to None when the header is missing or empty.
        self.length = compat.get_header(self.msg, "content-length") or None

    @property
    def closed(self):
        # In python3, I can't change the value of self.closed.  So I'm
        # twiddling self._closed and using this property to shadow the real
        # self.closed from the superclass.
        return self._closed

    def read(self, *args, **kwargs):
        """Read from the recorded body; arguments go to ``BytesIO.read``."""
        return self._content.read(*args, **kwargs)

    def readall(self):
        """Return everything that is left of the recorded body."""
        # BytesIO has no readall() (only raw streams do); an argument-less
        # read() returns all remaining bytes.
        return self._content.read()

    def readinto(self, *args, **kwargs):
        """Delegate to ``BytesIO.readinto`` on the recorded body."""
        return self._content.readinto(*args, **kwargs)

    def readline(self, *args, **kwargs):
        """Delegate to ``BytesIO.readline`` on the recorded body."""
        return self._content.readline(*args, **kwargs)

    def readlines(self, *args, **kwargs):
        """Delegate to ``BytesIO.readlines`` on the recorded body."""
        return self._content.readlines(*args, **kwargs)

    def seekable(self):
        """Delegate to ``BytesIO.seekable`` on the recorded body."""
        return self._content.seekable()

    def tell(self):
        """Return the current position within the recorded body."""
        return self._content.tell()

    def isatty(self):
        """Delegate to ``BytesIO.isatty`` on the recorded body."""
        return self._content.isatty()

    def seek(self, *args, **kwargs):
        """Delegate to ``BytesIO.seek`` on the recorded body."""
        return self._content.seek(*args, **kwargs)

    def close(self):
        """Mark the response as closed and return True."""
        self._closed = True
        return True

    def getcode(self):
        """Return the recorded status code."""
        return self.status

    def isclosed(self):
        """Return whether ``close()`` has been called."""
        return self.closed

    def info(self):
        """Return the recorded headers, freshly parsed into a message."""
        return parse_headers(self.recorded_response["headers"])

    def getheaders(self):
        """Return the recorded headers as a list of header items."""
        message = parse_headers(self.recorded_response["headers"])
        return list(compat.get_header_items(message))

    def getheader(self, header, default=None):
        """
        Return all values of ``header`` (matched case-insensitively) joined
        with ", ", or ``default`` when the header is absent.
        """
        wanted = header.lower()
        values = [v for (k, v) in self.getheaders() if k.lower() == wanted]

        if values:
            return ", ".join(values)
        return default

    def readable(self):
        """Delegate to ``BytesIO.readable`` on the recorded body."""
        return self._content.readable()


class VCRConnection(object):
    """
    Connection stub that answers requests from a cassette when it can and
    otherwise forwards them to a wrapped real connection.

    Subclasses provide ``_baseclass`` (the real connection class that is
    instantiated as ``self.real_connection``) and ``_protocol`` (the URI
    scheme, "http" or "https", used when building absolute URIs).
    """

    # A reference to the cassette that's currently being patched in
    cassette = None

    def _port_postfix(self):
        """
        Returns empty string for the default port and ':port' otherwise
        """
        port = self.real_connection.port
        default_port = {"https": 443, "http": 80}[self._protocol]
        return ":{}".format(port) if port != default_port else ""

    def _uri(self, url):
        """Returns request absolute URI"""
        if url and not url.startswith("/"):
            # Then this must be a proxy request.
            return url
        uri = "{}://{}{}{}".format(self._protocol, self.real_connection.host, self._port_postfix(), url)
        log.debug("Absolute URI: %s", uri)
        return uri

    def _url(self, uri):
        """Returns request selector url from absolute URI"""
        prefix = "{}://{}{}".format(self._protocol, self.real_connection.host, self._port_postfix())
        # Only the leading scheme://host[:port] occurrence is stripped.
        return uri.replace(prefix, "", 1)

    def request(self, method, url, body=None, headers=None, *args, **kwargs):
        """Persist the request metadata in self._vcr_request"""
        self._vcr_request = Request(method=method, uri=self._uri(url), body=body, headers=headers or {})
        log.debug("Got %s", self._vcr_request)

        # Note: The request may not actually be finished at this point, so
        # I'm not sending the actual request until getresponse().  This
        # allows me to compare the entire length of the response to see if it
        # exists in the cassette.

        self._sock = VCRFakeSocket()

    def putrequest(self, method, url, *args, **kwargs):
        """
        httplib gives you more than one way to do it.  This is a way
        to start building up a request.  Usually followed by a bunch
        of putheader() calls.
        """
        self._vcr_request = Request(method=method, uri=self._uri(url), body="", headers={})
        log.debug("Got %s", self._vcr_request)

    def putheader(self, header, *values):
        """Store ``values`` (as a tuple) under ``header`` on the pending request."""
        self._vcr_request.headers[header] = values

    def send(self, data):
        """
        This method is called after request(), to add additional data to the
        body of the request.  So if that happens, let's just append the data
        onto the most recent request in the cassette.
        """
        # An empty/None body is replaced outright rather than concatenated.
        self._vcr_request.body = (self._vcr_request.body + data) if self._vcr_request.body else data

    def close(self):
        """Close the wrapped real connection."""
        # Note: the real connection will only close if it's open, so
        # no need to check that here.
        self.real_connection.close()

    def endheaders(self, message_body=None):
        """
        Normally, this would actually send the request to the server.
        We are not sending the request until getting the response,
        so bypass this part and just append the message body, if any.
        """
        if message_body is not None:
            self._vcr_request.body = message_body

    def getresponse(self, _=False, **kwargs):
        """
        Retrieve the response.

        Plays the response from the cassette when it has one for the pending
        request.  Otherwise the request is sent over the real connection and
        the response is appended to the cassette before being returned.

        Raises CannotOverwriteExistingCassetteException when no response can
        be played, the cassette is write-protected and
        ``cassette.filter_request`` returns a truthy value for the request.
        """
        # Check to see if the cassette has a response for this request. If so,
        # then return it
        if self.cassette.can_play_response_for(self._vcr_request):
            log.info("Playing response for %s from cassette", self._vcr_request)
            response = self.cassette.play_response(self._vcr_request)
            return VCRHTTPResponse(response)
        else:
            if self.cassette.write_protected and self.cassette.filter_request(self._vcr_request):
                raise CannotOverwriteExistingCassetteException(
                    cassette=self.cassette, failed_request=self._vcr_request
                )

            # Otherwise, we should send the request, then get the response
            # and return it.

            log.info("%s not in cassette, sending to real server", self._vcr_request)
            # This is imported here to avoid circular import.
            # TODO(@IvanMalison): Refactor to allow normal import.
            from vcr.patch import force_reset

            with force_reset():
                self.real_connection.request(
                    method=self._vcr_request.method,
                    url=self._url(self._vcr_request.uri),
                    body=self._vcr_request.body,
                    headers=self._vcr_request.headers,
                )

            # get the response
            response = self.real_connection.getresponse()

            # put the response into the cassette
            response = {
                "status": {"code": response.status, "message": response.reason},
                "headers": serialize_headers(response),
                "body": {"string": response.read()},
            }
            self.cassette.append(self._vcr_request, response)
        return VCRHTTPResponse(response)

    def set_debuglevel(self, *args, **kwargs):
        """Forward the debug level to the real connection."""
        self.real_connection.set_debuglevel(*args, **kwargs)

    def connect(self, *args, **kwargs):
        """
        httplib2 uses this.  Connects to the server I'm assuming.

        Only pass to the baseclass if we don't have a recorded response
        and are not write-protected.
        """

        if hasattr(self, "_vcr_request") and self.cassette.can_play_response_for(self._vcr_request):
            # We already have a response we are going to play, don't
            # actually connect
            return

        if self.cassette.write_protected:
            # Cassette is write-protected, don't actually connect
            return

        from vcr.patch import force_reset

        # NOTE: an assignment of a VCRFakeSocket to self._sock used to follow
        # this block, but it sat after the return and could never execute.
        with force_reset():
            return self.real_connection.connect(*args, **kwargs)

    @property
    def sock(self):
        """The real connection's socket when it has one, else our own ``_sock``."""
        if self.real_connection.sock:
            return self.real_connection.sock
        return self._sock

    @sock.setter
    def sock(self, value):
        # Only replaces an existing real socket; when the real connection has
        # no socket the assignment is ignored.
        if self.real_connection.sock:
            self.real_connection.sock = value

    def __init__(self, *args, **kwargs):
        """Create the wrapped real connection from ``self._baseclass``."""
        if six.PY3:
            kwargs.pop("strict", None)  # apparently this is gone in py3

        # need to temporarily reset here because the real connection
        # inherits from the thing that we are mocking out.  Take out
        # the reset if you want to see what I mean :)
        from vcr.patch import force_reset

        with force_reset():
            self.real_connection = self._baseclass(*args, **kwargs)

        self._sock = None

    def __setattr__(self, name, value):
        """
        We need to define this because any attributes that are set on the
        VCRConnection need to be propagated to the real connection.

        For example, urllib3 will set certain attributes on the connection,
        such as 'ssl_version'. These attributes need to get set on the real
        connection to have the correct and expected behavior.

        TODO: Separately setting the attribute on the two instances is not
        ideal. We should switch to a proxying implementation.
        """
        try:
            setattr(self.real_connection, name, value)
        except AttributeError:
            # raised if real_connection has not been set yet, such as when
            # we're setting the real_connection itself for the first time
            pass

        super(VCRConnection, self).__setattr__(name, value)

    def __getattr__(self, name):
        """
        Send requests for weird attributes up to the real connection
        (counterpart to __setattr__ above)

        Raises AttributeError naming the missing attribute when there is no
        real connection to ask yet.
        """
        # Go through __dict__ directly: a plain self.real_connection lookup
        # would re-enter __getattr__ while real_connection is still unset,
        # such as when we're setting the real_connection itself for the
        # first time.
        real_connection = self.__dict__.get("real_connection")
        if real_connection:
            return getattr(real_connection, name)

        # object defines no __getattr__ to defer to, so report the missing
        # attribute explicitly.
        raise AttributeError(
            "{!r} object has no attribute {!r}".format(type(self).__name__, name)
        )


# Mirror every staticmethod defined directly on HTTPConnection onto
# VCRConnection, so that they can also be looked up on the stub class.
for k, v in vars(HTTPConnection).items():
    if not isinstance(v, staticmethod):
        continue
    setattr(VCRConnection, k, v)


class VCRHTTPConnection(VCRConnection):
    """Stub connection used for plain HTTP requests."""

    # URI scheme used when building absolute request URIs.
    _protocol = "http"
    # Real connection class wrapped by this stub.
    _baseclass = HTTPConnection


class VCRHTTPSConnection(VCRConnection):
    """Stub connection used for HTTPS requests."""

    # URI scheme used when building absolute request URIs.
    _protocol = "https"
    # Real connection class wrapped by this stub.
    _baseclass = HTTPSConnection
    is_verified = True