# Source: sphinxcontrib-openapi 0.8.4, sphinxcontrib/openapi/openapi20.py
# (Web-viewer scrape artifacts — package metadata and a line-number gutter —
# removed so the file parses as Python.)
"""
    sphinxcontrib.openapi.openapi20
    -------------------------------

    The OpenAPI 2.0 (f.k.a. Swagger) spec renderer. Based on
    ``sphinxcontrib-httpdomain``.

    :copyright: (c) 2016, Ihor Kalnytskyi.
    :license: BSD, see LICENSE for details.
"""

import collections
import itertools
import re

from sphinxcontrib.openapi import utils


def _httpresource(endpoint, method, properties, convert):
    """Render one OpenAPI 2.0 operation as an ``http:<method>`` directive.

    Yields reST lines for the *endpoint*/*method* pair described by the
    operation object *properties* (assumed normalized — TODO confirm against
    ``utils.normalize_spec``).  *convert* turns spec description text into
    reST markup.
    """
    params = properties.get('parameters', [])
    responses = properties['responses']
    pad = '   '

    def located_in(location):
        # Parameters declared for the given 'in' location.
        return (p for p in params if p['in'] == location)

    yield '.. http:{}:: {}'.format(method, endpoint)
    yield pad + ':synopsis: {}'.format(properties.get('summary', 'null'))
    yield ''

    if 'summary' in properties:
        for text in properties['summary'].splitlines():
            yield pad + '**{}**'.format(text)
        yield ''

    if 'description' in properties:
        for text in convert(properties['description']).splitlines():
            yield pad + text
        yield ''

    # Path parameters.
    for param in located_in('path'):
        yield pad + ':param {} {}:'.format(param['type'], param['name'])
        for text in convert(param.get('description', '')).splitlines():
            yield pad + pad + text

    # Query-string parameters.
    for param in located_in('query'):
        yield pad + ':query {} {}:'.format(param['type'], param['name'])
        for text in convert(param.get('description', '')).splitlines():
            yield pad + pad + text

    # JSON request body schema, if declared.
    for param in located_in('body'):
        if 'schema' in param:
            yield ''
            for text in convert_json_schema(param['schema']):
                yield pad + text
            yield ''

    # Response status codes, in sorted order.
    for status, response in sorted(responses.items()):
        yield pad + ':status {}:'.format(status)
        for text in convert(response.get('description', '')).splitlines():
            yield pad + pad + text

    # Request headers.
    for param in located_in('header'):
        yield pad + ':reqheader {}:'.format(param['name'])
        for text in convert(param.get('description', '')).splitlines():
            yield pad + pad + text

    # Response headers, across all statuses.
    for response in responses.values():
        for header_name, header in response.get('headers', {}).items():
            yield pad + ':resheader {}:'.format(header_name)
            for text in convert(header.get('description', '')).splitlines():
                yield pad + pad + text

    # Response body schema, for successful (2xx) responses only.
    for status, response in responses.items():
        if is_2xx_response(status) and 'schema' in response:
            yield ''
            for text in convert_json_schema(
                    response['schema'], directive=':>json'):
                yield pad + text
            yield ''

    yield ''


def convert_json_schema(schema, directive=':<json'):
    """Yield ``:<json`` httpdomain field lines for a JSON *schema*.

    Fields are emitted sorted by their dotted name; array items are
    suffixed with ``[]``.  *directive* selects request (``:<json``) or
    response (``:>json``) rendering.
    """
    entries = []

    def _walk(node, path='', is_required=False):
        # Collect (name, rendered-line) pairs so output can be sorted by
        # field name afterwards.
        node_type = node.get('type', 'any')
        required_fields = node.get('required', ())

        if node_type == 'object' and node.get('properties'):
            for field, sub_schema in node.get('properties', {}).items():
                _walk(sub_schema, path + '.' + field,
                      field in required_fields)
        elif node_type == 'array':
            _walk(node['items'], path + '[]')
        elif path:
            label = path.lstrip('.')

            flags = []
            if is_required:
                flags.append('required')
            if node.get('readOnly', False):
                flags.append('read only')
            suffix = ' ({})'.format(', '.join(flags)) if flags else ''

            description = node.get('description', '')
            if description:
                rendered = '{} {}: {}{}'.format(
                    node_type, label, description, suffix)
            else:
                rendered = '{} {}:{}'.format(node_type, label, suffix)
            entries.append((label, rendered))

    _walk(schema)

    for _, rendered in sorted(entries):
        yield '{} {}'.format(directive, rendered)


