File: _test_base.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (133 lines) | stat: -rw-r--r-- 5,057 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import json
import math
import sys

from azure_devtools.perfstress_tests import PerfStressTest
from azure.identity import DefaultAzureCredential
from azure.identity.aio import DefaultAzureCredential as AsyncDefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.aio import SchemaRegistryClient as AsyncSchemaRegistryClient


class _SchemaRegistryTest(PerfStressTest):
    """Common base for the Schema Registry perf tests.

    Reads the namespace and schema group through ``get_from_env`` and builds
    the Avro schema definition whose size is controlled by ``--schema-size``.
    """

    def __init__(self, arguments):
        super().__init__(arguments)

        self.fully_qualified_namespace = self.get_from_env(
            "SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE"
        )
        self.group_name = self.get_from_env("SCHEMAREGISTRY_GROUP")
        self.name = "your-schema-name"
        self.format = "Avro"
        self.definition = self._create_schema_definition()

    @staticmethod
    def _serialized_size(schema):
        """Return the UTF-8 byte length of *schema* dumped as compact JSON."""
        return len(json.dumps(schema, separators=(",", ":")).encode("utf-8"))

    def _create_schema_definition(self):
        """Build a compact-JSON Avro record schema sized by ``args.schema_size``.

        Size is measured as the UTF-8 byte length of the serialized schema,
        not with ``sys.getsizeof``: the latter reports the memory footprint of
        the Python ``str`` object, which includes an interpreter-specific
        header and therefore is neither the payload size nor stable across
        Python versions.

        The schema always contains at least one field. When ``schema_size`` is
        large enough, the result is the largest schema of the form
        ``empty schema + k fields`` that does not exceed ``schema_size`` bytes.

        :return: The schema definition as a JSON string.
        :rtype: str
        """
        schema_size = self.args.schema_size

        fields = []
        schema = {
            "type": "record",
            "name": "example.User",
            "fields": fields,
        }

        # Size of the record with an empty "fields" list.
        schema_no_fields_size = self._serialized_size(schema)
        fields.append({"name": "favor_number00000", "type": ["int", "null"]})
        # Growth caused by a single field; used as the per-field cost below.
        field_size = self._serialized_size(schema) - schema_no_fields_size

        # Number of whole fields that fit into the requested size (floor).
        num_fields = (schema_size - schema_no_fields_size) // field_size

        # The first field is already present, so start at 1. The shorter
        # "favo_" prefix makes every later field plus its separating comma
        # exactly as long as the first field, keeping the per-field cost
        # constant.
        fields.extend(
            {"name": f"favo_number{i:05d}", "type": ["int", "null"]}
            for i in range(1, num_fields)
        )
        return json.dumps(schema, separators=(",", ":"))

    @staticmethod
    def add_arguments(parser):
        """Register ``--schema-size`` and ``--num-schemas`` on top of the base options."""
        # Zero-argument super() is unavailable in a staticmethod, hence the
        # explicit two-argument form.
        super(_SchemaRegistryTest, _SchemaRegistryTest).add_arguments(parser)
        parser.add_argument(
            "--schema-size",
            nargs="?",
            type=int,
            help="Size of a single schema. Max 1000000 bytes. Defaults to 150 bytes",
            default=150,
        )
        parser.add_argument(
            "--num-schemas",
            nargs="?",
            type=int,
            help="""Number of schemas to register/get by ID/get properties for. Default is 10.
                May result in 'Forbidden' Exception for `RegisterSchemaTest` operation, if reached
                the limit of schemas allowed for Schema Registry tier.""",
            default=10,
        )


class _RegisterTest(_SchemaRegistryTest):
    """Base for register-schema perf tests; holds one sync and one async client."""

    def __init__(self, arguments):
        super().__init__(arguments)
        namespace = self.fully_qualified_namespace

        # Blocking credential/client pair.
        sync_credential = DefaultAzureCredential()
        self.sync_credential = sync_credential
        self.sync_client = SchemaRegistryClient(
            fully_qualified_namespace=namespace, credential=sync_credential
        )

        # Awaitable credential/client pair.
        async_credential = AsyncDefaultAzureCredential()
        self.async_credential = async_credential
        self.async_client = AsyncSchemaRegistryClient(
            fully_qualified_namespace=namespace, credential=async_credential
        )

    async def global_setup(self):
        """Defer entirely to the base class; nothing extra to prepare here."""
        await super().global_setup()

    async def close(self):
        """Close both client/credential pairs, then let the base class clean up."""
        # Sync resources go first, each client before its credential.
        for resource in (self.sync_client, self.sync_credential):
            resource.close()
        for resource in (self.async_client, self.async_credential):
            await resource.close()
        await super().close()


class _GetSchemaTest(_SchemaRegistryTest):
    """Base for perf tests that operate on an already registered schema.

    Creates one sync and one async client and registers ``self.definition``
    during construction, storing the returned ID in ``self.schema_id``.
    """

    def __init__(self, arguments):
        super().__init__(arguments)
        self.sync_credential = DefaultAzureCredential()
        self.sync_client = SchemaRegistryClient(
            fully_qualified_namespace=self.fully_qualified_namespace,
            credential=self.sync_credential,
        )
        self.async_credential = AsyncDefaultAzureCredential()
        self.async_client = AsyncSchemaRegistryClient(
            fully_qualified_namespace=self.fully_qualified_namespace,
            credential=self.async_credential,
        )
        self.schema_id = self._preregister_schema()

    def _preregister_schema(self):
        """Register the test schema with the sync client and return its ID.

        The client is intentionally not used as a context manager: leaving a
        ``with`` block closes it, but the same instance stays on
        ``self.sync_client`` and is closed in ``close()``.

        :return: The ID from the schema properties returned by ``register_schema``.
        """
        try:
            schema_properties = self.sync_client.register_schema(
                self.group_name, self.name, self.definition, self.format
            )
        except Exception:
            # This runs from __init__; if it fails no instance exists for
            # close() to be called on, so release the client here.
            self.sync_client.close()
            raise
        return schema_properties.id

    async def global_setup(self):
        """Defer entirely to the base class; nothing extra to prepare here."""
        await super().global_setup()

    async def close(self):
        """Close the sync and async clients and credentials, then the base class."""
        self.sync_client.close()
        self.sync_credential.close()
        await self.async_client.close()
        await self.async_credential.close()
        await super().close()