"""
Helper functions for updating conda files.
"""
import os
import glob
from functools import lru_cache
from typing import Optional
import csv
from ci_tools.logging import logger
import urllib.request
from datetime import datetime
from ci_tools.parsing import ParsedSetup
from packaging.version import Version
from pypi_tools.pypi import PyPIClient, retrieve_versions_from_pypi
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
SDK_DIR = os.path.join(ROOT_DIR, "sdk")
AZURE_SDK_CSV_URL = "https://raw.githubusercontent.com/Azure/azure-sdk/main/_data/releases/latest/python-packages.csv"
PACKAGE_COL = "Package"
LATEST_GA_DATE_COL = "LatestGADate"
VERSION_GA_COL = "VersionGA"
FIRST_GA_DATE_COL = "FirstGADate"
DISPLAY_NAME_COL = "DisplayName"
SERVICE_NAME_COL = "ServiceName"
REPO_PATH_COL = "RepoPath"
TYPE_COL = "Type"
SUPPORT_COL = "Support"
# =====================================
# Helpers for handling bundled releases
# =====================================
@lru_cache(maxsize=None)
def _build_package_path_index() -> dict[str, str]:
    """
    Build a one-time index mapping package names to their filesystem paths.

    Scans the sdk/<service>/<package> directory layout exactly once; the
    result is cached so every later lookup is a plain dict access.
    """
    index: dict[str, str] = {}
    for candidate in glob.glob(os.path.join(SDK_DIR, "*", "*")):
        name = os.path.basename(candidate)
        # Skip plain files and tooling dirs such as .tox, .venv, __pycache__.
        if os.path.isdir(candidate) and not name.startswith((".", "__")):
            index[name] = candidate
    return index
def get_package_path(package_name: str) -> Optional[str]:
    """Get the filesystem path of an SDK package given its name."""
    location = _build_package_path_index().get(package_name)
    if not location:
        logger.warning(f"Package path not found for package: {package_name}")
        return None
    return location
def get_bundle_name(package_name: str) -> Optional[str]:
    """
    Check bundled release config from package's pyproject.toml file given the package name.
    If bundled, return the bundle name; otherwise, return None.
    """
    package_path = get_package_path(package_name)
    if not package_path:
        logger.warning(f"Cannot determine package path for {package_name}")
        return None

    parsed = ParsedSetup.from_path(package_path)
    if not parsed:
        # Without parsed metadata we cannot tell whether this is bundled.
        logger.error(f"Failed to parse setup for package {package_name}")
        raise Exception(f"Failed to parse setup for package {package_name}")

    conda_config = parsed.get_conda_config()
    if conda_config:
        # Absent "bundle_name" key means the package is not bundled.
        return conda_config.get("bundle_name")

    # No conda config: stable (GA) packages are required to have one.
    if is_stable_on_pypi(package_name):
        raise Exception(
            f"Stable release package {package_name} needs a conda config"
        )
    logger.warning(
        f"No conda config found for package {package_name}, which may be a pre-release"
    )
    return None
def map_bundle_to_packages(
    package_names: list[str],
) -> tuple[dict[str, list[str]], list[str]]:
    """Create a mapping of bundle names to their constituent package names.

    :return: Tuple of (bundle_map, failed_packages) where failed_packages are packages that threw exceptions.
    """
    logger.info("Mapping bundle names to packages...")
    bundle_map: dict[str, list[str]] = {}
    failed_packages: list[str] = []
    for name in package_names:
        logger.debug(f"Processing package for bundle mapping: {name}")
        try:
            bundle = get_bundle_name(name)
        except Exception as exc:
            # Record the failure and keep going; one bad package must not
            # abort the whole mapping pass.
            logger.error(f"Failed to get bundle name for {name}: {exc}")
            failed_packages.append(name)
            continue
        if bundle:
            logger.debug(f"Bundle name for package {name}: {bundle}")
            bundle_map.setdefault(bundle, []).append(name)
    return bundle_map, failed_packages
