File: test_responses.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (188 lines) | stat: -rw-r--r-- 6,224 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import datetime
from collections import OrderedDict
from gzip import compress as gzip_compress
from typing import Any
from unittest import SkipTest, mock

from botocore.awsrequest import AWSPreparedRequest, HTTPHeaders
from freezegun import freeze_time

from moto import settings
from moto.core.responses import BaseResponse
from moto.s3.responses import S3Response


def test_parse_qs_unicode_decode_error() -> None:
    """Set up a response for a request body holding a lone ``%D0`` escape.

    There are no assertions: the test passes as long as ``setup_class``
    completes without raising.
    """
    request_headers = HTTPHeaders()
    request_headers["foo"] = "bar"
    malformed_body = b'{"key": "%D0"}, "C": "#0 = :0"}'
    request = AWSPreparedRequest(
        "GET",
        url="http://request",
        headers=request_headers,
        body=malformed_body,
        stream_output=False,
    )

    response = BaseResponse()
    response.setup_class(request, request.url, request.headers)


def test_get_params() -> None:
    """Check that ``_get_params`` folds a flat querystring into nested values.

    Dotted ``member.N`` keys are expected to come back as lists (ordered by
    their index, not by insertion order) and dotted names as nested dicts.
    """
    listener_arn = "arn:aws:elasticloadbalancing:us-east-1:1:listener/my-lb/50dc6c495c0c9188/80139731473870416"

    # Insertion order matters here: Values.member.2 is deliberately added
    # before Values.member.1.
    query: "OrderedDict[str, list[str]]" = OrderedDict()
    query["Action"] = ["CreateRule"]
    query["Version"] = ["2015-12-01"]
    query["ListenerArn"] = [listener_arn]
    query["Priority"] = ["100"]
    query["Conditions.member.1.Field"] = ["http-header"]
    query["Conditions.member.1.HttpHeaderConfig.HttpHeaderName"] = ["User-Agent"]
    query["Conditions.member.1.HttpHeaderConfig.Values.member.2"] = ["curl"]
    query["Conditions.member.1.HttpHeaderConfig.Values.member.1"] = ["Mozilla"]
    query["Actions.member.1.FixedResponseConfig.StatusCode"] = ["200"]
    query["Actions.member.1.FixedResponseConfig.ContentType"] = ["text/plain"]
    query["Actions.member.1.Type"] = ["fixed-response"]

    handler = BaseResponse()
    handler.querystring = query

    expected_condition = {
        "Field": "http-header",
        "HttpHeaderConfig": {
            "HttpHeaderName": "User-Agent",
            "Values": ["Mozilla", "curl"],
        },
    }
    expected_action = {
        "Type": "fixed-response",
        "FixedResponseConfig": {
            "StatusCode": "200",
            "ContentType": "text/plain",
        },
    }
    expected = {
        "Action": "CreateRule",
        "Version": "2015-12-01",
        "ListenerArn": listener_arn,
        "Priority": "100",
        "Conditions": [expected_condition],
        "Actions": [expected_action],
    }

    assert handler._get_params() == expected


def test_response_environment_preserved_by_type() -> None:
    """Verify templates are shared per response class, not across classes."""

    class ResponseA(BaseResponse):
        pass

    class ResponseB(BaseResponse):
        pass

    def has_template(response: BaseResponse, source: str) -> bool:
        # Look the template up by the id derived from its source text.
        return response.contains_template(BaseResponse._make_template_id(source))

    first_a = ResponseA()
    second_a = ResponseA()
    only_b = ResponseB()

    # Same class -> same environment object; different class -> different one.
    assert first_a.environment is second_a.environment
    assert only_b.environment is not first_a.environment

    sources = ("template", "amother template")

    # Registering a template through one ResponseA instance makes it visible
    # on that instance.
    for source in sources:
        assert not has_template(first_a, source)
        first_a.response_template(source)
        assert has_template(first_a, source)

    # ResponseB never registered anything, so it must not see them.
    for source in sources:
        assert not has_template(only_b, source)

    # A ResponseA created before the registration sees both templates...
    for source in sources:
        assert has_template(second_a, source)

    # ...and so does one created afterwards.
    late_a = ResponseA()
    for source in sources:
        assert has_template(late_a, source)


@mock.patch(
    "moto.core.responses.settings.PRETTIFY_RESPONSES",
    new_callable=mock.PropertyMock(return_value=True),
)
def test_jinja_render_prettify(m_env_var: Any) -> None:  # type: ignore[misc]
    """Render a one-line XML template with PRETTIFY_RESPONSES patched on.

    The rendered output is expected to carry an XML declaration and
    tab-indented, newline-separated elements. Skipped in server mode.
    """
    if settings.TEST_SERVER_MODE:
        raise SkipTest(
            "It is not possible to set the environment variable in server mode"
        )

    raw_xml = "<TestTemplate><ResponseText>Test text</ResponseText></TestTemplate>"
    pretty_xml = (
        '<?xml version="1.0" ?>\n'
        "<TestTemplate>\n"
        "\t<ResponseText>Test text</ResponseText>\n"
        "</TestTemplate>"
    )

    rendered = BaseResponse().response_template(raw_xml).render()

    assert rendered == pretty_xml
    # The object injected by mock.patch must be truthy.
    assert m_env_var


def test_response_metadata() -> None:
    """Check the ``date`` response header populated while time is frozen."""
    request = AWSPreparedRequest(
        "GET",
        url="http://request",
        headers=HTTPHeaders(),
        body=None,
        stream_output=False,
    )
    frozen_at = datetime.datetime(2023, 5, 20, 10, 20, 30, tzinfo=datetime.timezone.utc)

    # Both construction and setup run under the frozen clock.
    with freeze_time(frozen_at):
        response = BaseResponse()
        response.setup_class(request, request.url, request.headers)

    assert "date" in response.response_headers
    if settings.TEST_SERVER_MODE:
        # The exact value is only checked outside server mode.
        return
    assert response.response_headers["date"] == "Sat, 20 May 2023 10:20:30 GMT"


def test_compression_gzip() -> None:
    """A gzip-compressed request body is exposed as the original text on ``body``."""
    plain_text = '{"key": "%D0"}, "C": "#0 = :0"}'

    gzip_headers = HTTPHeaders()
    gzip_headers["Content-Encoding"] = "gzip"
    compressed = _gzip_compress_body(plain_text)
    request = AWSPreparedRequest(
        "GET", "http://request", gzip_headers, compressed, False
    )

    handler = BaseResponse()
    handler.setup_class(request, request.url, request.headers)

    assert plain_text == handler.body


def test_compression_gzip_in_s3() -> None:
    """``S3Response`` keeps the body unchanged despite ``Content-Encoding: gzip``.

    The payload here is plain bytes (not actually gzip data); the test
    asserts it comes back from ``body`` byte-for-byte.
    """
    payload = b"some random data"

    gzip_headers = HTTPHeaders()
    gzip_headers["Content-Encoding"] = "gzip"
    request = AWSPreparedRequest("GET", "http://request", gzip_headers, payload, False)

    handler = S3Response()
    handler.setup_class(request, request.url, request.headers)

    assert payload == handler.body


def _gzip_compress_body(body: str) -> bytes:
    """Return *body* encoded as UTF-8 and gzip-compressed.

    Asserts that *body* is a ``str``.
    """
    assert isinstance(body, str)
    encoded = body.encode("utf-8")
    return gzip_compress(encoded)