| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 
 | # -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import os
import logging
import shlex
import time
import signal
import platform
import shutil
import tarfile
from typing import Optional, Generator, Any
import zipfile
import certifi
from dotenv import load_dotenv, find_dotenv
import pytest
import subprocess
from urllib3.exceptions import SSLError
from ci_tools.variables import in_ci
from .config import PROXY_URL
from .fake_credentials import FAKE_ACCESS_TOKEN, FAKE_ID, SERVICEBUS_FAKE_SAS, SANITIZED
from .helpers import get_http_client, is_live_and_not_recording
from .sanitizers import (
    add_batch_sanitizers,
    Sanitizer,
    set_custom_default_matcher,
    remove_batch_sanitizers,
)
load_dotenv(find_dotenv())
# Raise urllib3's exposed logging level so that we don't see tons of warnings while polling the proxy's availability
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
_LOGGER = logging.getLogger()
CONTAINER_STARTUP_TIMEOUT = 60
PROXY_MANUALLY_STARTED = os.getenv("PROXY_MANUAL_START", False)
PROXY_CHECK_URL = PROXY_URL + "/Info/Available"
TOOL_ENV_VAR = "PROXY_PID"
AVAILABLE_TEST_PROXY_BINARIES = {
    "Windows": {
        "AMD64": {
            "system": "Windows",
            "machine": "AMD64",
            "file_name": "test-proxy-standalone-win-x64.zip",
            "executable": "Azure.Sdk.Tools.TestProxy.exe",
        },
    },
    "Linux": {
        "X86_64": {
            "system": "Linux",
            "machine": "X86_64",
            "file_name": "test-proxy-standalone-linux-x64.tar.gz",
            "executable": "Azure.Sdk.Tools.TestProxy",
        },
        "ARM64": {
            "system": "Linux",
            "machine": "ARM64",
            "file_name": "test-proxy-standalone-linux-arm64.tar.gz",
            "executable": "Azure.Sdk.Tools.TestProxy",
        },
    },
    "Darwin": {
        "X86_64": {
            "system": "Darwin",
            "machine": "X86_64",
            "file_name": "test-proxy-standalone-osx-x64.zip",
            "executable": "Azure.Sdk.Tools.TestProxy",
        },
        "ARM64": {
            "system": "Darwin",
            "machine": "ARM64",
            "file_name": "test-proxy-standalone-osx-arm64.zip",
            "executable": "Azure.Sdk.Tools.TestProxy",
        },
    },
}
PROXY_DOWNLOAD_URL = "https://github.com/Azure/azure-sdk-tools/releases/download/Azure.Sdk.Tools.TestProxy_{}/{}"
discovered_roots = []
def get_target_version(repo_root: str) -> str:
    """Gets the target test-proxy version from the target_version.txt file in /eng/common/testproxy"""
    version_file_location = os.path.relpath("eng/common/testproxy/target_version.txt")
    override_version_file_location = os.path.relpath("eng/target_proxy_version.txt")
    version_file_location_from_root = os.path.abspath(os.path.join(repo_root, version_file_location))
    override_version_file_location_from_root = os.path.abspath(os.path.join(repo_root, override_version_file_location))
    if os.path.exists(override_version_file_location_from_root):
        with open(override_version_file_location_from_root, "r") as f:
            target_version = f.read().strip()
    else:
        with open(version_file_location_from_root, "r") as f:
            target_version = f.read().strip()
    return target_version
def get_downloaded_version(repo_root: str) -> Optional[str]:
    """Gets version from downloaded_version.txt within the local download folder"""
    downloaded_version_file = os.path.abspath(os.path.join(repo_root, ".proxy", "downloaded_version.txt"))
    if os.path.exists(downloaded_version_file):
        with open(downloaded_version_file, "r") as f:
            version = f.read().strip()
            return version
    else:
        return None