# =====================================
# Utility functions for parsing data
# =====================================
def parse_csv() -> list[dict[str, str]]:
    """Download and parse the Azure SDK Python packages CSV file."""
    try:
        logger.info(f"Downloading CSV from {AZURE_SDK_CSV_URL}")
        with urllib.request.urlopen(AZURE_SDK_CSV_URL, timeout=10) as response:
            body = response.read().decode("utf-8")
        # One dict per CSV row, keyed by the header columns.
        rows = list(csv.DictReader(body.splitlines()))
        logger.info(f"Successfully parsed {len(rows)} packages from CSV")
        return rows
    except Exception as exc:
        # Best-effort: a download/parse failure yields an empty package list.
        logger.error(f"Failed to download or parse CSV: {exc}")
        return []
def is_mgmt_package(pkg: dict[str, str]) -> bool:
    """Return True when the CSV row describes a management-plane package."""
    pkg_type = pkg.get(TYPE_COL, "")
    if pkg_type == "mgmt":
        return True
    if pkg_type == "client":
        return False
    # Type column absent/unknown: fall back to name heuristics.
    # azure-mgmt-core is explicitly treated as a client library.
    name = pkg.get(PACKAGE_COL, "")
    if name == "azure-mgmt-core":
        return False
    return "mgmt" in name or "cognitiveservices" in name
def separate_packages_by_type(
    packages: list[dict[str, str]],
) -> tuple[list[dict[str, str]], list[dict[str, str]]]:
    """Separate packages into data plane and management plane libraries."""
    data_plane: list[dict[str, str]] = []
    mgmt_plane: list[dict[str, str]] = []
    for pkg in packages:
        bucket = mgmt_plane if is_mgmt_package(pkg) else data_plane
        bucket.append(pkg)
    logger.debug(
        f"Separated {len(data_plane)} data plane and {len(mgmt_plane)} management plane packages"
    )
    return (data_plane, mgmt_plane)
def package_needs_update(
    package_row: dict[str, str], prev_release_date: str, is_new=False
) -> bool:
    """
    Check if the package is new or needs version update (i.e., FirstGADate or LatestGADate is after the last release).

    :param package_row: The parsed CSV row for the package.
    :param prev_release_date: The date of the previous release in "mm/dd/yyyy" format.
    :param is_new: Whether to check for new package (FirstGADate) or outdated package (LatestGADate).
    :return: if the package is new or needs an update.
    """
    # Select the relevant GA-date column once so the log messages below stay
    # consistent with the value actually compared.
    date_col = FIRST_GA_DATE_COL if is_new else LATEST_GA_DATE_COL
    compare_date = package_row.get(date_col)
    logger.debug(
        # Fixed garbled wording ("with against date" -> "against date").
        f"Checking {'new package' if is_new else 'outdated package'} for package {package_row.get(PACKAGE_COL)} against date: {compare_date}"
    )
    if not compare_date:
        # Rows without the relevant GA date cannot be compared; treat as no-op.
        logger.debug(
            f"Package {package_row.get(PACKAGE_COL)} is skipped due to missing {date_col}."
        )
        return False
    try:
        # Convert string dates to datetime objects for proper comparison.
        # Bound to new names so the original strings stay available.
        compare_dt = datetime.strptime(compare_date, "%m/%d/%Y")
        prev_dt = datetime.strptime(prev_release_date, "%m/%d/%Y")
        logger.debug(
            f"Comparing {package_row.get(PACKAGE_COL)} CompareDate {compare_dt} with previous release date {prev_dt}"
        )
        return compare_dt > prev_dt
    except ValueError as e:
        # Malformed dates are logged and treated as "no update needed"
        # rather than crashing the whole run.
        logger.error(
            f"Date parsing error for package {package_row.get(PACKAGE_COL)}: {e}"
        )
        return False
