File: dsl-generator.py

package info (click to toggle)
python-elasticsearch 9.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 22,728 kB
  • sloc: python: 104,053; makefile: 151; javascript: 75
file content (1023 lines) | stat: -rw-r--r-- 44,327 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
#  Licensed to Elasticsearch B.V. under one or more contributor
#  license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright
#  ownership. Elasticsearch B.V. licenses this file to you under
#  the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.

import json
import re
import subprocess
import textwrap
from urllib.error import HTTPError
from urllib.request import urlopen

from jinja2 import Environment, PackageLoader, select_autoescape

# Jinja environment that loads the code generation templates from the
# "utils" package (PackageLoader looks in the package's "templates" dir
# by default); trim/lstrip keep template control blocks from leaking
# whitespace into the generated Python source
jinja_env = Environment(
    loader=PackageLoader("utils"),
    autoescape=select_autoescape(),
    trim_blocks=True,
    lstrip_blocks=True,
)
# one template per generated module of the DSL package
field_py = jinja_env.get_template("field.py.tpl")
query_py = jinja_env.get_template("query.py.tpl")
aggs_py = jinja_env.get_template("aggs.py.tpl")
response_init_py = jinja_env.get_template("response.__init__.py.tpl")
types_py = jinja_env.get_template("types.py.tpl")

# map with name replacements for Elasticsearch attributes whose names are
# Python reserved words and cannot be used as keyword arguments verbatim
PROP_REPLACEMENTS = {"from": "from_", "global": "global_"}

# map with Elasticsearch type replacements
# keys and values are in given in "{namespace}:{name}" format
TYPE_REPLACEMENTS = {
    "_types.query_dsl:DistanceFeatureQuery": "_types.query_dsl:DistanceFeatureQueryBase",
}

# some aggregation types are complicated to determine from the schema, so they
# have their correct type here (values are the DSL parent class to use)
AGG_TYPES = {
    "bucket_count_ks_test": "Pipeline",
    "bucket_correlation": "Pipeline",
    "bucket_sort": "Bucket",
    "categorize_text": "Bucket",
    "filter": "Bucket",
    "moving_avg": "Pipeline",
    "variable_width_histogram": "Bucket",
}


def property_to_class_name(name):
    """Convert a snake_case schema property name to a Python class name.

    Each underscore-separated word is capitalized, except the word "ip",
    which is rendered as "IP" (e.g. "ip_range" -> "IPRange").
    """
    parts = []
    for word in name.split("_"):
        parts.append("IP" if word == "ip" else word.title())
    return "".join(parts)


def wrapped_doc(text, width=70, initial_indent="", subsequent_indent=""):
    """Format a docstring as a list of lines of up to the requested width.

    Embedded newlines are flattened to spaces before wrapping so the text
    reflows as a single paragraph.
    """
    flattened = text.replace("\n", " ")
    return textwrap.wrap(
        flattened,
        width=width,
        initial_indent=initial_indent,
        subsequent_indent=subsequent_indent,
    )


def add_dict_type(type_):
    """Add Dict[str, Any] as an accepted alternative to a Python type hint."""
    if not type_.startswith("Union["):
        return f"Union[{type_}, Dict[str, Any]]"
    # already a union: splice the dict alternative in before the closing bracket
    return type_[:-1] + ", Dict[str, Any]]"


def add_seq_dict_type(type_):
    """Add Sequence[Dict[str, Any]] as an accepted alternative to a type hint."""
    if not type_.startswith("Union["):
        return f"Union[{type_}, Sequence[Dict[str, Any]]]"
    # already a union: splice the sequence-of-dicts alternative in before
    # the closing bracket
    return type_[:-1] + ", Sequence[Dict[str, Any]]]"


def add_not_set(type_):
    """Add the DefaultType sentinel as an accepted alternative to a type hint."""
    if type_.startswith("Union["):
        # extend the existing union in place
        return type_[:-1] + ', "DefaultType"]'
    return f'Union[{type_}, "DefaultType"]'


def type_for_types_py(type_):
    """Convert a type rendered in a generic way to the format needed in the
    types.py module.

    Names that are importable in types.py lose their forward-reference
    quotes; "types."-namespaced names stay quoted (they live in the same
    module, so they remain forward references) but lose the namespace.
    """
    # simple unquoting of names that are imported into types.py
    for quoted, bare in (
        ('"DefaultType"', "DefaultType"),
        ('"InstrumentedField"', "InstrumentedField"),
    ):
        type_ = type_.replace(quoted, bare)
    # function.* and wrappers.* keep their namespace but drop the quotes;
    # types.* drops the namespace but keeps the quotes
    for pattern, repl in (
        (r'"(function\.[a-zA-Z0-9_]+)"', r"\1"),
        (r'"types\.([a-zA-Z0-9_]+)"', r'"\1"'),
        (r'"(wrappers\.[a-zA-Z0-9_]+)"', r"\1"),
    ):
        type_ = re.sub(pattern, repl, type_)
    return type_


