File: raw_response.py

package info (click to toggle)
python-requests-cache 1.2.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 6,024 kB
  • sloc: python: 7,029; makefile: 4
file content (143 lines) | stat: -rw-r--r-- 5,416 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from io import BytesIO
from logging import getLogger
from typing import TYPE_CHECKING, Optional

from attrs import define, field, fields_dict
from requests import Response
from urllib3.response import (  # type: ignore  # import location false positive
    HTTPHeaderDict,
    HTTPResponse,
    is_fp_closed,
)

from . import RichMixin

# Module-level logger, named after this module; used by CachedHTTPResponse.read()
logger = getLogger(__name__)


if TYPE_CHECKING:
    from . import CachedResponse


@define(auto_attribs=False, repr=False, slots=False)
class CachedHTTPResponse(RichMixin, HTTPResponse):
    """A wrapper class that emulates :py:class:`~urllib3.response.HTTPResponse`.

    This enables consistent behavior for streaming requests and generator usage in the following
    cases:

    * On an original response, after reading its content to write to the cache
    * On a cached response

    Content is served from an in-memory :py:class:`~io.BytesIO` object instead of a socket.
    """

    # Attributes initialized both by HTTPResponse.__init__ and by attrs; the ones defaulting to
    # None are simply omitted from both initializers when no value is given (see __init__).
    decode_content: Optional[bool] = field(default=None)
    headers: HTTPHeaderDict = field(factory=HTTPHeaderDict)
    reason: Optional[str] = field(default=None)
    request_url: Optional[str] = field(default=None)
    status: int = field(default=0)
    version: int = field(default=0)

    def __init__(self, body: Optional[bytes] = None, **kwargs):
        """First initialize via HTTPResponse.__init__, then via __attrs_init__

        Args:
            body: Raw response content; ``None`` is treated as empty content
            **kwargs: Attribute values passed to both initializers. Any value that is ``None``
                is dropped, so that the respective defaults apply.
        """
        kwargs = {k: v for k, v in kwargs.items() if v is not None}
        # preload_content=False: content is read on demand from the BytesIO object
        super().__init__(body=BytesIO(body or b''), preload_content=False, **kwargs)
        self._body = body
        self._fp_bytes_read = 0
        self.length_remaining = len(body or b'')
        self.__attrs_init__(**kwargs)  # type: ignore # False positive in mypy 0.920+?

    @classmethod
    def from_response(cls, response: Response) -> 'CachedHTTPResponse':
        """Create a CachedHTTPResponse based on an original response, and restore the response to
        its previous state.

        Args:
            response: Original response, whose ``raw`` attribute is copied

        Returns:
            A new object with the attributes and (if still readable) the content of
            ``response.raw``
        """
        # Get basic attributes
        kwargs = {k: getattr(response.raw, k, None) for k in fields_dict(cls).keys()}
        # Init kwarg for HTTPResponse._request_url has no leading '_'
        kwargs['request_url'] = response.raw._request_url
        kwargs['body'] = _copy_body(response)
        return cls(**kwargs)  # type: ignore  # False positive in mypy 0.920+?

    @classmethod
    def from_cached_response(cls, response: 'CachedResponse'):
        """Create a CachedHTTPResponse based on a cached response

        Args:
            response: Cached response to take headers, reason, status code, request URL, and
                content from

        Returns:
            A new object whose file pointer is positioned at the start of the cached content
        """
        obj = cls(
            headers=HTTPHeaderDict(response.headers),
            reason=response.reason,
            status=response.status_code,
            request_url=response.request.url,
        )
        obj.reset(response._content)
        return obj

    @property
    def _request_url(self) -> Optional[str]:
        """For compatibility with urllib3"""
        return self.request_url

    @_request_url.setter
    def _request_url(self, value: Optional[str]):
        self.request_url = value

    def release_conn(self):
        """No-op for compatibility"""

    def read(self, amt=None, decode_content=None, **kwargs):
        """Simplified reader for cached content that emulates
        :py:meth:`urllib3.response.HTTPResponse.read()`, but does not need to read from a socket
        or decode content.

        Args:
            amt: Maximum number of bytes to read; ``None`` reads all remaining content
            decode_content: Accepted for compatibility. Content is never decoded here; a warning
                is logged if ``False`` is passed and a ``Content-Encoding`` header is present.
            **kwargs: Accepted for compatibility, and ignored

        Returns:
            The bytes read, or ``b''`` if the file pointer is closed or no content is left
        """
        if 'Content-Encoding' in self.headers and decode_content is False:
            logger.warning('read(decode_content=False) is not supported for cached responses')
        if is_fp_closed(self._fp):
            return b''

        data = self._fp.read(amt)
        if data:
            self._fp_bytes_read += len(data)
            if self.length_remaining is not None:
                self.length_remaining -= len(data)
        # "close" the file to inform consumers to stop reading from it
        else:
            self._fp.close()
        return data

    def reset(self, body: Optional[bytes] = None):
        """Reset raw response file pointer, and optionally update content

        Args:
            body: New content. If ``None``, the currently stored content is kept; any other
                value (including an empty ``b''``) replaces it.
        """
        # Compare against None rather than truthiness, so empty content can be set explicitly
        _reset_fp(self, self._body if body is None else body)

    def stream(self, amt=None, **kwargs):
        """Simplified generator over cached content that emulates
        :py:meth:`urllib3.response.HTTPResponse.stream()`

        Args:
            amt: Maximum size of each chunk; ``None`` yields all remaining content at once
            **kwargs: Passed on to :py:meth:`read`

        Yields:
            Non-empty chunks of content, until the content is exhausted
        """
        while not self._fp.closed:
            chunk = self.read(amt=amt, **kwargs)
            # The final read returns no data (and closes the file pointer); like urllib3, do not
            # pass that empty chunk on to the consumer
            if chunk:
                yield chunk


def _copy_body(response: Response) -> Optional[bytes]:
    """Return a copy of the raw response content, leaving the response readable afterwards.

    Streaming responses need this in order to behave the same with or without the cache: the raw
    content is read here, and the raw file pointer is then rewound so the client can read it again.

    Returns ``None`` if the raw response has no file pointer, or if it is already closed.
    """
    raw = response.raw
    fp = getattr(raw, '_fp', None)

    # Nothing to copy without an open file pointer
    if not fp or is_fp_closed(fp):
        return None

    if getattr(raw, '_has_decoded_content', False):
        # requests has already read & decoded the body
        content = response.content
    else:
        # Unread body: take the raw bytes, rewind, then let requests read, decode, and store
        # the content via its ``content`` property
        content = raw.read(decode_content=False)
        _reset_fp(raw, content)
        _ = response.content

    # Rewind one more time, so the client can still consume the response as a stream
    _reset_fp(raw, content)
    return content


def _reset_fp(raw: HTTPResponse, body: Optional[bytes] = None):
    """Point a raw response at a fresh in-memory copy of ``body`` and reset its read counters.

    A missing or empty ``body`` results in empty content.
    """
    content = body if body else b''
    raw._fp = BytesIO(content)  # type: ignore[attr-defined]
    raw._fp_bytes_read = 0  # type: ignore[attr-defined]
    raw._body = content  # type: ignore[attr-defined]
    raw.length_remaining = len(content)