def ascend_to_root(start_dir_or_file: str) -> str:
    """Given a path, ascend until encountering a folder with a `.git` folder present within it. Return that directory.
    :param str start_dir_or_file: The starting directory or file. Either is acceptable.
    """
    if os.path.isfile(start_dir_or_file):
        current_dir = os.path.dirname(start_dir_or_file)
    else:
        current_dir = start_dir_or_file
    while current_dir is not None and not (os.path.dirname(current_dir) == current_dir):
        possible_root = os.path.join(current_dir, ".git")
        # we need the git check to prevent ascending out of the repo
        if os.path.exists(possible_root):
            if current_dir not in discovered_roots:
                discovered_roots.append(current_dir)
            return current_dir
        else:
            current_dir = os.path.dirname(current_dir)
    raise Exception(f'Requested target "{start_dir_or_file}" does not exist within a git repo.')
def check_availability() -> int:
    """Attempts request to /Info/Available. If a test-proxy instance is responding, we should get a response."""
    try:
        http_client = get_http_client(raise_on_status=False)
        response = http_client.request(method="GET", url=PROXY_CHECK_URL, timeout=10)
        return response.status
    # We get an SSLError if the container is started but the endpoint isn't available yet
    except SSLError as sslError:
        _LOGGER.debug(sslError)
        return 404
    except Exception as e:
        _LOGGER.debug(e)
        return 404
def check_certificate_location(repo_root: str) -> None:
    """Checks for a certificate bundle containing the test proxy's self-signed certificate.
    If a certificate bundle either isn't present or doesn't contain the correct test proxy certificate, a bundle is
    automatically created. SSL_CERT_DIR and REQUESTS_CA_BUNDLE are set to point to this bundle for the session.
    """
    existing_root_pem = certifi.where()
    local_dev_cert = os.path.abspath(os.path.join(repo_root, "eng", "common", "testproxy", "dotnet-devcert.crt"))
    combined_filename = os.path.basename(local_dev_cert).split(".")[0] + ".pem"
    combined_folder = os.path.join(repo_root, ".certificate")
    combined_location = os.path.join(combined_folder, combined_filename)
    # If no local certificate folder exists, create one
    if not os.path.exists(combined_folder):
        _LOGGER.info("Missing a test proxy certificate under azure-sdk-for-python/.certificate. Creating one now.")
        os.mkdir(combined_folder)
    def write_dev_cert_bundle():
        """Creates a certificate bundle with the test proxy certificate, followed by the user's existing CA bundle."""
        _LOGGER.info("Writing latest test proxy certificate to local certificate bundle.")
        # Copy the dev cert's content into the new certificate bundle
        with open(local_dev_cert, "r") as f:
            data = f.read()
        with open(combined_location, "w") as f:
            f.write(data)
        # Copy the existing CA bundle contents into the repository's certificate bundle
        with open(existing_root_pem, "r") as f:
            content = f.readlines()
        with open(combined_location, "a") as f:
            f.writelines(content)
    # If the certificate bundle isn't set up, set it up. If the bundle is present, make sure that it starts with the
    # correct certificate from eng/common/testproxy/dotnet-devcert.crt (to account for certificate rotation)
    if not os.path.exists(combined_location):
        write_dev_cert_bundle()
    else:
        with open(local_dev_cert, "r") as f:
            repo_cert = f.read()
        with open(combined_location, "r") as f:
            # The bundle should start with the test proxy's cert; only read as far in as the cert's length
            bundle_data = f.read(len(repo_cert))
        if repo_cert != bundle_data:
            write_dev_cert_bundle()
    _LOGGER.info(
        "Setting SSL_CERT_DIR, SSL_CERT_FILE, and REQUESTS_CA_BUNDLE environment variables for the current session.\n"
        f"SSL_CERT_DIR={combined_folder}\n"
        f"SSL_CERT_FILE=REQUESTS_CA_BUNDLE={combined_location}"
    )
    os.environ["SSL_CERT_DIR"] = combined_folder
    os.environ["SSL_CERT_FILE"] = combined_location
    os.environ["REQUESTS_CA_BUNDLE"] = combined_location