class ElasticsearchSchema:
    """Operations related to the Elasticsearch schema."""

    def __init__(self, version="main"):
        """Download and parse the Elasticsearch specification schema.

        The branch given by ``version`` is tried first; if it cannot be
        downloaded the ``main`` branch is used as a fallback. A
        ``RuntimeError`` is raised when neither can be retrieved.
        """
        response = None
        for branch in [version, "main"]:
            url = f"https://raw.githubusercontent.com/elastic/elasticsearch-specification/{branch}/output/schema/schema.json"
            try:
                response = urlopen(url)
            except HTTPError:
                continue
            print(f"Initializing code generation with '{branch}' specification.")
            break
        if not response:
            raise RuntimeError("Could not download Elasticsearch schema")
        self.schema = json.loads(response.read())

        # Interfaces collects interfaces that are seen while traversing the
        # schema; they are later rendered as Python classes in types.py.
        self.interfaces = []
        # Subset of the above that was discovered while generating response
        # classes (rendered without Dict alternatives or defaults).
        self.response_interfaces = []

    def find_type(self, name, namespace=None):
        for t in self.schema["types"]:
            if t["name"]["name"] == name and (
                namespace is None or t["name"]["namespace"] == namespace
            ):
                return t

    def inherits_from(self, type_, name, namespace=None):
        while "inherits" in type_:
            type_ = self.find_type(
                type_["inherits"]["type"]["name"],
                type_["inherits"]["type"]["namespace"],
            )
            if type_["name"]["name"] == name and (
                namespace is None or type_["name"]["namespace"] == namespace
            ):
                return True
        return False

    def get_python_type(self, schema_type, for_response=False):
        """Obtain Python typing details for a given schema type

        This method returns a tuple. The first element is a string with the
        Python type hint. The second element is a dictionary with Python DSL
        specific typing details to be stored in the DslBase._param_defs
        attribute (or None if the type does not need to be in _param_defs).

        When `for_response` is `False`, any new interfaces that are discovered
        are registered to be generated in "request" style, with alternative
        Dict type hints and default values. If `for_response` is `True`,
        interfaces are generated just with their declared type, without
        Dict alternative and without defaults, to help type checkers be more
        effective at parsing response expressions.
        """
        if schema_type["kind"] == "instance_of":
            type_name = schema_type["type"]
            if type_name["namespace"] in ["_types", "internal", "_builtins"]:
                # native schema types map directly to Python builtins
                if type_name["name"] in ["integer", "uint", "long", "ulong"]:
                    return "int", None
                elif type_name["name"] in ["number", "float", "double"]:
                    return "float", None
                elif type_name["name"] == "string":
                    return "str", None
                elif type_name["name"] == "boolean":
                    return "bool", None
                elif type_name["name"] == "binary":
                    return "bytes", None
                elif type_name["name"] == "null":
                    return "None", None
                elif type_name["name"] == "Field":
                    if for_response:
                        return "str", None
                    else:
                        return 'Union[str, "InstrumentedField"]', None
                else:
                    # not an instance of a native type, so we get the type and try again
                    return self.get_python_type(
                        self.find_type(type_name["name"], type_name["namespace"]),
                        for_response=for_response,
                    )
            elif (
                type_name["namespace"] == "_types.query_dsl"
                and type_name["name"] == "QueryContainer"
            ):
                # QueryContainer maps to the DSL's Query class
                return "Query", {"type": "query"}
            elif (
                type_name["namespace"] == "_global.search._types"
                and type_name["name"] == "SearchRequestBody"
            ):
                # we currently do not provide specific typing for this one
                return "Dict[str, Any]", None
            elif (
                type_name["namespace"] == "_types.query_dsl"
                and type_name["name"] == "FunctionScoreContainer"
            ):
                # FunctionScoreContainer maps to the DSL's ScoreFunction class
                return "ScoreFunction", {"type": "score_function"}
            elif (
                type_name["namespace"] == "_types.aggregations"
                and type_name["name"] == "Buckets"
            ):
                if for_response:
                    return "Union[Sequence[Any], Dict[str, Any]]", None
                else:
                    return "Dict[str, Query]", {"type": "query", "hash": True}
            elif (
                type_name["namespace"] == "_types.aggregations"
                and type_name["name"] == "CompositeAggregationSource"
            ):
                # CompositeAggreagationSource maps to the DSL's Agg class
                return "Agg[_R]", None
            else:
                # for any other instances we get the type and recurse
                type_ = self.find_type(type_name["name"], type_name["namespace"])
                if type_:
                    return self.get_python_type(type_, for_response=for_response)
                # note: falls through to the RuntimeError at the bottom when
                # the referenced type cannot be found in the schema

        elif schema_type["kind"] == "type_alias":
            # for an alias, we use the aliased type
            return self.get_python_type(schema_type["type"], for_response=for_response)

        elif schema_type["kind"] == "array_of":
            # for arrays we use Sequence[element_type]
            type_, param = self.get_python_type(
                schema_type["value"], for_response=for_response
            )
            # the conditional expression below applies only to the second
            # element of the returned tuple
            return f"Sequence[{type_}]", {**param, "multi": True} if param else None

        elif schema_type["kind"] == "dictionary_of":
            # for dicts we use Mapping[key_type, value_type]
            key_type, key_param = self.get_python_type(
                schema_type["key"], for_response=for_response
            )
            value_type, value_param = self.get_python_type(
                schema_type["value"], for_response=for_response
            )
            return f"Mapping[{key_type}, {value_type}]", (
                {**value_param, "hash": True} if value_param else None
            )

        elif schema_type["kind"] == "union_of":
            if (
                len(schema_type["items"]) == 2
                and schema_type["items"][0]["kind"] == "instance_of"
                and schema_type["items"][1]["kind"] == "array_of"
                and schema_type["items"][0] == schema_type["items"][1]["value"]
            ):
                # special kind of unions in the form Union[type, Sequence[type]]
                type_, param = self.get_python_type(
                    schema_type["items"][0], for_response=for_response
                )
                if schema_type["items"][0]["type"]["name"] in [
                    "CompletionSuggestOption",
                    "PhraseSuggestOption",
                    "TermSuggestOption",
                ]:
                    # for suggest types we simplify this type and return just the array form
                    return (
                        f"Sequence[{type_}]",
                        ({"type": param["type"], "multi": True} if param else None),
                    )
                else:
                    # for every other types we produce an union with the two alternatives
                    return (
                        f"Union[{type_}, Sequence[{type_}]]",
                        ({"type": param["type"], "multi": True} if param else None),
                    )
            elif (
                len(schema_type["items"]) == 2
                and schema_type["items"][0]["kind"] == "instance_of"
                and schema_type["items"][1]["kind"] == "instance_of"
                and schema_type["items"][0]["type"]
                == {"name": "T", "namespace": "_spec_utils.PipeSeparatedFlags"}
                and schema_type["items"][1]["type"]
                == {"name": "string", "namespace": "_builtins"}
            ):
                # for now we treat PipeSeparatedFlags as a special case
                if "PipeSeparatedFlags" not in self.interfaces:
                    self.interfaces.append("PipeSeparatedFlags")
                return '"types.PipeSeparatedFlags"', None
            else:
                # generic union type
                types = list(
                    dict.fromkeys(  # eliminate duplicates
                        [
                            self.get_python_type(t, for_response=for_response)
                            for t in schema_type["items"]
                        ]
                    )
                )
                if len(types) == 1:
                    # the union collapsed to a single (type, param) pair
                    return types[0]
                return "Union[" + ", ".join([type_ for type_, _ in types]) + "]", None

        elif schema_type["kind"] == "enum":
            # enums are mapped to Literal[member, ...]
            t = (
                "Literal["
                + ", ".join(
                    [f"\"{member['name']}\"" for member in schema_type["members"]]
                )
                + "]"
            )
            # NOTE(review): this exact-dict membership test assumes enum
            # members carry only a "name" key -- verify against the schema
            if {"name": "true"} in schema_type["members"] and {
                "name": "false"
            } in schema_type["members"]:
                # this is a boolean that was later upgraded to an enum, so we
                # should also allow bools
                t = f"Union[{t}, bool]"
            return t, None

        elif schema_type["kind"] == "interface":
            if schema_type["name"]["namespace"] == "_types.query_dsl":
                # handle specific DSL classes explicitly to map to existing
                # Python DSL classes
                if schema_type["name"]["name"].endswith("RangeQuery"):
                    return '"wrappers.Range[Any]"', None
                elif schema_type["name"]["name"].endswith("ScoreFunction"):
                    name = schema_type["name"]["name"].removesuffix("Function")
                    return f'"function.{name}"', None
                elif schema_type["name"]["name"].endswith("DecayFunction"):
                    return '"function.DecayFunction"', None
                elif schema_type["name"]["name"].endswith("Function"):
                    return f"\"function.{schema_type['name']['name']}\"", None
            elif schema_type["name"]["namespace"] == "_types.analysis" and schema_type[
                "name"
            ]["name"].endswith("Analyzer"):
                # not expanding analyzers at this time, maybe in the future
                return "str, Dict[str, Any]", None
            elif schema_type["name"]["namespace"] == "_types.aggregations":
                if (
                    schema_type["name"]["name"].endswith("AggregationRange")
                    or schema_type["name"]["name"] == "DateRangeExpression"
                ) and schema_type["name"]["name"] != "IpRangeAggregationRange":
                    return '"wrappers.AggregationRange"', None

            # to handle other interfaces we generate a type of the same name
            # and add the interface to the interfaces.py module
            if schema_type["name"]["name"] not in self.interfaces:
                self.interfaces.append(schema_type["name"]["name"])
                if for_response:
                    self.response_interfaces.append(schema_type["name"]["name"])
            return f"\"types.{schema_type['name']['name']}\"", None
        elif schema_type["kind"] == "user_defined_value":
            # user_defined_value maps to Python's Any type
            return "Any", None

        raise RuntimeError(f"Cannot find Python type for {schema_type}")

    def add_attribute(self, k, arg, for_types_py=False, for_response=False):
        """Add an attribute to the internal representation of a class.

        This method adds the argument `arg` to the data structure for a class
        stored in `k`. In particular, the argument is added to the `k["args"]`
        list, making sure required arguments are first in the list. If the
        argument is of a type that needs Python DSL specific typing details to
        be stored in the DslBase._param_defs attribute, then this is added to
        `k["params"]`.

        When `for_types_py` is `True`, type hints are formatted in the most
        convenient way for the types.py file. When possible, double quotes are
        removed from types, and for types that are in the same file the quotes
        are kept to prevent forward references, but the "types." namespace is
        removed. When `for_types_py` is `False`, all non-native types use
        quotes and are namespaced.

        When `for_response` is `True`, type hints are not given the optional
        dictionary representation, nor the `DefaultType` used for omitted
        attributes.
        """
        try:
            type_, param = self.get_python_type(arg["type"], for_response=for_response)
        except RuntimeError:
            # fall back to an untyped attribute when the schema type cannot
            # be mapped to a Python type
            type_ = "Any"
            param = None
        if not for_response:
            if type_ != "Any":
                if (
                    'Sequence["types.' in type_
                    or 'Sequence["wrappers.AggregationRange' in type_
                ):
                    type_ = add_seq_dict_type(type_)  # interfaces can be given as dicts
                elif "types." in type_ or "wrappers.AggregationRange" in type_:
                    type_ = add_dict_type(type_)  # interfaces can be given as dicts
                type_ = add_not_set(type_)
        if for_types_py:
            type_ = type_for_types_py(type_)
        required = "(required) " if arg["required"] else ""
        server_default = (
            f" Defaults to `{arg['serverDefault']}` if omitted."
            if arg.get("serverDefault")
            else ""
        )
        # render the ":arg name: description" docstring entry for the attribute
        doc = wrapped_doc(
            f":arg {arg['name']}: {required}{arg.get('description', '')}{server_default}",
            subsequent_indent="    ",
        )
        # rebuild the argument dict with the Python-safe name and mapped type
        arg = {
            "name": PROP_REPLACEMENTS.get(arg["name"], arg["name"]),
            "type": type_,
            "doc": doc,
            "required": arg["required"],
        }
        if param is not None:
            param = {"name": arg["name"], "param": param}
        if arg["required"]:
            # insert in the right place so that all required arguments
            # appear at the top of the argument list
            i = 0
            for i in range(len(k["args"]) + 1):
                if i == len(k["args"]):
                    break
                if k["args"][i].get("positional"):
                    # positional arguments always stay at the very front
                    continue
                if k["args"][i]["required"] is False:
                    # insert just before the first optional argument
                    break
            k["args"].insert(i, arg)
        else:
            k["args"].append(arg)
        # only record _param_defs entries for classes that track them
        if param and "params" in k:
            k["params"].append(param)

    def add_behaviors(self, type_, k, for_types_py=False, for_response=False):
        """Add behaviors reported in the specification of the given type to the
        class representation.

        Only the ``AdditionalProperty`` behavior keyed by a field is
        supported: it is rendered as the ``_field``/``_value`` positional
        argument pair used by single-field queries, and the class is marked
        with ``is_single_field``. Other behaviors are silently ignored,
        except for an ``AdditionalProperty`` whose key is not a field, which
        raises ``RuntimeError``.
        """
        for behavior in type_.get("behaviors", []):
            if (
                behavior["type"]["name"] != "AdditionalProperty"
                or behavior["type"]["namespace"] != "_spec_utils"
            ):
                # we do not support this behavior, so we ignore it
                continue
            key_type, _ = self.get_python_type(
                behavior["generics"][0], for_response=for_response
            )
            if "InstrumentedField" not in key_type:
                raise RuntimeError(
                    f"Non-field AdditionalProperty are not supported for interface {type_['name']['namespace']}:{type_['name']['name']}."
                )
            value_type, _ = self.get_python_type(
                behavior["generics"][1], for_response=for_response
            )
            if for_types_py:
                # reuse the module-level helper instead of duplicating the
                # same quote/namespace replacements inline, so the two code
                # paths cannot drift apart
                value_type = type_for_types_py(value_type)
            k["args"].append(
                {
                    "name": "_field",
                    "type": add_not_set(key_type),
                    "doc": [":arg _field: The field to use in this query."],
                    "required": False,
                    "positional": True,
                }
            )
            k["args"].append(
                {
                    "name": "_value",
                    "type": add_not_set(add_dict_type(value_type)),
                    "doc": [":arg _value: The query value for the field."],
                    "required": False,
                    "positional": True,
                }
            )
            k["is_single_field"] = True

    def property_to_python_class(self, p):
        """Return a dictionary with template data necessary to render a schema
        property as a Python class.

        Used for "container" sub-classes such as `QueryContainer`, where each
        sub-class is represented by a Python DSL class.

        The format is as follows:

        ```python
        {
            "property_name": "the name of the property",
            "name": "the class name to use for the property",
            "docstring": "the formatted docstring as a list of strings",
            "args": [  # a Python description of each class attribute
                "name": "the name of the attribute",
                "type": "the Python type hint for the attribute",
                "doc": ["formatted lines of documentation to add to class docstring"],
                "required": bool,
                "positional": bool,
            ],
            "params": [
                "name": "the attribute name",
                "param": "the param dictionary to include in `_param_defs` for the class",
            ],  # a DSL-specific description of interesting attributes
            "is_single_field": bool  # True for single-key dicts with field key
            "is_multi_field": bool  # True for multi-key dicts with field keys
        }
        ```
        """
        k = {
            "property_name": p["name"],
            "name": property_to_class_name(p["name"]),
        }
        k["docstring"] = wrapped_doc(p.get("description") or "")
        # sub-classes generated for union alternatives are collected here and
        # returned along with the main class
        other_classes = []
        kind = p["type"]["kind"]
        if kind == "instance_of":
            namespace = p["type"]["type"]["namespace"]
            name = p["type"]["type"]["name"]
            # apply hardcoded type substitutions before looking up the type
            if f"{namespace}:{name}" in TYPE_REPLACEMENTS:
                namespace, name = TYPE_REPLACEMENTS[f"{namespace}:{name}"].split(":")
            if name == "QueryContainer" and namespace == "_types.query_dsl":
                # the property becomes a synthetic single-property interface
                type_ = {
                    "kind": "interface",
                    "properties": [p],
                }
            else:
                type_ = self.find_type(name, namespace)
            if p["name"] in AGG_TYPES:
                # hardcoded parent for aggregations that are hard to
                # categorize from the schema alone
                k["parent"] = AGG_TYPES[p["name"]]

            if type_["kind"] == "interface":
                # set the correct parent for bucket and pipeline aggregations
                if self.inherits_from(
                    type_, "PipelineAggregationBase", "_types.aggregations"
                ):
                    k["parent"] = "Pipeline"
                elif self.inherits_from(
                    type_, "BucketAggregationBase", "_types.aggregations"
                ):
                    k["parent"] = "Bucket"

                # generate class attributes
                k["args"] = []
                k["params"] = []
                self.add_behaviors(type_, k)
                # walk the inheritance chain so ancestor attributes are
                # included as well
                while True:
                    for arg in type_["properties"]:
                        self.add_attribute(k, arg)
                    if "inherits" in type_ and "type" in type_["inherits"]:
                        type_ = self.find_type(
                            type_["inherits"]["type"]["name"],
                            type_["inherits"]["type"]["namespace"],
                        )
                    else:
                        break

            elif type_["kind"] == "type_alias":
                if type_["type"]["kind"] == "union_of":
                    # for unions we create sub-classes
                    for other in type_["type"]["items"]:
                        other_class = self.interface_to_python_class(
                            other["type"]["name"],
                            other["type"]["namespace"],
                            for_types_py=False,
                        )
                        other_class["parent"] = k["name"]
                        other_classes.append(other_class)
                else:
                    raise RuntimeError(
                        "Cannot generate code for instances of type_alias instances that are not unions."
                    )

            else:
                raise RuntimeError(
                    f"Cannot generate code for instances of kind '{type_['kind']}'"
                )

        elif kind == "dictionary_of":
            key_type, _ = self.get_python_type(p["type"]["key"])
            if "InstrumentedField" in key_type:
                value_type, _ = self.get_python_type(p["type"]["value"])
                if p["type"]["singleKey"]:
                    # special handling for single-key dicts with field key
                    k["args"] = [
                        {
                            "name": "_field",
                            "type": add_not_set(key_type),
                            "doc": [":arg _field: The field to use in this query."],
                            "required": False,
                            "positional": True,
                        },
                        {
                            "name": "_value",
                            "type": add_not_set(add_dict_type(value_type)),
                            "doc": [":arg _value: The query value for the field."],
                            "required": False,
                            "positional": True,
                        },
                    ]
                    k["is_single_field"] = True
                else:
                    # special handling for multi-key dicts with field keys
                    k["args"] = [
                        {
                            "name": "_fields",
                            "type": f"Optional[Mapping[{key_type}, {value_type}]]",
                            "doc": [
                                ":arg _fields: A dictionary of fields with their values."
                            ],
                            "required": False,
                            "positional": True,
                        },
                    ]
                    k["is_multi_field"] = True
            else:
                raise RuntimeError(f"Cannot generate code for type {p['type']}")

        else:
            raise RuntimeError(f"Cannot generate code for type {p['type']}")
        return [k] + other_classes

    def interface_to_python_class(
        self,
        interface,
        namespace=None,
        *,
        for_types_py=True,
        for_response=False,
    ):
        """Return a dictionary with template data necessary to render an
        interface as a Python class.

        This is used to render interfaces that are referenced by container
        classes.

        :arg interface: the name of the schema interface to convert.
        :arg namespace: the schema namespace the interface belongs to, or
            ``None`` to search all namespaces.
        :arg for_types_py: whether the generated class is destined for
            *types.py* (affects how attribute type hints are rendered).
        :arg for_response: whether the class models a response object;
            response attributes are typed differently from request ones.

        The returned format is as follows:

        ```python
        {
            "name": "the class name to use for the interface class",
            "parent": "the parent class name",
            "args": [ # a Python description of each class attribute
                "name": "the name of the attribute",
                "type": "the Python type hint for the attribute",
                "doc": ["formatted lines of documentation to add to class docstring"],
                "required": bool,
                "positional": bool,
            ],
            "buckets_as_dict": "type" # optional, only present in aggregation response
                                      # classes that have buckets that can have a list
                                      # or dict representation
        }
        ```

        :raises RuntimeError: if ``interface`` does not resolve to an
            interface or response type in the schema.
        """
        type_ = self.find_type(interface, namespace)
        if type_["kind"] not in ["interface", "response"]:
            raise RuntimeError(f"Type {interface} is not an interface")
        if type_["kind"] == "response":
            # we consider responses as interfaces because they also have properties
            # but the location of the properties is different
            type_ = type_["body"]
        k = {"name": interface, "for_response": for_response, "args": []}
        k["docstring"] = wrapped_doc(type_.get("description") or "")
        self.add_behaviors(
            type_, k, for_types_py=for_types_py, for_response=for_response
        )
        generics = []
        # Walk this interface and then every ancestor reachable through
        # "inherits", accumulating attributes from each level into k["args"].
        while True:
            for arg in type_["properties"]:
                if interface == "ResponseBody" and arg["name"] == "hits":
                    # search hits are exposed through the DSL's generic result
                    # type rather than the schema-generated one
                    k["args"].append(
                        {
                            "name": "hits",
                            "type": "Sequence[_R]",
                            "doc": [":arg hits: search results"],
                            "required": arg["required"],
                        }
                    )
                elif interface == "ResponseBody" and arg["name"] == "aggregations":
                    # Aggregations are tricky because the DSL client uses a
                    # flexible representation that is difficult to generate
                    # from the schema.
                    # To handle this we let the generator do its work by calling
                    # `add_attribute()`, but then we save the generated attribute
                    # apart and replace it with the DSL's `AggResponse` class.
                    # The generated type is then used in type hints in variables
                    # and methods of this class.
                    self.add_attribute(
                        k, arg, for_types_py=for_types_py, for_response=for_response
                    )
                    # extract the value type "T" out of the generated
                    # "Mapping[str, T]" hint
                    k["aggregate_type"] = (
                        k["args"][-1]["type"]
                        .split("Mapping[str, ")[1]
                        .rsplit("]", 1)[0]
                    )
                    k["args"][-1] = {
                        "name": "aggregations",
                        "type": '"AggResponse[_R]"',
                        "doc": [":arg aggregations: aggregation results"],
                        "required": arg["required"],
                    }
                elif (
                    "name" in type_
                    and type_["name"]["name"] == "MultiBucketAggregateBase"
                    and arg["name"] == "buckets"
                ):
                    # Also during aggregation response generation, the "buckets"
                    # attribute that many aggregation responses have is very
                    # complex, supporting over a dozen different aggregation
                    # types via generics, each in array or object configurations.
                    # Typing this attribute proved very difficult. A solution
                    # that worked with mypy and pyright is to type "buckets"
                    # for the list form, and create a `buckets_as_dict`
                    # property that is typed appropriately for accessing the
                    # buckets in dictionary form.
                    # The generic type is assumed to be the first in the list,
                    # which is a simplification that should be improved when a
                    # more complete implementation of generics is added.
                    if generics[0]["type"]["name"] == "Void":
                        generic_type = "Any"
                    else:
                        _g = self.find_type(
                            generics[0]["type"]["name"],
                            generics[0]["type"]["namespace"],
                        )
                        generic_type, _ = self.get_python_type(
                            _g, for_response=for_response
                        )
                        generic_type = type_for_types_py(generic_type)
                    k["args"].append(
                        {
                            "name": arg["name"],
                            # for the type we only include the array form, since
                            # this client does not request the dict form
                            "type": f"Sequence[{generic_type}]",
                            "doc": [
                                ":arg buckets: (required) the aggregation buckets as a list"
                            ],
                            "required": True,
                        }
                    )
                    k["buckets_as_dict"] = generic_type
                elif namespace == "_types.mapping":
                    if arg["name"] in ["fields", "properties"]:
                        # Python DSL provides a high level representation for the
                        # "fields" and 'properties' properties that many types support
                        k["args"].append(
                            {
                                "name": arg["name"],
                                "type": 'Union[Mapping[str, Field], "DefaultType"]',
                                "doc": [f":arg {arg['name']}:"],
                                "required": False,
                            }
                        )
                        if "params" not in k:
                            k["params"] = []
                        k["params"].append(
                            {
                                "name": arg["name"],
                                "param": {"type": "field", "hash": True},
                            }
                        )

                    else:
                        # also the Python DSL provides implementations of analyzers
                        # and normalizers, so here we make sure these are noted as
                        # params and have an appropriate type hint.
                        self.add_attribute(
                            k, arg, for_types_py=for_types_py, for_response=for_response
                        )
                        if arg["name"].endswith("analyzer"):
                            if "params" not in k:
                                k["params"] = []
                            k["params"].append(
                                {"name": arg["name"], "param": {"type": "analyzer"}}
                            )
                            k["args"][-1]["type"] = 'Union[str, DslBase, "DefaultType"]'
                        elif arg["name"].endswith("normalizer"):
                            if "params" not in k:
                                k["params"] = []
                            k["params"].append(
                                {"name": arg["name"], "param": {"type": "normalizer"}}
                            )
                            k["args"][-1]["type"] = 'Union[str, DslBase, "DefaultType"]'
                else:
                    if interface == "Hit" and arg["name"].startswith("_"):
                        # Python DSL removes the underscore prefix from all the
                        # properties of the hit, so we do the same
                        arg["name"] = arg["name"][1:]

                    self.add_attribute(
                        k, arg, for_types_py=for_types_py, for_response=for_response
                    )

            # stop once there is no parent type to traverse into
            if "inherits" not in type_ or "type" not in type_["inherits"]:
                break

            if "generics" in type_["inherits"]:
                # Generics are only supported for certain specific cases at this
                # time. Here we just save them so that they can be recalled later
                # while traversing over to parent classes to find inherited
                # attributes.
                for generic_type in type_["inherits"]["generics"]:
                    generics.append(generic_type)

            # move on to the parent type and collect its attributes too
            type_ = self.find_type(
                type_["inherits"]["type"]["name"],
                type_["inherits"]["type"]["namespace"],
            )
        return k


