File: conftest.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (134 lines) | stat: -rw-r--r-- 5,108 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import functools
import os

import pytest
import responses
from azure.iot.deviceprovisioning import DeviceProvisioningClient
from azure.core.credentials import AzureNamedKeyCredential
from devtools_testutils import (
    add_body_key_sanitizer,
    EnvironmentVariableLoader,
    add_general_regex_sanitizer,
    add_header_regex_sanitizer,
    remove_batch_sanitizers,
    test_proxy,
)


from urllib3.util.retry import Retry


# cSpell:disable
mock_dps_target = {}
mock_dps_target["cs"] = "HostName=mydps;SharedAccessKeyName=name;SharedAccessKey=value"
mock_dps_target["entity"] = "mydps.azure-devices-provisioning.net"
mock_dps_target["primarykey"] = "fakekeyfakekeyfakekeyfakekeyfakekeyfakekeyA="
mock_dps_target["secondarykey"] = "fakekeyfakekeyfakekeyfakekeyfakekeyfakekeyA="
mock_dps_target["policy"] = "provisioningserviceowner"
mock_dps_target["subscription"] = "5952cff8-bcd1-4235-9554-af2c0348bf23"
mock_dps_target["endpoint"] = "https://{}".format(mock_dps_target["entity"])
generic_cs_template = "HostName={};SharedAccessKeyName={};SharedAccessKey={}"

GLOBAL_PROVISIONING_HOST = "global.azure-devices-provisioning.net"
WEBHOOK_URL = "https://www.test.test"
TEST_ENDORSEMENT_KEY = (
    "AToAAQALAAMAsgAgg3GXZ0SEs/gakMyNRqXXJP1S124GUgtk8qHaGzMUaaoABgCAAEMAEAgAAAAAAAEAibym9HQP9vxCGF5dVc1Q"
    "QsAGe021aUGJzNol1/gycBx3jFsTpwmWbISRwnFvflWd0w2Mc44FAAZNaJOAAxwZvG8GvyLlHh6fGKdh+mSBL4iLH2bZ4Ry22cB3"
    "CJVjXmdGoz9Y/j3/NwLndBxQC+baNvzvyVQZ4/A2YL7vzIIj2ik4y+ve9ir7U0GbNdnxskqK1KFIITVVtkTIYyyFTIR0BySjPrRI"
    "Dj7r7Mh5uF9HBppGKQCBoVSVV8dI91lNazmSdpGWyqCkO7iM4VvUMv2HT/ym53aYlUrau+Qq87Tu+uQipWYgRdF11KDfcpMHqqzB"
    "QQ1NpOJVhrsTrhyJzO7KNw=="
)
API_VERSION = "2019-03-31"
CUSTOM_ALLOCATION = {"webhookUrl": WEBHOOK_URL, "apiVersion": API_VERSION}
TEST_DICT = {"hello": "world"}
DEVICE_INFO = {"additionalProperties": TEST_DICT}
CERT_FOLDER = "./test_certs"
# reprovision policy models
REPROVISION_MIGRATE = {"migrateDeviceData": True, "updateHubAssignment": True}

REPROVISION_RESET = {"migrateDeviceData": False, "updateHubAssignment": True}

REPROVISION_NEVER = {"migrateDeviceData": False, "updateHubAssignment": False}

INITIAL_TWIN_PROPERTIES = {"properties": {"desired": {"key": "value"}}}


@pytest.fixture(scope="session", autouse=True)
def add_sanitizers(test_proxy):
    subscription_id = os.environ.get(
        "AZURE_SUBSCRIPTION_ID", "00000000-0000-0000-0000-000000000000"
    )
    tenant_id = os.environ.get(
        "AZURE_TENANT_ID", "00000000-0000-0000-0000-000000000000"
    )
    client_id = os.environ.get(
        "AZURE_CLIENT_ID", "00000000-0000-0000-0000-000000000000"
    )
    client_secret = os.environ.get(
        "AZURE_CLIENT_SECRET", "00000000-0000-0000-0000-000000000000"
    )
    add_general_regex_sanitizer(
        regex=subscription_id, value="00000000-0000-0000-0000-000000000000"
    )
    add_general_regex_sanitizer(
        regex=tenant_id, value="00000000-0000-0000-0000-000000000000"
    )
    add_general_regex_sanitizer(
        regex=client_id, value="00000000-0000-0000-0000-000000000000"
    )
    add_general_regex_sanitizer(
        regex=client_secret, value="00000000-0000-0000-0000-000000000000"
    )
    add_general_regex_sanitizer(
        regex=r"-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----",
        value="certificate",
    )
    add_header_regex_sanitizer(key="Set-Cookie", value="[set-cookie;]")
    add_header_regex_sanitizer(key="Cookie", value="cookie;")
    add_body_key_sanitizer(json_path="$..access_token", value="access_token")
    add_body_key_sanitizer(json_path="$..primaryKey", value="primaryKey")
    add_body_key_sanitizer(json_path="$..secondaryKey", value="secondaryKey")
    add_body_key_sanitizer(json_path="$..sha256Thumbprint", value="thumbprint")
    add_body_key_sanitizer(json_path="$..sha1Thumbprint", value="thumbprint")

    # Remove the following sanitizers since certain fields are needed in tests and are non-sensitive:
    #  - AZSDK3490: $..etag
    remove_batch_sanitizers(["AZSDK3490"])
    return


@pytest.fixture
def mocked_response():
    with responses.RequestsMock() as mock:
        on_request_with_no_retry = functools.partial(
            mock._on_request,
            retries=Retry(
                0,
                read=False,
            ),
        )
        mock._on_request = on_request_with_no_retry
        yield mock


@pytest.fixture
def sdk_client() -> DeviceProvisioningClient:
    host_name, policy_name, key = (
        mock_dps_target["entity"],
        mock_dps_target["policy"],
        mock_dps_target["primarykey"],
    )
    creds = AzureNamedKeyCredential(policy_name, key)
    client = DeviceProvisioningClient(
        endpoint=f"https://{host_name}", credential=creds, authentication_policy=creds
    )
    return client


ProvisioningServicePreparer = functools.partial(
    EnvironmentVariableLoader,
    "iothub",
    iothub_dps_endpoint="fake-resource.azure-devices-provisioning.net",
    iothub_dps_conn_str="HostName=mydps;SharedAccessKeyName=name;SharedAccessKey=fakekeyfakekeyfakekeyfakekeyfakekeyfakekeyA=",
    iothub_dps_idscope="IDSCOPE",
)