File: test_server.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (129 lines) | stat: -rw-r--r-- 4,413 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import re
import threading
import time
from urllib.parse import urlencode

import moto.server as server
from moto.core.utils import utcnow


def test_sqs_list_identities():
    """Round-trip a message through a plain and a ``.fifo`` queue in server mode.

    Uses the Flask test client of the SQS backend app to: list queues on a
    fresh backend, then for each queue name create the queue, send one
    message and receive it back, and finally list queues filtered by the
    ``other`` name prefix.
    """
    backend = server.create_backend_app("sqs")
    test_client = backend.test_client()
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
    }
    res = test_client.get("/?Action=ListQueues", headers=headers)
    assert b"</ListQueuesResponse>" in res.data

    # Make sure that we can receive messages from queues whose name contains dots (".")
    # The AWS API mandates that the names of FIFO queues use the suffix ".fifo"
    # See: https://github.com/getmoto/moto/issues/866

    for queue_name in ("testqueue", "otherqueue.fifo"):
        test_client.put(
            f"/?Action=CreateQueue&QueueName={queue_name}", headers=headers
        )

        test_client.put(
            f"/123/{queue_name}?MessageBody=test-message&Action=SendMessage",
            headers=headers,
        )

        res = test_client.get(
            f"/123/{queue_name}?Action=ReceiveMessage&MaxNumberOfMessages=1",
            headers=headers,
        )

        response_text = res.data.decode("utf-8")
        match = re.search(r"<Body>(.*?)</Body>", response_text)
        # Fail with the response payload instead of an AttributeError on None
        # when the ReceiveMessage response carries no message body.
        assert match is not None, (
            f"no <Body> element in ReceiveMessage response for "
            f"{queue_name!r}: {response_text}"
        )
        assert match.group(1) == "test-message"

    # The prefix filter must return only the queue whose name starts with "other".
    res = test_client.get("/?Action=ListQueues&QueueNamePrefix=other", headers=headers)
    assert b"otherqueue.fifo" in res.data
    assert b"testqueue" not in res.data


def test_messages_polling():
    """Check that long-polling receives return as soon as a message is available.

    A producer thread sends five messages half a second apart while a
    consumer thread repeatedly issues ReceiveMessage with a 5 second wait.
    Each ReceiveMessage response is recorded as one entry, so exactly five
    entries means every response carried exactly one message.
    """
    app = server.create_backend_app("sqs")
    client = app.test_client()
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    received_batches = []
    client.put("/?Action=CreateQueue&QueueName=testqueue", headers=form_headers)

    def insert_messages():
        # Producer: five sends, pausing half a second after each one.
        for _ in range(5):
            client.put(
                "/123/testqueue?MessageBody=test-message&Action=SendMessage"
                "&Attribute.1.Name=WaitTimeSeconds&Attribute.1.Value=10",
                headers=form_headers,
            )
            time.sleep(0.5)

    def get_messages():
        # Consumer: keep polling until five message bodies have been seen in
        # total, storing the bodies of each response as a separate entry.
        seen = 0
        while seen < 5:
            response = client.get(
                "/123/testqueue?Action=ReceiveMessage&MaxNumberOfMessages=1&WaitTimeSeconds=5",
                headers=form_headers,
            )
            bodies = re.findall("<Body>(.*?)</Body>", response.data.decode("utf-8"))
            received_batches.append(bodies)
            seen += len(bodies)

    # Consumer first, then producer - both for starting and for joining.
    workers = [
        threading.Thread(target=get_messages),
        threading.Thread(target=insert_messages),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # got each message in a separate call to ReceiveMessage, despite the long
    # WaitTimeSeconds
    assert len(received_batches) == 5


def test_no_messages_polling_timeout():
    """Check that ReceiveMessage on an empty queue blocks for about WaitTimeSeconds.

    The elapsed time must be at least the requested wait and at most one and
    a half times it.
    """
    queue_name = "test-queue"
    wait_seconds = 5
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}

    client = server.create_backend_app("sqs").test_client()
    client.put(f"/?Action=CreateQueue&QueueName={queue_name}", headers=form_headers)

    receive_url = (
        f"/123/{queue_name}?Action=ReceiveMessage&"
        f"MaxNumberOfMessages=1&WaitTimeSeconds={wait_seconds}"
    )
    started = utcnow()
    client.get(receive_url, headers=form_headers)
    elapsed = utcnow() - started

    # `elapsed.seconds` is the whole-second component of the timedelta, so
    # both bounds are checked at one-second granularity.
    upper_bound = wait_seconds + (wait_seconds / 2)
    assert wait_seconds <= elapsed.seconds <= upper_bound


def test_create_queue_with_tags_using_query_protocol():
    """Create a tagged queue with a form-encoded POST and read the tag back.

    Both requests are sent as ``application/x-www-form-urlencoded`` bodies:
    first CreateQueue with one ``Tag.1`` key/value pair, then ListQueueTags
    for the same queue.
    """
    queue_name = "test-queue"
    client = server.create_backend_app("sqs").test_client()
    form_headers = {
        "Content-Type": "application/x-www-form-urlencoded",
    }

    create_body = urlencode(
        {
            "Action": "CreateQueue",
            "QueueName": queue_name,
            "Tag.1.Key": "foo",
            "Tag.1.Value": "bar",
        }
    )
    create_resp = client.post(headers=form_headers, data=create_body)
    assert create_resp.status_code == 200
    assert "<CreateQueueResult>" in create_resp.data.decode("utf-8")

    # The bare queue name is passed as QueueUrl here.
    list_tags_body = urlencode(
        {
            "Action": "ListQueueTags",
            "QueueUrl": queue_name,
        }
    )
    tags_resp = client.post(headers=form_headers, data=list_tags_body)
    tags_xml = tags_resp.data.decode("utf-8")
    assert "<Tag><Key>foo</Key><Value>bar</Value></Tag>" in tags_xml