File: tox_helper_tasks.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (148 lines) | stat: -rw-r--r-- 5,302 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

# This script is intended to be a placeholder for common tasks that are required by scripts running on tox

import shutil
import sys
import logging
import ast
import os
import textwrap
import io
import glob
import zipfile
import fnmatch
import subprocess
import re

from packaging.specifiers import SpecifierSet
from pkg_resources import Requirement, parse_version

# Set the root logger's threshold to INFO at import time so that the logging.info()
# messages emitted by the helpers in this module are not filtered out by the
# root logger's default WARNING level.
logging.getLogger().setLevel(logging.INFO)


def get_pip_list_output():
    """Uses the invoking python executable to get the output from pip list.

    Runs ``<sys.executable> -m pip list --disable-pip-version-check --format freeze``
    and parses every ``name==version`` line of its output.

    Returns:
        dict mapping each installed package name to its version string.

    Raises:
        Exception: if pip exits with a non-zero status or produces no output.
    """
    process = subprocess.Popen(
        [sys.executable, "-m", "pip", "list", "--disable-pip-version-check", "--format", "freeze"],
        stdout=subprocess.PIPE,
        # stderr is folded into stdout, so warnings pip prints end up in the same stream
        stderr=subprocess.STDOUT,
    )

    # the second element is always None because stderr is redirected into stdout
    stdout, _ = process.communicate()

    # this should be compatible with py27 https://docs.python.org/2.7/library/stdtypes.html#str.decode
    output = stdout.decode("utf-8") if stdout else ""

    if process.returncode != 0 or not output:
        raise Exception("pip list failed with exit code {0}: {1}".format(process.returncode, output))

    collected_output = {}

    # The freeze format has no header rows, so every line is a candidate.
    # splitlines() copes with both "\n" and "\r\n" regardless of the host platform.
    for line in output.splitlines():
        package, separator, version = line.strip().partition("==")

        # Skip anything that is not a plain "name==version" entry, e.g. blank lines
        # or warning text that pip wrote to the merged stderr stream.
        if not separator or not version or len(package.split()) != 1:
            continue

        collected_output[package] = version

    return collected_output


def unzip_sdist_to_directory(containing_folder):
    """Extract a zip found directly inside ``containing_folder`` into that same folder.

    Only the first ``*.zip`` match reported by glob is used. If the folder holds
    no zip file, indexing the empty match list raises IndexError.

    Returns whatever ``unzip_file_to_directory`` returns for that archive.
    """
    zip_pattern = os.path.join(containing_folder, "*.zip")
    archives = glob.glob(zip_pattern)
    first_archive = archives[0]
    return unzip_file_to_directory(first_archive, containing_folder)


def unzip_file_to_directory(path_to_zip_file, extract_location):
    """Extract every member of the zip at ``path_to_zip_file`` into ``extract_location``.

    Returns ``extract_location`` joined with the archive's file name minus its
    extension. The returned path is derived from the file name only; the
    archive contents are not inspected to confirm such a folder exists.
    """
    with zipfile.ZipFile(path_to_zip_file, "r") as archive:
        archive.extractall(extract_location)

    path_without_extension, _ = os.path.splitext(path_to_zip_file)
    folder_name = os.path.basename(path_without_extension)
    return os.path.join(extract_location, folder_name)


def move_and_rename(source_location):
    """Rename ``source_location`` to a sibling entry called ``unzipped``.

    Any existing ``unzipped`` directory next to the source is deleted first.

    Returns the new path.
    """
    parent_folder = os.path.dirname(source_location)
    target = os.path.join(parent_folder, "unzipped")

    # clear out a leftover target from an earlier run before renaming onto it
    if os.path.exists(target):
        shutil.rmtree(target)

    os.rename(source_location, target)
    return target


def find_sdist(dist_dir, pkg_name, pkg_version):
    """Locate the sdist zip ``<pkg_name>-<pkg_version>.zip`` anywhere under ``dist_dir``.

    The directory tree is walked recursively and file names are matched with
    fnmatch against the expected name.

    Returns the path of the first match relative to ``dist_dir``, or None (after
    logging an error) when ``dist_dir`` does not exist, ``pkg_name`` is None, or
    nothing matches.
    """
    if not os.path.exists(dist_dir):
        logging.error("dist_dir is incorrect")
        return None

    if pkg_name is None:
        logging.error("Package name cannot be empty to find sdist")
        return None

    expected_name = "{0}-{1}.zip".format(pkg_name, pkg_version)

    # collect every match, already expressed relative to dist_dir
    matches = [
        os.path.relpath(os.path.join(folder, candidate), dist_dir)
        for folder, _, files in os.walk(dist_dir)
        for candidate in fnmatch.filter(files, expected_name)
    ]

    if matches:
        return matches[0]

    logging.error("No sdist is found in directory %s with package name format %s", dist_dir, expected_name)
    return None


def find_whl(whl_dir, pkg_name, pkg_version):
    """Locate the wheel for ``pkg_name`` at ``pkg_version`` anywhere under ``whl_dir``.

    Dashes in ``pkg_name`` are replaced with underscores and the tree is searched
    recursively for ``<name>-<version>*.whl``. When several wheels match, only
    those whose relative path contains ``py<major>`` for the running interpreter
    are kept.

    Returns the matching wheel path relative to ``whl_dir``, or None when
    ``whl_dir`` does not exist, ``pkg_name`` is None, nothing matches, or the
    interpreter filter removes every candidate.

    Exits the process with status 1 if more than one wheel still matches after
    the interpreter filter.
    """
    if not os.path.exists(whl_dir):
        logging.error("whl_dir is incorrect")
        return None

    if pkg_name is None:
        logging.error("Package name cannot be empty to find whl")
        return None

    # NOTE(review): the wildcard directly follows the version, so a longer version
    # that merely starts with pkg_version (e.g. 1.0 vs 1.0.1) also matches.
    pattern = "{0}-{1}*.whl".format(pkg_name.replace("-", "_"), pkg_version)

    # every match, expressed relative to whl_dir
    candidates = [
        os.path.relpath(os.path.join(folder, wheel_file), whl_dir)
        for folder, _, files in os.walk(whl_dir)
        for wheel_file in fnmatch.filter(files, pattern)
    ]

    if not candidates:
        logging.error("No whl is found in directory %s with package name format %s", whl_dir, pattern)
        logging.info("List of whls in directory: %s", glob.glob(os.path.join(whl_dir, "*.whl")))
        return None

    if len(candidates) > 1:
        # several wheels means they are specific to a python version or a platform;
        # narrow down to the ones tagged for the running major version
        interpreter_tag = "py{0}".format(sys.version_info.major)
        candidates = [wheel for wheel in candidates if interpreter_tag in wheel]

        # a platform independent wheel would be the only one left at this point;
        # platform specific wheel discovery is not supported yet, so fail the run
        if len(candidates) > 1:
            logging.error(
                "More than one whl is found in wheel directory for package {}. Platform specific whl discovery is not supported now".format(
                    pkg_name
                )
            )
            sys.exit(1)

    # Filtering by architecture may be needed in the future; for now the
    # assumption is that no architecture specific wheel is generated.
    if len(candidates) != 1:
        return None

    chosen = candidates[0]
    logging.info("Found whl {}".format(chosen))
    return chosen