File: parsers.py

package info (click to toggle)
python-aiobotocore 2.13.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 832 kB
  • sloc: python: 10,572; makefile: 71
file content (128 lines) | stat: -rw-r--r-- 4,902 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from botocore.parsers import (
    LOG,
    EC2QueryParser,
    JSONParser,
    NoInitialResponseError,
    QueryParser,
    ResponseParserError,
    ResponseParserFactory,
    RestJSONParser,
    RestXMLParser,
    lowercase_dict,
)

from .eventstream import AioEventStream


class AioResponseParserFactory(ResponseParserFactory):
    def create_parser(self, protocol_name):
        parser_cls = PROTOCOL_PARSERS[protocol_name]
        return parser_cls(**self._defaults)


def create_parser(protocol):
    return AioResponseParserFactory().create_parser(protocol)


class AioQueryParser(QueryParser):
    def _create_event_stream(self, response, shape):
        parser = self._event_stream_parser
        name = response['context'].get('operation_name')
        return AioEventStream(response['body'], shape, parser, name)


class AioEC2QueryParser(EC2QueryParser):
    def _create_event_stream(self, response, shape):
        parser = self._event_stream_parser
        name = response['context'].get('operation_name')
        return AioEventStream(response['body'], shape, parser, name)


class AioJSONParser(JSONParser):
    async def _do_parse(self, response, shape):
        parsed = {}
        if shape is not None:
            event_name = shape.event_stream_name
            if event_name:
                parsed = await self._handle_event_stream(
                    response, shape, event_name
                )
            else:
                parsed = self._handle_json_body(response['body'], shape)
        self._inject_response_metadata(parsed, response['headers'])
        return parsed

    def _create_event_stream(self, response, shape):
        parser = self._event_stream_parser
        name = response['context'].get('operation_name')
        return AioEventStream(response['body'], shape, parser, name)

    async def _handle_event_stream(self, response, shape, event_name):
        event_stream_shape = shape.members[event_name]
        event_stream = self._create_event_stream(response, event_stream_shape)
        try:
            event = await event_stream.get_initial_response()
        except NoInitialResponseError:
            error_msg = 'First event was not of type initial-response'
            raise ResponseParserError(error_msg)
        parsed = self._handle_json_body(event.payload, shape)
        parsed[event_name] = event_stream
        return parsed

    # this is actually from ResponseParser however for now JSONParser is the
    # only class that needs this async
    async def parse(self, response, shape):
        LOG.debug('Response headers: %s', response['headers'])
        LOG.debug('Response body:\n%s', response['body'])
        if response['status_code'] >= 301:
            if self._is_generic_error_response(response):
                parsed = self._do_generic_error_parse(response)
            elif self._is_modeled_error_shape(shape):
                parsed = self._do_modeled_error_parse(response, shape)
                # We don't want to decorate the modeled fields with metadata
                return parsed
            else:
                parsed = self._do_error_parse(response, shape)
        else:
            parsed = await self._do_parse(response, shape)

        # We don't want to decorate event stream responses with metadata
        if shape and shape.serialization.get('eventstream'):
            return parsed

        # Add ResponseMetadata if it doesn't exist and inject the HTTP
        # status code and headers from the response.
        if isinstance(parsed, dict):
            response_metadata = parsed.get('ResponseMetadata', {})
            response_metadata['HTTPStatusCode'] = response['status_code']
            # Ensure that the http header keys are all lower cased. Older
            # versions of urllib3 (< 1.11) would unintentionally do this for us
            # (see urllib3#633). We need to do this conversion manually now.
            headers = response['headers']
            response_metadata['HTTPHeaders'] = lowercase_dict(headers)
            parsed['ResponseMetadata'] = response_metadata
            self._add_checksum_response_metadata(response, response_metadata)
        return parsed


class AioRestJSONParser(RestJSONParser):
    def _create_event_stream(self, response, shape):
        parser = self._event_stream_parser
        name = response['context'].get('operation_name')
        return AioEventStream(response['body'], shape, parser, name)


class AioRestXMLParser(RestXMLParser):
    def _create_event_stream(self, response, shape):
        parser = self._event_stream_parser
        name = response['context'].get('operation_name')
        return AioEventStream(response['body'], shape, parser, name)


PROTOCOL_PARSERS = {
    'ec2': AioEC2QueryParser,
    'query': AioQueryParser,
    'json': AioJSONParser,
    'rest-json': AioRestJSONParser,
    'rest-xml': AioRestXMLParser,
}