1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import json
import os
from knack.log import get_logger
from azext_devops.dev.common.services import _get_credentials
from azext_devops.dev.common.const import ARTIFACTTOOL_PAT_ENVKEY
logger = get_logger(__name__)
class ArtifactToolInvoker:
def __init__(self, tool_invoker, artifacttool_updater):
self._tool_invoker = tool_invoker
self._artifacttool_updater = artifacttool_updater
def download_pipeline_artifact(self, organization, project, run_id, artifact_name, path):
args = ["pipelineartifact", "download", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY,
"--project", project, "--pipeline-id", run_id, "--artifact-name", artifact_name, "--path", path]
return self.run_artifacttool(organization, args, "Downloading")
def upload_pipeline_artifact(self, organization, project, run_id, artifact_name, path):
args = ["pipelineartifact", "publish", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY,
"--project", project, "--pipeline-id", run_id, "--artifact-name", artifact_name, "--path", path]
return self.run_artifacttool(organization, args, "Uploading")
def download_universal(self, organization, project, feed, package_name, package_version, path, file_filter):
args = ["universal", "download", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY,
"--feed", feed, "--package-name", package_name, "--package-version", package_version,
"--path", path]
if project:
args.extend(["--project", project])
if file_filter:
args.extend(["--filter", file_filter])
return self.run_artifacttool(organization, args, "Downloading")
def publish_universal(self, organization, project, feed, package_name, package_version, description, path):
args = ["universal", "publish", "--service", organization, "--patvar", ARTIFACTTOOL_PAT_ENVKEY,
"--feed", feed, "--package-name", package_name, "--package-version", package_version, "--path", path]
if project:
args.extend(["--project", project])
if description:
args.extend(["--description", description])
return self.run_artifacttool(organization, args, "Publishing")
def run_artifacttool(self, organization, args, initial_progress_message):
# Download ArtifactTool if necessary, and return the path
artifacttool_dir = self._artifacttool_updater.get_latest_artifacttool(organization)
artifacttool_binary_path = os.path.join(artifacttool_dir, "artifacttool")
# Populate the environment for the process with the PAT
creds = _get_credentials(organization)
new_env = os.environ.copy()
new_env[ARTIFACTTOOL_PAT_ENVKEY] = str(creds.password)
# Run ArtifactTool
command_args = [artifacttool_binary_path] + args
proc = self._tool_invoker.run(command_args, new_env, initial_progress_message, _process_stderr)
if proc:
output = proc.stdout.read().decode('utf-8')
try:
return json.loads(output)
except ValueError: # JSONDecodeError but not available on Python 2.7
if output:
logger.warning("Failed to parse the output of ArtifactTool as JSON. The output was:\n %s", output)
return None
def _process_stderr(line, update_progress_callback):
try:
json_line = json.loads(line)
except BaseException as ex: # pylint: disable=broad-except
json_line = None
logger.warning("Failed to parse structured output from Universal Packages tooling (ArtifactTool)")
logger.warning("Exception: %s", ex)
logger.warning("Log line: %s", line)
return
_log_message(json_line)
_process_event(json_line, update_progress_callback)
# Interpret the structured log line from ArtifactTool and emit the message to Azure devops CLI logging
def _log_message(json_line):
if json_line is not None and '@m' in json_line:
# Serilog doesn't emit @l for Information it seems
log_level = json_line['@l'] if '@l' in json_line else "Information"
message = json_line['@m']
if log_level in ["Critical", "Error"]:
ex = json_line['@x'] if '@x' in json_line else None
if ex:
message = "{}\n{}".format(message, ex)
logger.error(message)
if log_level == "Warning":
logger.warning(message)
elif log_level == "Information":
logger.info(message)
else:
logger.debug(message)
# Inspect the structured log line for an event, and update the progress
def _process_event(json_line, update_progress_callback):
if json_line is not None and 'EventId' in json_line and 'Name' in json_line['EventId']:
event_name = json_line['EventId']['Name']
if event_name == "ProcessingFiles":
processed_files = json_line['ProcessedFiles']
total_files = json_line['TotalFiles']
percent = 100 * float(processed_files) / float(total_files)
update_progress_callback("Pre-upload processing: {}/{} files"
.format(processed_files, total_files), percent)
if event_name == "Uploading":
uploaded_bytes = json_line['UploadedBytes']
total_bytes = json_line['TotalBytes']
percent = 100 * float(uploaded_bytes) / float(total_bytes)
update_progress_callback("Uploading: {}/{} bytes".format(uploaded_bytes, total_bytes), percent)
if event_name == "Downloading":
downloaded_bytes = json_line['DownloadedBytes']
total_bytes = json_line['TotalBytes']
percent = 100 * float(downloaded_bytes) / float(total_bytes)
update_progress_callback("Downloading: {}/{} bytes".format(downloaded_bytes, total_bytes), percent)
|