File: runs_artifacts.py

package info (click to toggle)
azure-devops-cli-extension 1.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,384 kB
  • sloc: python: 160,782; xml: 198; makefile: 56; sh: 51
file content (54 lines) | stat: -rw-r--r-- 2,825 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from knack.log import get_logger
from azext_devops.dev.common.services import (get_build_client,
                                              resolve_instance_and_project)
from azext_devops.dev.common.artifacttool import ArtifactToolInvoker
from azext_devops.dev.common.artifacttool_updater import ArtifactToolUpdater
from azext_devops.dev.common.external_tool import ProgressReportingExternalToolInvoker

logger = get_logger(__name__)


def run_artifact_download(run_id, artifact_name, path, organization=None, project=None, detect=None):
    """ Download a pipeline artifact.
    :param run_id: ID of the run that the artifact is associated to.
    :type run_id: int
    :param artifact_name: Name of the artifact to download.
    :type artifact_name: string
    :param path: Path to download the artifact into.
    :type path: string
    """
    # NOTE(review): the docstring above is kept verbatim; the CLI framework
    # presumably surfaces it as help text -- confirm before rewording it.

    # organization/project/detect are all handed to the resolver, and the
    # resolved pair replaces whatever the caller passed in.
    organization, project = resolve_instance_and_project(
        detect=detect, organization=organization, project=project)

    # Build the collaborators in the same order as the nested constructor call
    # they replace: external tool invoker first, then the updater.
    external_invoker = ProgressReportingExternalToolInvoker()
    tool_updater = ArtifactToolUpdater()
    invoker = ArtifactToolInvoker(external_invoker, tool_updater)

    # Whatever the artifact tool wrapper returns is passed straight through.
    download_result = invoker.download_pipeline_artifact(
        organization=organization,
        project=project,
        run_id=run_id,
        artifact_name=artifact_name,
        path=path)
    return download_result


def run_artifact_list(run_id, organization=None, project=None, detect=None):
    """ List artifacts associated with a run.
    :param run_id: ID of the run that the artifact is associated to.
    :type run_id: int
    """
    # NOTE(review): the docstring above is kept verbatim; the CLI framework
    # presumably surfaces it as help text -- confirm before rewording it.

    # Settle on the organization/project pair before creating the client;
    # the build client is obtained from the resolved organization only.
    organization, project = resolve_instance_and_project(
        detect=detect, organization=organization, project=project)

    # The run ID is passed to the build client as its ``build_id`` argument,
    # and the client's response is returned unchanged.
    return get_build_client(organization).get_artifacts(project=project, build_id=run_id)


def run_artifact_upload(run_id, artifact_name, path, organization=None, project=None, detect=None):
    """ Upload a pipeline artifact.
    :param run_id: ID of the run that the artifact is associated to.
    :type run_id: int
    :param artifact_name: Name of the artifact to upload.
    :type artifact_name: string
    :param path: Path to upload the artifact from.
    :type path: string
    """
    # NOTE(review): the docstring above is kept verbatim; the CLI framework
    # presumably surfaces it as help text -- confirm before rewording it.

    # organization/project/detect are all handed to the resolver, and the
    # resolved pair replaces whatever the caller passed in.
    organization, project = resolve_instance_and_project(
        detect=detect, organization=organization, project=project)

    # Build the collaborators in the same order as the nested constructor call
    # they replace: external tool invoker first, then the updater.
    external_invoker = ProgressReportingExternalToolInvoker()
    tool_updater = ArtifactToolUpdater()
    invoker = ArtifactToolInvoker(external_invoker, tool_updater)

    # Whatever the artifact tool wrapper returns is passed straight through.
    upload_result = invoker.upload_pipeline_artifact(
        organization=organization,
        project=project,
        run_id=run_id,
        artifact_name=artifact_name,
        path=path)
    return upload_result