def is_stable_on_pypi(package_name: str) -> bool:
    """
    Check if a package has any stable (GA) release on PyPI.

    :param package_name: The name of the package to check.
    :return: True if any stable version exists on PyPI, False otherwise.
    """
    try:
        versions = retrieve_versions_from_pypi(package_name)
        if not versions:
            logger.warning(f"No versions found on PyPI for {package_name}")
            return False
        # First non-prerelease version, if any (lazy: stops at first match).
        stable = next((v for v in versions if not Version(v).is_prerelease), None)
        if stable is None:
            logger.debug(f"Package {package_name} has no stable versions")
            return False
        logger.debug(f"Package {package_name} has stable version {stable}")
        return True
    except Exception as e:
        # Network/parsing problems are non-fatal: assume not stable.
        logger.warning(f"Failed to check PyPI for {package_name}: {e}")
        return False
def get_package_data_from_pypi(
    package_name: str,
) -> tuple[Optional[str], Optional[str]]:
    """Fetch the latest version and download URI for a package from PyPI."""
    try:
        pypi = PyPIClient()
        project = pypi.project(package_name)
        latest = project["info"]["version"]
        if latest in project["releases"] and project["releases"][latest]:
            # Prefer the source distribution (sdist) when one exists.
            for dist in project["releases"][latest]:
                if dist["packagetype"] == "sdist":
                    url = dist["url"]
                    logger.info(
                        f"Found download URL for {package_name}=={latest}: {url}"
                    )
                    return latest, url
    except Exception as e:
        logger.error(f"Failed to fetch download URI from PyPI for {package_name}: {e}")
    # No sdist found, or the PyPI query failed.
    return None, None
def build_package_index(conda_artifacts: list[dict]) -> dict[str, tuple[int, int]]:
    """Build an index of package name -> (artifact_idx, checkout_idx) for fast lookups in conda-sdk-client.yml."""
    # Later occurrences of a package name overwrite earlier ones, matching
    # plain dict assignment in a loop.
    return {
        package: (artifact_idx, checkout_idx)
        for artifact_idx, artifact in enumerate(conda_artifacts)
        if "checkout" in artifact
        for checkout_idx, entry in enumerate(artifact["checkout"])
        if (package := entry.get("package"))
    }
def get_valid_package_imports(package_name: str) -> list[str]:
    """
    Inspect the package's actual module structure and return only valid imports.

    :param package_name: The name of the package (e.g., "azure-mgmt-advisor" or "azure-eventgrid").
    :return: List of valid module names for import (e.g., ["azure.eventgrid", "azure.eventgrid.aio"]).
    """
    fallback_module = package_name.replace("-", ".")

    package_path = get_package_path(package_name)
    if not package_path:
        logger.warning(
            f"Could not find package path for {package_name} to determine imports, using fallback"
        )
        return [fallback_module]

    parsed = ParsedSetup.from_path(package_path)
    if parsed and parsed.namespace:
        module_name = parsed.namespace
    else:
        logger.warning(
            f"Could not parse namespace for {package_name}, using fallback"
        )
        module_name = fallback_module

    imports = [module_name]
    # Resolve the on-disk directory backing the dotted module name.
    module_dir = os.path.join(package_path, *module_name.split("."))
    if not os.path.isdir(module_dir):
        logger.warning(
            f"Module directory not found for {package_name} at {module_dir}, using base import only"
        )
        return imports

    def _importable(*parts: str) -> bool:
        # A subpackage counts only if it is a directory with an __init__.py.
        candidate = os.path.join(module_dir, *parts)
        return os.path.isdir(candidate) and os.path.exists(
            os.path.join(candidate, "__init__.py")
        )

    # Common submodules: add each only when it actually exists on disk.
    for submodule in ("aio", "models", "operations"):
        if _importable(submodule):
            imports.append(f"{module_name}.{submodule}")
    # Nested aio.operations subpackage.
    if _importable("aio", "operations"):
        imports.append(f"{module_name}.aio.operations")
    return imports