def check_proxy_availability() -> None:
    """Waits for the availability of the test-proxy."""
    start = time.time()
    now = time.time()
    status_code = 0
    while now - start < CONTAINER_STARTUP_TIMEOUT and status_code != 200:
        status_code = check_availability()
        now = time.time()
def prepare_local_tool(repo_root: str) -> str:
    """Returns the path to a downloaded executable."""
    target_proxy_version = get_target_version(repo_root)
    download_folder = os.path.join(repo_root, ".proxy")
    system = platform.system()  # Darwin, Linux, Windows
    machine = platform.machine().upper()  # arm64, x86_64, AMD64
    if system in AVAILABLE_TEST_PROXY_BINARIES:
        available_for_system = AVAILABLE_TEST_PROXY_BINARIES[system]
        if machine in available_for_system:
            target_info = available_for_system[machine]
            downloaded_version = get_downloaded_version(repo_root)
            download_necessary = not downloaded_version == target_proxy_version
            if download_necessary:
                if os.path.exists(download_folder):
                    # cleanup the directory for re-download
                    shutil.rmtree(download_folder)
                os.makedirs(download_folder)
                download_url = PROXY_DOWNLOAD_URL.format(target_proxy_version, target_info["file_name"])
                download_file = os.path.join(download_folder, target_info["file_name"])
                http_client = get_http_client()
                with open(download_file, "wb") as out:
                    r = http_client.request("GET", download_url, preload_content=False)
                    shutil.copyfileobj(r, out)
                if download_file.endswith(".zip"):
                    with zipfile.ZipFile(download_file, "r") as zip_ref:
                        zip_ref.extractall(download_folder)
                if download_file.endswith(".tar.gz"):
                    with tarfile.open(download_file) as tar_ref:
                        tar_ref.extractall(download_folder)
                os.remove(download_file)  # Remove downloaded file after contents are extracted
                # Record downloaded version for later comparison with target version in repo
                with open(os.path.join(download_folder, "downloaded_version.txt"), "w") as f:
                    f.writelines([target_proxy_version])
            executable_path = os.path.join(download_folder, target_info["executable"])
            # Mark the executable file as executable by all users; Mac drops these permissions during extraction
            if system == "Darwin":
                os.chmod(executable_path, 0o755)
            return os.path.abspath(executable_path).replace("\\", "/")
        else:
            _LOGGER.error(f'There are no available standalone proxy binaries for platform "{machine}".')
            raise Exception(
                "Unable to download a compatible standalone proxy for the current platform. File an issue against "
                "Azure/azure-sdk-tools with this error."
            )
    else:
        _LOGGER.error(f'There are no available standalone proxy binaries for system "{system}".')
        raise Exception(
            "Unable to download a compatible standalone proxy for the current system. File an issue against "
            "Azure/azure-sdk-tools with this error."
        )
def set_common_sanitizers() -> None:
    """Register sanitizers that will apply to all recordings throughout the SDK."""
    batch_sanitizers = {}
    # Remove headers from recordings if we don't need them, and ignore them if present
    # Authorization, for example, can contain sensitive info and can cause matching failures during challenge auth
    headers_to_ignore = "Authorization, x-ms-client-request-id, x-ms-request-id, Accept-Encoding"
    set_custom_default_matcher(excluded_headers=headers_to_ignore)
    batch_sanitizers[Sanitizer.REMOVE_HEADER] = [{"headers": headers_to_ignore}]
    # Remove OAuth interactions, which can contain client secrets and aren't necessary for playback testing
    # TODO: Determine why this breaks some test playbacks. Since sensitive info in OAuth responses is sanitized
    # through other sanitizers, it's fine to keep this off for now.
    # batch_sanitizers[Sanitizer.OAUTH_RESPONSE] = [None]
    # Body key sanitizers for sensitive fields in JSON requests/responses
    batch_sanitizers[Sanitizer.BODY_KEY] = []
    # Body regex sanitizers for sensitive patterns in request/response bodies
    batch_sanitizers[Sanitizer.BODY_REGEX] = []
    # General regex sanitizers for sensitive patterns throughout interactions
    batch_sanitizers[Sanitizer.GENERAL_REGEX] = [
        {"regex": '(?:[\\?&](sig|se|st|sv)=)(?<secret>[^&\\"\\s]*)', "group_for_replace": "secret", "value": SANITIZED},
    ]
    # Header regex sanitizers for sensitive patterns in request/response headers
    batch_sanitizers[Sanitizer.HEADER_REGEX] = []
    # URI regex sanitizers for sensitive patterns in request/response URLs
    batch_sanitizers[Sanitizer.URI_REGEX] = []
    # Send all the above sanitizers to the test proxy in a single, batch request
    add_batch_sanitizers(sanitizers=batch_sanitizers)
    # Remove certain sanitizers that are too aggressive and cause excessive playback failures
    #  - AZSDK2030: operation-location
    remove_batch_sanitizers(["AZSDK2030"])