def generate_field_py(schema, filename):
    """Generate field.py with all the Elasticsearch fields as Python classes.

    :arg schema: the Elasticsearch schema wrapper to read type data from.
    :arg filename: the path of the Python module to write.
    """
    # field names grouped by the DSL parent class they should inherit from
    float_fields = ["half_float", "scaled_float", "double", "rank_feature"]
    integer_fields = ["byte", "short", "long"]
    range_fields = [
        "integer_range",
        "float_range",
        "long_range",
        "double_range",
        "date_range",
    ]
    object_fields = ["nested"]
    # fields that get value coercion support in the DSL
    coerced_fields = [
        "boolean",
        "date",
        "float",
        "object",
        "dense_vector",
        "integer",
        "ip",
        "binary",
        "percolator",
    ]

    classes = []
    # renamed from `property` to avoid shadowing the Python builtin
    property_type = schema.find_type("Property", "_types.mapping")
    for type_ in property_type["type"]["items"]:
        if type_["type"]["name"] == "DynamicProperty":
            # no support for dynamic properties
            continue
        field = schema.find_type(type_["type"]["name"], type_["type"]["namespace"])
        # the field's name comes from its literal "type" property; the class
        # name is its CamelCase form
        name = class_name = ""
        for prop in field["properties"]:
            if prop["name"] == "type":
                if prop["type"]["kind"] != "literal_value":
                    raise RuntimeError(f"Unexpected property type {prop}")
                name = prop["type"]["value"]
                class_name = "".join([n.title() for n in name.split("_")])
        k = schema.interface_to_python_class(
            type_["type"]["name"],
            type_["type"]["namespace"],
            for_types_py=False,
            for_response=False,
        )
        k["name"] = class_name
        k["field"] = name
        k["coerced"] = name in coerced_fields
        if name in float_fields:
            k["parent"] = "Float"
        elif name in integer_fields:
            k["parent"] = "Integer"
        elif name in range_fields:
            k["parent"] = "RangeField"
        elif name in object_fields:
            k["parent"] = "Object"
        else:
            k["parent"] = "Field"
        # the "type" argument is implied by the class, so drop it
        k["args"] = [prop for prop in k["args"] if prop["name"] != "type"]
        if name == "object":
            # the DSL's object field has a doc_class argument
            k["args"] = [
                {
                    "name": "doc_class",
                    "type": 'Union[Type["InnerDoc"], "DefaultType"]',
                    "doc": [
                        ":arg doc_class: base doc class that handles mapping.",
                        "   If no `doc_class` is provided, new instance of `InnerDoc` will be created,",
                        "   populated with `properties` and used. Can not be provided together with `properties`",
                    ],
                    "positional": True,
                    "required": False,
                }
            ] + k["args"]
        elif name == "date":
            # the DSL's date field accepts an optional default timezone
            k["args"] = [
                {
                    "name": "default_timezone",
                    "type": 'Union[str, "tzinfo", "DefaultType"]',
                    "doc": [
                        ":arg default_timezone: timezone that will be automatically used for tz-naive values",
                        "   May be instance of `datetime.tzinfo` or string containing TZ offset",
                    ],
                    "positional": True,
                    "required": False,
                }
            ] + k["args"]
        classes.append(k)
    # make sure parent classes appear first: the "AA" prefix forces Float,
    # Integer and Object to sort ahead of their subclasses
    classes = sorted(
        classes,
        key=lambda k: (
            f'AA{k["name"]}'
            if k["name"] in ["Float", "Integer", "Object"]
            else k["name"]
        ),
    )

    with open(filename, "w") as f:
        f.write(field_py.render(classes=classes))
    # fixed: the previous f-string had no placeholder and printed a literal
    # instead of the generated file's name
    print(f"Generated {filename}.")


