File: artifacttool.py

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import json
import os

from knack.log import get_logger

from azext_devops.dev.common.services import _get_credentials
from azext_devops.dev.common.const import ARTIFACTTOOL_PAT_ENVKEY

logger = get_logger(__name__)


class ArtifactToolInvoker:
    def __init__(self, tool_invoker, artifacttool_updater):
        self._tool_invoker = tool_invoker
        self._artifacttool_updater = artifacttool_updater

    def download_pipeline_artifact(self, organization, project, run_id, artifact_name, path):
        args = ["pipelineartifact", "download", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY,
                "--project", project, "--pipeline-id", run_id, "--artifact-name", artifact_name, "--path", path]
        return self.run_artifacttool(organization, args, "Downloading")

    def upload_pipeline_artifact(self, organization, project, run_id, artifact_name, path):
        args = ["pipelineartifact", "publish", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY,
                "--project", project, "--pipeline-id", run_id, "--artifact-name", artifact_name, "--path", path]
        return self.run_artifacttool(organization, args, "Uploading")
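
    # Illustrative trace (hypothetical values): download_pipeline_artifact(
    #     "https://dev.azure.com/fabrikam", "MyProject", 42, "drop", "/tmp/drop")
    # composes roughly the following ArtifactTool invocation, with the PAT passed
    # via the environment variable named by ARTIFACTTOOL_PAT_ENVKEY rather than
    # on the command line:
    #   artifacttool pipelineartifact download --service https://dev.azure.com/fabrikam
    #       --patvar <value of ARTIFACTTOOL_PAT_ENVKEY> --project MyProject
    #       --pipeline-id 42 --artifact-name drop --path /tmp/drop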

    def download_universal(self, organization, project, feed, package_name, package_version, path, file_filter):
        args = ["universal", "download", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY,
                "--feed", feed, "--package-name", package_name, "--package-version", package_version,
                "--path", path]

        if project:
            args.extend(["--project", project])

        if file_filter:
            args.extend(["--filter", file_filter])
        return self.run_artifacttool(organization, args, "Downloading")

    def publish_universal(self, organization, project, feed, package_name, package_version, description, path):
        args = ["universal", "publish", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY,
                "--feed", feed, "--package-name", package_name, "--package-version", package_version, "--path", path]

        if project:
            args.extend(["--project", project])

        if description:
            args.extend(["--description", description])
        return self.run_artifacttool(organization, args, "Publishing")
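
    # Illustrative trace (hypothetical values): publish_universal(
    #     "https://dev.azure.com/fabrikam", None, "my-feed", "my-package",
    #     "1.0.0", "First release", "/src/out")
    # composes roughly:
    #   artifacttool universal publish --service https://dev.azure.com/fabrikam
    #       --patvar <value of ARTIFACTTOOL_PAT_ENVKEY> --feed my-feed
    #       --package-name my-package --package-version 1.0.0 --path /src/out
    #       --description "First release"
    # Note that --project and --description are appended only when supplied.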

    def run_artifacttool(self, organization, args, initial_progress_message):
        # Download ArtifactTool if necessary; the updater returns its install directory
        artifacttool_dir = self._artifacttool_updater.get_latest_artifacttool(organization)
        artifacttool_binary_path = os.path.join(artifacttool_dir, "artifacttool")

        # Populate the environment for the process with the PAT
        creds = _get_credentials(organization)
        new_env = os.environ.copy()
        new_env[ARTIFACTTOOL_PAT_ENVKEY] = str(creds.password)

        # Run ArtifactTool
        command_args = [artifacttool_binary_path] + args
        proc = self._tool_invoker.run(command_args, new_env, initial_progress_message, _process_stderr)
        if proc:
            output = proc.stdout.read().decode('utf-8')
            try:
                return json.loads(output)
            except ValueError:  # json.JSONDecodeError isn't available on Python 2.7; it subclasses ValueError
                if output:
                    logger.warning("Failed to parse the output of ArtifactTool as JSON. The output was:\n %s", output)
        return None
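

# A minimal usage sketch (the collaborator objects are hypothetical stand-ins;
# the real CLI wires in an ArtifactToolUpdater and a progress-reporting tool
# invoker from this package):
#
#   invoker = ArtifactToolInvoker(tool_invoker, artifacttool_updater)
#   result = invoker.download_universal("https://dev.azure.com/fabrikam", None,
#                                       "my-feed", "my-package", "1.0.0",
#                                       "/tmp/pkg", None)
#   # 'result' is ArtifactTool's stdout parsed as JSON, or None if parsing failed.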


def _process_stderr(line, update_progress_callback):
    try:
        json_line = json.loads(line)
    except BaseException as ex:  # pylint: disable=broad-except
        json_line = None
        logger.warning("Failed to parse structured output from Universal Packages tooling (ArtifactTool)")
        logger.warning("Exception: %s", ex)
        logger.warning("Log line: %s", line)
        return

    _log_message(json_line)
    _process_event(json_line, update_progress_callback)
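

# For reference, ArtifactTool emits Serilog compact-format JSON on stderr. An
# invented but representative line that the parser above accepts (no '@l' key,
# so it is logged at the Information level):
#   {"@t": "2020-01-01T00:00:00Z", "@m": "Processed 3/10 files",
#    "EventId": {"Name": "ProcessingFiles"}, "ProcessedFiles": 3, "TotalFiles": 10}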


# Interpret a structured log line from ArtifactTool and emit the message to Azure DevOps CLI logging
def _log_message(json_line):
    if json_line is not None and '@m' in json_line:
        # Serilog appears not to emit '@l' for Information-level events, so default to it
        log_level = json_line['@l'] if '@l' in json_line else "Information"
        message = json_line['@m']
        if log_level in ["Critical", "Error"]:
            ex = json_line['@x'] if '@x' in json_line else None
            if ex:
                message = "{}\n{}".format(message, ex)
            logger.error(message)
        elif log_level == "Warning":
            logger.warning(message)
        elif log_level == "Information":
            logger.info(message)
        else:
            logger.debug(message)


# Inspect the structured log line for an event, and update the progress
def _process_event(json_line, update_progress_callback):
    if json_line is not None and 'EventId' in json_line and 'Name' in json_line['EventId']:
        event_name = json_line['EventId']['Name']
        if event_name == "ProcessingFiles":
            processed_files = json_line['ProcessedFiles']
            total_files = json_line['TotalFiles']
            percent = 100 * float(processed_files) / float(total_files)
            update_progress_callback("Pre-upload processing: {}/{} files"
                                     .format(processed_files, total_files), percent)

        elif event_name == "Uploading":
            uploaded_bytes = json_line['UploadedBytes']
            total_bytes = json_line['TotalBytes']
            percent = 100 * float(uploaded_bytes) / float(total_bytes)
            update_progress_callback("Uploading: {}/{} bytes".format(uploaded_bytes, total_bytes), percent)

        elif event_name == "Downloading":
            downloaded_bytes = json_line['DownloadedBytes']
            total_bytes = json_line['TotalBytes']
            percent = 100 * float(downloaded_bytes) / float(total_bytes)
            update_progress_callback("Downloading: {}/{} bytes".format(downloaded_bytes, total_bytes), percent)
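

if __name__ == '__main__':
    # Ad-hoc smoke test, not exercised by the CLI itself: feed a synthetic
    # Serilog event line through the stderr parser and print the progress
    # update it yields. Assumes the azext_devops package is importable so the
    # module-level imports above succeed.
    def _print_progress(message, percent):
        print("{} ({:.0f}%)".format(message, percent))

    _process_stderr(
        '{"@m": "Uploading", "EventId": {"Name": "Uploading"}, '
        '"UploadedBytes": 512, "TotalBytes": 1024}',
        _print_progress)  # prints: Uploading: 512/1024 bytes (50%)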