File: workload_utils.py

# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
import asyncio
import logging
import os
import random
from datetime import datetime
from logging.handlers import RotatingFileHandler

from azure.monitor.opentelemetry import configure_azure_monitor
from workload_configs import *

# Expected, high-volume outcomes (not found / conflict / precondition failed)
# that would otherwise flood the logs.
_NOISY_ERRORS = {404, 409, 412}
_NOISY_SUB_STATUS_CODES = {0, None}
_REQUIRED_ATTRIBUTES = ["resource_type", "verb", "operation_type", "status_code", "sub_status_code", "duration"]

def get_user_agent(client_id):
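    # Compose a per-client user agent: optional prefix, client id, and a start timestamp.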
    prefix = USER_AGENT_PREFIX + "-" if USER_AGENT_PREFIX else ""
    return prefix + str(client_id) + "-" + datetime.now().strftime("%Y%m%d-%H%M%S")

def get_random_item():
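    # randint is inclusive on both ends, so ids span NUMBER_OF_LOGICAL_PARTITIONS + 1
    # distinct logical partitions.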
    random_int = random.randint(0, NUMBER_OF_LOGICAL_PARTITIONS)
    return {"id": "test-" + str(random_int), "pk": "pk-" + str(random_int)}

def upsert_item(container, excluded_locations, num_upserts):
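    # Upsert num_upserts random items; only pass excluded_locations when provided so the
    # SDK's default routing applies otherwise.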
    for _ in range(num_upserts):
        if excluded_locations:
            container.upsert_item(get_random_item(), etag=None, match_condition=None,
                                  excluded_locations=excluded_locations)
        else:
            container.upsert_item(get_random_item(), etag=None, match_condition=None)


def read_item(container, excluded_locations, num_reads):
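    # Point-read each item by id and partition key (PARTITION_KEY comes from workload_configs).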
    for _ in range(num_reads):
        item = get_random_item()
        if excluded_locations:
            container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None,
                                excluded_locations=excluded_locations)
        else:
            container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None)

def query_items(container, excluded_locations, num_queries):
    for _ in range(num_queries):
        perform_query(container, excluded_locations)


def perform_query(container, excluded_locations):
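    # Look up one random item with a parameterized, single-partition query.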
    random_item = get_random_item()
    if excluded_locations:
        results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk",
                                        parameters=[{"name": "@id", "value": random_item["id"]},
                                                    {"name": "@pk", "value": random_item["pk"]}],
                                        partition_key=random_item[PARTITION_KEY],
                                        excluded_locations=excluded_locations)
    else:
        results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk",
                                        parameters=[{"name": "@id", "value": random_item["id"]},
                                                    {"name": "@pk", "value": random_item["pk"]}],
                                        partition_key=random_item[PARTITION_KEY])
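    # Drain the iterator; the query does not execute until results are consumed.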
    items = [item for item in results]

async def upsert_item_concurrently(container, excluded_locations, num_upserts):
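    # Build one upsert coroutine per item and run them all concurrently.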
    tasks = []
    for _ in range(num_upserts):
        if excluded_locations:
            tasks.append(container.upsert_item(get_random_item(), etag=None, match_condition=None,
                                               excluded_locations=excluded_locations))
        else:
            tasks.append(container.upsert_item(get_random_item(), etag=None, match_condition=None))
    await asyncio.gather(*tasks)


async def read_item_concurrently(container, excluded_locations, num_reads):
    tasks = []
    for _ in range(num_reads):
        item = get_random_item()
        if excluded_locations:
            tasks.append(container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None,
                                             excluded_locations=excluded_locations))
        else:
            tasks.append(container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None))
    await asyncio.gather(*tasks)


async def query_items_concurrently(container, excluded_locations, num_queries):
    tasks = []
    for _ in range(num_queries):
        tasks.append(perform_query_concurrently(container, excluded_locations))
    await asyncio.gather(*tasks)


async def perform_query_concurrently(container, excluded_locations):
    random_item = get_random_item()
    if excluded_locations:
        results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk",
                                        parameters=[{"name": "@id", "value": random_item["id"]},
                                                    {"name": "@pk", "value": random_item["pk"]}],
                                        partition_key=random_item[PARTITION_KEY],
                                        excluded_locations=excluded_locations)
    else:
        results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk",
                                        parameters=[{"name": "@id", "value": random_item["id"]},
                                                    {"name": "@pk", "value": random_item["pk"]}],
                                        partition_key=random_item[PARTITION_KEY])
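    # Drain the async iterator; the query does not execute until results are consumed.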
    items = [item async for item in results]

def create_logger(file_name):
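    # Propagate the circuit breaker setting to the SDK via the environment before any
    # client is created, then attach a filtered rotating file handler to the root logger.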
    os.environ["AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER"] = str(CIRCUIT_BREAKER_ENABLED)
    logger = logging.getLogger()
    if APP_INSIGHTS_CONNECTION_STRING:
        configure_azure_monitor(
            logger_name="azure.cosmos",
            connection_string=APP_INSIGHTS_CONNECTION_STRING,
        )
    prefix = os.path.splitext(file_name)[0] + "-" + str(os.getpid())
    # Create a rotating file handler
    handler = RotatingFileHandler(
        "log-" + get_user_agent(prefix) + '.log',
        maxBytes=1024 * 1024 * 10,  # 10 mb
        backupCount=2
    )
    logger.setLevel(LOG_LEVEL)
    # create filters for the logger handler to reduce the noise
    workload_logger_filter = WorkloadLoggerFilter()
    handler.addFilter(workload_logger_filter)
    logger.addHandler(handler)
    return prefix, logger


class WorkloadLoggerFilter(logging.Filter):
    def filter(self, record):
        # Check if the required attributes exist in the log record
        if all(hasattr(record, attr) for attr in _REQUIRED_ATTRIBUTES):
            # Check the conditions
            # Check database account reads
            if record.resource_type == "databaseaccount" and record.verb == "GET" and record.operation_type == "Read":
                return True
            # Check if there is an error and omit noisy errors
            if record.status_code >= 400 and not (
                    record.status_code in _NOISY_ERRORS and record.sub_status_code in _NOISY_SUB_STATUS_CODES):
                return True
            # Check if the latency (duration) was above 1000 ms
            if record.duration >= 1000:
                return True
        return False
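

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: wires the sync helpers to a
    # Cosmos container. COSMOS_URI, COSMOS_KEY, DATABASE_NAME and CONTAINER_NAME are
    # assumed to come from workload_configs; substitute your own settings if they differ.
    from azure.cosmos import CosmosClient

    prefix, logger = create_logger(os.path.basename(__file__))
    client = CosmosClient(COSMOS_URI, COSMOS_KEY, user_agent=get_user_agent(prefix))
    container = client.get_database_client(DATABASE_NAME).get_container_client(CONTAINER_NAME)
    upsert_item(container, excluded_locations=None, num_upserts=10)
    read_item(container, excluded_locations=None, num_reads=10)
    query_items(container, excluded_locations=None, num_queries=10)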