def generate_query_py(schema, filename):
    """Generate query.py with all the properties of `QueryContainer` as Python
    classes.

    :arg schema: the Elasticsearch schema wrapper to read type data from.
    :arg filename: the path of the Python module to write.
    """
    classes = []
    query_container = schema.find_type("QueryContainer", "_types.query_dsl")
    # each property of QueryContainer becomes one Query subclass
    for p in query_container["properties"]:
        classes += schema.property_to_python_class(p)

    with open(filename, "w") as f:
        f.write(query_py.render(classes=classes, parent="Query"))
    # fixed: the previous f-string had no placeholder and printed a literal
    # instead of the generated file's name
    print(f"Generated {filename}.")


def generate_aggs_py(schema, filename):
    """Generate aggs.py with all the properties of `AggregationContainer` as
    Python classes.

    :arg schema: the Elasticsearch schema wrapper to read type data from.
    :arg filename: the path of the Python module to write.
    """
    classes = []
    aggs_container = schema.find_type("AggregationContainer", "_types.aggregations")
    for p in aggs_container["properties"]:
        # skip container-level properties such as "meta"; dict.get covers
        # both the missing-key and falsy-value cases of the original check
        if not p.get("containerProperty"):
            classes += schema.property_to_python_class(p)

    with open(filename, "w") as f:
        f.write(aggs_py.render(classes=classes, parent="Agg"))
    # fixed: the previous f-string had no placeholder and printed a literal
    # instead of the generated file's name
    print(f"Generated {filename}.")