def start_test_proxy(request) -> None:
    """Starts the test proxy and returns when the proxy server is ready to receive requests.
    In regular use cases, this will auto-start the test-proxy docker container. In CI, or when environment variable
    TF_BUILD is set, this function will start the test-proxy .NET tool.
    """
    repo_root = ascend_to_root(request.node.items[0].module.__file__)
    check_certificate_location(repo_root)
    if not PROXY_MANUALLY_STARTED:
        if check_availability() == 200:
            _LOGGER.debug("Tool is responding, exiting...")
        else:
            root = os.getenv("BUILD_SOURCESDIRECTORY", repo_root)
            _LOGGER.info("{} is calculated repo root".format(root))
            # If we're in CI, allow for tox environment parallelization and write proxy output to a log file
            log = None
            if in_ci():
                envname = os.getenv("TOX_ENV_NAME", "default")
                log = open(os.path.join(root, "_proxy_log_{}.log".format(envname)), "a")
                os.environ["PROXY_ASSETS_FOLDER"] = os.path.join(root, "l", envname)
                if not os.path.exists(os.environ["PROXY_ASSETS_FOLDER"]):
                    os.makedirs(os.environ["PROXY_ASSETS_FOLDER"])
            if os.getenv("TF_BUILD"):
                _LOGGER.info("Starting the test proxy tool from dotnet tool cache...")
                tool_name = "test-proxy"
            else:
                _LOGGER.info("Downloading and starting standalone proxy executable...")
                tool_name = prepare_local_tool(root)
            # Always start the proxy with these two defaults set to allow SSL connection
            passenv = {
                "ASPNETCORE_Kestrel__Certificates__Default__Path": os.path.join(
                    root, "eng", "common", "testproxy", "dotnet-devcert.pfx"
                ),
                "ASPNETCORE_Kestrel__Certificates__Default__Password": "password",
            }
            # If they are already set, override what we give the proxy with what is in os.environ
            passenv.update(os.environ)
            proc = subprocess.Popen(
                shlex.split(f'{tool_name} start --storage-location="{root}" -- --urls "{PROXY_URL}"'),
                stdout=log or subprocess.DEVNULL,
                stderr=log or subprocess.STDOUT,
                env=passenv,
            )
            os.environ[TOOL_ENV_VAR] = str(proc.pid)
    # Wait for the proxy server to become available
    check_proxy_availability()
    set_common_sanitizers()
def stop_test_proxy() -> None:
    """Stops any running instance of the test proxy"""
    if not PROXY_MANUALLY_STARTED:
        _LOGGER.info("Stopping the test proxy tool...")
        try:
            os.kill(int(os.getenv(TOOL_ENV_VAR)), signal.SIGTERM)
        except:
            _LOGGER.debug("Unable to kill running test-proxy process.")
@pytest.fixture(scope="session")
def test_proxy(request) -> Generator[None, None, None]:
    """Pytest fixture to be used before running any tests that are recorded with the test proxy"""
    if is_live_and_not_recording():
        yield
    else:
        start_test_proxy(request)
        # Everything before this yield will be run before fixtures that invoke this one are run
        # Everything after it will be run after invoking fixtures are done executing
        yield
        stop_test_proxy()
 |