File: test_eventstreams.py

package info (click to toggle)
python-aiobotocore 2.25.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,524 kB
  • sloc: python: 15,437; makefile: 84
file content (147 lines) | stat: -rw-r--r-- 4,838 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from contextlib import AsyncExitStack

import anyio
import pytest

from aiobotocore.eventstream import AioEventStream
from aiobotocore.parsers import AioEventStreamXMLParser

# TODO: once Moto supports either S3 Select or Kinesis SubscribeToShard,
# this can be tested against a real AWS API.


# Canned binary response body used by the event-stream tests below, stored as
# a tuple of two chunks that FakeStreamReader hands out one at a time:
#   1. a "Records" event whose payload is the JSON line {"hello":"world"}
#   2. a "Stats" event (XML payload) immediately followed by an "End" event
# NOTE(review): presumably captured from an S3 SelectObjectContent response,
# since the tests decode it with that operation's output shape -- confirm.
# The bytes include what look like length/checksum fields, so do not reformat
# or re-split the literals by hand.
TEST_STREAM_DATA = (
    b'\x00\x00\x00w\x00\x00\x00U5\xd1F\xcd\r:message-type\x07\x00\x05event\x0b:event-'
    b'type\x07\x00\x07Records\r:content-type\x07\x00\x18application/octet-stream{"hel'
    b'lo":"world"}\nF\x0e\x9a2',
    b'\x00\x00\x00\xce\x00\x00\x00C\xdc\xd2\x99\xf9\r:message-type\x07\x00\x05event'
    b'\x0b:event-type\x07\x00\x05Stats\r:content-type\x07\x00\x08text/xml<Stats xml'
    b'ns=""><BytesScanned>19</BytesScanned><BytesProcessed>19</BytesProcessed><Byte'
    b'sReturned>18</BytesReturned></Stats>\x92\xd0?\xa5\x00\x00\x008\x00\x00\x00(\xc1'
    b'\xc6\x84\xd4\r:message-type\x07\x00\x05event\x0b:event-type\x07\x00\x03End\xcf'
    b'\x97\xd3\x92',
)


class FakeStreamReader:
    """In-memory stand-in for the raw stream object given to AioEventStream.

    The instance stores the supplied chunks, exposes itself as ``content``,
    and serves the chunks back through ``iter_chunks()``.
    """

    class ChunkedIterator:
        """Async iterator that yields ``(chunk, True)`` for every chunk.

        NOTE(review): the constant ``True`` presumably mirrors the
        "end of chunk" flag of the real reader's ``iter_chunks`` -- confirm.
        """

        def __init__(self, chunks):
            self.iter = iter(chunks)

        def __aiter__(self):
            return self

        async def __anext__(self):
            try:
                chunk = next(self.iter)
            except StopIteration:
                # Translate sync exhaustion into its async counterpart.
                raise StopAsyncIteration()
            else:
                return chunk, True

    def __init__(self, chunks):
        # ``content`` points back at the reader itself, so both
        # ``reader.iter_chunks()`` and ``reader.content.iter_chunks()`` work.
        self.content = self
        self.chunks = chunks

    def iter_chunks(self):
        """Return a fresh async iterator over the stored chunks."""
        return self.ChunkedIterator(self.chunks)


async def test_eventstream_chunking(s3_client):
    """Feed the canned chunks through AioEventStream and check the events."""
    # The stream is built with the options passed to the EventStream class
    # during a normal run with botocore.
    operation_name = 'SelectObjectContent'
    operation_model = s3_client._service_model.operation_model(operation_name)
    payload_shape = operation_model.output_shape.members['Payload']
    event_stream = AioEventStream(
        FakeStreamReader(TEST_STREAM_DATA),
        payload_shape,
        AioEventStreamXMLParser(),
        operation_name,
    )

    # Expected events, in order:
    #   {'Records': {'Payload': b'{"hello":"world"}\n'}}
    #   {'Stats': {'Details': {'BytesScanned': 19,
    #                          'BytesProcessed': 19,
    #                          'BytesReturned': 18}}}
    #   {'End': {}}
    events = [event async for event in event_stream]

    assert len(events) == 3
    records_event, stats_event, end_event = events

    assert 'Records' in records_event
    assert 'Stats' in stats_event
    assert 'End' in end_event


async def test_eventstream_no_iter(s3_client):
    """A synchronous ``for`` over the stream must raise NotImplementedError."""
    # The stream is built with the options passed to the EventStream class
    # during a normal run with botocore.
    operation_name = 'SelectObjectContent'
    operation_model = s3_client._service_model.operation_model(operation_name)
    payload_shape = operation_model.output_shape.members['Payload']
    event_stream = AioEventStream(
        FakeStreamReader(TEST_STREAM_DATA),
        payload_shape,
        AioEventStreamXMLParser(),
        operation_name,
    )

    with pytest.raises(NotImplementedError):
        for _event in event_stream:
            pass


@pytest.mark.localonly
async def test_kinesis_stream_json_parser(
    exit_stack: AsyncExitStack, kinesis_client, create_stream
):
    """Register a consumer on a new stream and read one SubscribeToShard event.

    Marked ``localonly`` because, unfortunately, moto doesn't support kinesis
    register_stream_consumer + subscribe_to_shard yet.

    NOTE(review): ``exit_stack`` is requested as a fixture but is not used in
    this body.
    """
    stream_name = await create_stream(ShardCount=1)

    stream_description = (
        await kinesis_client.describe_stream(StreamName=stream_name)
    )['StreamDescription']
    shard_id = stream_description['Shards'][0]['ShardId']
    stream_arn = stream_description['StreamARN']

    # Create some data
    for key in map(str, range(1, 5)):
        await kinesis_client.put_record(
            StreamName=stream_name, Data=key, PartitionKey=key
        )

    consumer_name = 'consumer'
    registration = await kinesis_client.register_stream_consumer(
        StreamARN=stream_arn, ConsumerName=consumer_name
    )
    consumer_arn = registration['Consumer']['ConsumerARN']

    # Poll once per second until the consumer is no longer CREATING (or the
    # describe call returns an empty response).
    while True:
        consumer_info = await kinesis_client.describe_stream_consumer(
            StreamARN=stream_arn,
            ConsumerName=consumer_name,
            ConsumerARN=consumer_arn,
        )
        if not consumer_info:
            break
        status = consumer_info['ConsumerDescription']['ConsumerStatus']
        if status != 'CREATING':
            break
        print("Waiting for stream consumer creation")
        await anyio.sleep(1)

    subscription = await kinesis_client.subscribe_to_shard(
        ConsumerARN=consumer_arn,
        ShardId=shard_id,
        StartingPosition={'Type': 'LATEST'},
    )
    # Only the first event is inspected.
    # NOTE(review): it is presumably empty because LATEST skips the records
    # written above -- confirm.
    async for event in subscription['EventStream']:
        assert event['SubscribeToShardEvent']['Records'] == []
        break