def generate_response_init_py(schema, filename):
    """Generate response/__init__.py with all the response properties
    documented and typed.

    :arg schema: the Elasticsearch schema wrapper to read type data from.
    :arg filename: the path of the Python module to write.
    """
    # the standard search response
    search_response = schema.interface_to_python_class(
        "ResponseBody",
        "_global.search",
        for_types_py=False,
        for_response=True,
    )
    # the update-by-query response has its own shape
    ubq_response = schema.interface_to_python_class(
        "Response",
        "_global.update_by_query",
        for_types_py=False,
        for_response=True,
    )
    with open(filename, "w") as f:
        f.write(
            response_init_py.render(response=search_response, ubq_response=ubq_response)
        )
    # fixed: the previous f-string had no placeholder and printed a literal
    # instead of the generated file's name
    print(f"Generated {filename}.")


def generate_types_py(schema, filename):
    """Generate types.py.

    :arg schema: the Elasticsearch schema wrapper to read type data from.
    :arg filename: the path of the Python module to write.
    """
    classes = {}
    for interface in schema.interfaces:
        if interface == "PipeSeparatedFlags":
            continue  # handled as a special case
        for_response = interface in schema.response_interfaces
        k = schema.interface_to_python_class(
            interface, for_types_py=True, for_response=for_response
        )
        classes[k["name"]] = k

    # sort classes by being request/response and then by name; the tuple key
    # is equivalent to the previous "0"/"1" string-prefix trick but clearer
    sorted_classes = sorted(
        classes.keys(),
        key=lambda i: (i in schema.response_interfaces, i),
    )
    classes_list = []
    for n in sorted_classes:
        k = classes[n]
        # defensive guard against emitting the same class twice
        if k in classes_list:
            continue
        classes_list.append(k)

    with open(filename, "w") as f:
        f.write(types_py.render(classes=classes_list))
    # fixed: the previous f-string had no placeholder and printed a literal
    # instead of the generated file's name
    print(f"Generated {filename}.")


if __name__ == "__main__":
    # The current git branch name is passed to the schema loader;
    # presumably it selects the matching schema version — TODO confirm
    # against ElasticsearchSchema's constructor.
    v = subprocess.check_output(["git", "branch", "--show-current"]).strip().decode()
    schema = ElasticsearchSchema(v)
    # Regenerate every schema-derived DSL module in place.
    generate_field_py(schema, "elasticsearch/dsl/field.py")
    generate_query_py(schema, "elasticsearch/dsl/query.py")
    generate_aggs_py(schema, "elasticsearch/dsl/aggs.py")
    generate_response_init_py(schema, "elasticsearch/dsl/response/__init__.py")
    generate_types_py(schema, "elasticsearch/dsl/types.py")