File: verify_whl.py

package info (click to toggle)
python-azure 20251118%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 783,356 kB
  • sloc: python: 6,474,533; ansic: 804; javascript: 287; sh: 205; makefile: 198; xml: 109
file content (208 lines) | stat: -rw-r--r-- 7,989 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

# This script is used to verify that root directory in whl is azure and to verify manifest so all directories in soruce is included in sdist
import argparse
import logging
import os
import glob
import shutil
import tempfile
import subprocess
from packaging.version import Version
from typing import Dict, Any, Optional
from tox_helper_tasks import (
    unzip_file_to_directory,
)

from ci_tools.parsing import ParsedSetup, extract_package_metadata
from pypi_tools.pypi import retrieve_versions_from_pypi

logging.getLogger().setLevel(logging.INFO)

# Excluding auto generated applicationinsights and loganalytics
EXCLUDED_PACKAGES = [
    "azure",
    "azure-mgmt",
    "azure-common",
    "azure-applicationinsights",
    "azure-loganalytics",
]


def extract_whl(dist_dir, version):
    # Find whl for the package
    path_to_whl = glob.glob(os.path.join(dist_dir, "*{}*.whl".format(version)))[0]

    # Cleanup any existing stale files if any and rename whl file to zip for extraction later
    zip_file = path_to_whl.replace(".whl", ".zip")
    cleanup(zip_file)
    os.rename(path_to_whl, zip_file)

    # Extrat renamed gz file to unzipped folder
    extract_location = os.path.join(dist_dir, "unzipped")
    cleanup(extract_location)
    unzip_file_to_directory(zip_file, extract_location)
    return extract_location


def verify_whl_root_directory(dist_dir: str, expected_top_level_module: str, parsed_pkg: ParsedSetup) -> bool:
    # Verify metadata compatibility with prior version
    version: str = parsed_pkg.version
    metadata: Dict[str, Any] = extract_package_metadata(get_path_to_zip(dist_dir, version))
    prior_version = get_prior_version(parsed_pkg.name, version)
    if prior_version:
        if not verify_prior_version_metadata(parsed_pkg.name, prior_version, metadata):
            return False

    # This method ensures root directory in whl is the directory indicated by our top level namespace
    extract_location = extract_whl(dist_dir, version)
    root_folders = os.listdir(extract_location)

    # check for non 'azure' folder as root folder
    non_azure_folders = [
        d for d in root_folders if d != expected_top_level_module and not d.endswith(".dist-info")
    ]
    if non_azure_folders:
        logging.error(
            "whl has following incorrect directory at root level [%s]",
            non_azure_folders,
        )
        return False
    else:
        return True


def cleanup(path):
    # This function deletes all files and cleanup the directory if it exists
    if os.path.exists(path):
        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.remove(path)


def should_verify_package(package_name):
    return (
        package_name not in EXCLUDED_PACKAGES
        and "nspkg" not in package_name
    )


def get_prior_version(package_name: str, current_version: str) -> Optional[str]:
    """Get prior stable version if it exists, otherwise get prior preview version, else return None."""
    try:
        all_versions = retrieve_versions_from_pypi(package_name)
        current_ver = Version(current_version)
        prior_versions = [Version(v) for v in all_versions if Version(v) < current_ver]
        if not prior_versions:
            return None

        # Try stable versions first
        stable_versions = [v for v in prior_versions if not v.is_prerelease]
        if stable_versions:
            return str(max(stable_versions))

        # Fall back to preview versions
        preview_versions = [v for v in prior_versions if v.is_prerelease]
        return str(max(preview_versions)) if preview_versions else None
    except Exception:
        return None


def verify_prior_version_metadata(package_name: str, prior_version: str, current_metadata: Dict[str, Any], package_type: str = "*.whl") -> bool:
    """Download prior version and verify metadata compatibility."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        try:
            is_binary = "--only-binary=:all:" if package_type == "*.whl" else "--no-binary=:all:"
            subprocess.run([
                "pip", "download", "--no-deps", is_binary,
                f"{package_name}=={prior_version}", "--dest", tmp_dir
            ], check=True, capture_output=True)
            zip_files = glob.glob(os.path.join(tmp_dir, package_type))
            # If no match and we're not constrained to wheel-only, attempt legacy sdist (zip) once.
            if not zip_files and package_type != "*.whl":
                zip_files = glob.glob(os.path.join(tmp_dir, "*.zip"))
            if not zip_files:  # Still nothing -> treat as no prior artifact to compare.
                return True

            prior_metadata: Dict[str, Any] = extract_package_metadata(zip_files[0])
            is_compatible = verify_metadata_compatibility(current_metadata, prior_metadata)
            return is_compatible
        except Exception:
            return True


def verify_metadata_compatibility(current_metadata: Dict[str, Any], prior_metadata: Dict[str, Any]) -> bool:
    """Verify that all keys from prior version metadata are present in current version.

    Special handling: homepage/repository keys are exempt from prior compatibility check,
    but current version must have at least one of them.
    """
    if not current_metadata:
        return False
    # Check that current version has at least one homepage or repository URL
    repo_urls = ['homepage', 'repository']
    current_keys_lower = {k.lower() for k in current_metadata.keys()}
    if not any(key in current_keys_lower for key in repo_urls):
        logging.error(f"Current metadata must contain at least one of: {repo_urls}")
        return False

    if not prior_metadata:
        return True

    # For backward compatibility check, exclude homepage/repository from prior requirements
    prior_keys_filtered = {k for k in prior_metadata.keys() if k.lower() not in repo_urls}
    current_keys = set(current_metadata.keys())

    is_compatible = prior_keys_filtered.issubset(current_keys)
    if not is_compatible:
        missing_keys = prior_keys_filtered - current_keys
        logging.error("Metadata compatibility failed. Missing keys: %s", missing_keys)
    return is_compatible


def get_path_to_zip(dist_dir: str, version: str, package_type: str = "*.whl") -> str:
    return glob.glob(os.path.join(dist_dir, "**", "*{}{}".format(version, package_type)), recursive=True)[0]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Verify directories included in whl, contents in manifest file, and metadata compatibility"
    )

    parser.add_argument(
        "-t",
        "--target",
        dest="target_package",
        help="The target package directory on disk.",
        required=True,
    )

    parser.add_argument(
        "-d",
        "--dist_dir",
        dest="dist_dir",
        help="The dist location on disk. Usually /tox/dist.",
        required=True,
    )

    args = parser.parse_args()

    # get target package name from target package path
    pkg_dir = os.path.abspath(args.target_package)

    pkg_details = ParsedSetup.from_path(pkg_dir)
    top_level_module = pkg_details.namespace.split('.')[0]

    if should_verify_package(pkg_details.name):
        logging.info("Verifying whl for package: [%s]", pkg_details.name)
        if verify_whl_root_directory(args.dist_dir, top_level_module, pkg_details):
            logging.info("Verified whl for package: [%s]", pkg_details.name)
        else:
            logging.error("Failed to verify whl for package: [%s]", pkg_details.name)
            exit(1)