File: eventstream.py

package info (click to toggle)
python-aiobotocore 2.25.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,524 kB
  • sloc: python: 15,437; makefile: 84
file content (51 lines) | stat: -rw-r--r-- 1,677 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from botocore.eventstream import (
    EventStream,
    EventStreamBuffer,
    NoInitialResponseError,
)
from botocore.exceptions import EventStreamError


class AioEventStream(EventStream):
    """Event stream that is consumed with ``async for``.

    Synchronous iteration is explicitly rejected. Raw events are read
    from ``self._raw_stream.content`` chunk by chunk, decoded through an
    ``EventStreamBuffer`` and parsed with ``self._parser``.
    """

    def __iter__(self):
        """Reject synchronous iteration; this stream is async-only."""
        raise NotImplementedError('Use async-for instead')

    def __aiter__(self):
        """Return the async iterator that ``async for`` will drive."""
        # ``__anext__`` contains ``yield`` and is therefore an async
        # generator function: calling it returns an async generator
        # object rather than a coroutine. Every generator created here
        # reads from the same ``self._event_generator``.
        return self.__anext__()

    async def __anext__(self):
        """Yield each parsed event, skipping those that parse as falsy.

        NOTE: because this is an async generator, ``__anext__()`` cannot
        be awaited directly; iterate the stream with ``async for``.

        Raises:
            EventStreamError: propagated from ``_parse_event`` when an
                event's status code is not 200.
        """
        async for event in self._event_generator:
            parsed_event = await self._parse_event(event)
            if parsed_event:
                yield parsed_event

    async def _create_raw_event_generator(self):
        """Yield raw events decoded from the underlying chunked stream.

        Each chunk produced by ``self._raw_stream.content.iter_chunks()``
        is appended to an ``EventStreamBuffer``; every event the buffer
        can produce after that chunk is yielded before the next chunk is
        read.
        """
        event_stream_buffer = EventStreamBuffer()
        # iter_chunks() yields 2-tuples; only the first item (the data)
        # is used, the second item is ignored.
        async for chunk, _ in self._raw_stream.content.iter_chunks():
            event_stream_buffer.add_data(chunk)
            for event in event_stream_buffer:
                yield event  # unfortunately no yield from async func support

    async def _parse_event(self, event):
        """Parse a raw event into a response.

        Args:
            event: raw event exposing ``to_response_dict()``.

        Returns:
            The value produced by ``self._parser.parse`` when the event's
            ``status_code`` equals 200.

        Raises:
            EventStreamError: when ``status_code`` is anything other than
                200; carries the parsed response and the operation name.
        """
        response_dict = event.to_response_dict()
        parsed_response = await self._parser.parse(
            response_dict, self._output_shape
        )
        if response_dict['status_code'] == 200:
            return parsed_response
        else:
            raise EventStreamError(parsed_response, self._operation_name)

    async def get_initial_response(self):
        """Return the first raw event if it is the initial response.

        Only the first event available from ``self._event_generator`` is
        inspected. That event is consumed from the generator whether or
        not it matches.

        Returns:
            The first event, when its ``:event-type`` header equals
            ``'initial-response'``.

        Raises:
            NoInitialResponseError: when the generator yields no event,
                or when the first event has a different event type.
        """
        # An exhausted generator simply ends the ``async for``; there is
        # no StopIteration to catch in async iteration, so no exception
        # handler is needed around the loop.
        async for event in self._event_generator:
            if event.headers.get(':event-type') == 'initial-response':
                return event
            # Never look past the first event.
            break
        raise NoInitialResponseError()

    # self._raw_stream.close() is sync so no override needed