def is_2xx_response(status):
    """Return True when *status* names an HTTP success (2xx) code.

    Non-numeric statuses such as ``'default'`` are not 2xx.
    """
    try:
        code = int(status)
    except ValueError:
        return False
    return 200 <= code < 300


def _header(title):
    yield title
    yield '=' * len(title)
    yield ''


def openapihttpdomain(spec, **options):
    """Render an OpenAPI 2.0 *spec* as sphinxcontrib-httpdomain reST lines.

    Supported options include ``paths``, ``include``, ``exclude``,
    ``methods`` and ``group``.  Returns an iterator of reST lines.

    :raises ValueError: for the OpenAPI 3.x-only ``examples`` / ``request``
        options, or when ``paths`` names endpoints missing from the spec.
    """
    if 'examples' in options:
        raise ValueError(
            'Rendering examples is not supported for OpenAPI v2.x specs.')

    if 'request' in options:
        raise ValueError(
            'The :request: option is not supported for OpenAPI v2.x specs.')

    generators = []

    # OpenAPI spec may contain JSON references, common properties, etc.
    # Trying to render the spec "As Is" will require to put multiple
    # if-s around the code. In order to simplify flow, let's make the
    # spec to have only one (expected) schema, i.e. normalize it.
    utils.normalize_spec(spec, **options)

    # Paths list to be processed
    paths = []

    # If 'paths' are passed we've got to ensure they exist within an OpenAPI
    # spec; otherwise raise error and ask user to fix that.
    if 'paths' in options:
        if not set(options['paths']).issubset(spec['paths']):
            raise ValueError(
                'One or more paths are not defined in the spec: %s.' % (
                    ', '.join(set(options['paths']) - set(spec['paths'])),
                )
            )
        # Copy: the 'include' handling below appends to this list, and
        # aliasing would mutate the caller-supplied options['paths'].
        paths = list(options['paths'])

    # Check against regular expressions to be included
    if 'include' in options:
        for pattern in options['include']:
            included = re.compile(pattern)
            paths.extend(
                path for path in spec['paths'] if included.match(path))

    # If no include nor paths option, then take full path
    if 'include' not in options and 'paths' not in options:
        paths = spec['paths']

    # Drop every path matching ANY 'exclude' regexp.  (Filtering per
    # pattern against the original list would both duplicate entries and
    # keep paths that only some patterns exclude.)
    if 'exclude' in options:
        excluded = [re.compile(e) for e in options['exclude']]
        paths = [
            path for path in paths
            if not any(er.match(path) for er in excluded)
        ]

    methods = options.get('methods')

    def _operations(endpoint):
        # Operations under *endpoint*, honoring the 'methods' filter.
        for method, properties in spec['paths'][endpoint].items():
            if methods and method not in methods:
                continue
            yield method, properties

    if 'group' in options:
        # Seed groups from the spec's declared tags to preserve their order;
        # untagged operations fall into the '' group rendered as 'default'.
        groups = collections.OrderedDict(
            (tag['name'], []) for tag in spec.get('tags', ())
        )

        for endpoint in paths:
            for method, properties in _operations(endpoint):
                key = properties.get('tags', [''])[0]
                groups.setdefault(key, []).append(_httpresource(
                    endpoint,
                    method,
                    properties,
                    utils.get_text_converter(options),
                    ))

        for key, resources in groups.items():
            generators.append(_header(key or 'default'))
            generators.extend(resources)
    else:
        for endpoint in paths:
            for method, properties in _operations(endpoint):
                generators.append(_httpresource(
                    endpoint,
                    method,
                    properties,
                    utils.get_text_converter(options),
                    ))

    return iter(itertools.chain(*generators))