File: invoke.py

package info (click to toggle)
azure-devops-cli-extension 1.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,384 kB
  • sloc: python: 160,782; xml: 198; makefile: 56; sh: 51
file content (157 lines) | stat: -rw-r--r-- 6,018 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from __future__ import print_function
from knack.log import get_logger
from knack.util import CLIError

from azext_devops.dev.common.services import (resolve_instance,
                                              get_connection)
from azext_devops.devops_sdk.client import Client


logger = get_logger(__name__)


# pylint: disable=too-many-locals, too-many-statements, inconsistent-return-statements, protected-access, too-many-branches
def invoke(area=None, resource=None,
           route_parameters=None,
           query_parameters=None,
           api_version='5.0',
           http_method='GET',
           in_file=None,
           encoding='utf-8',
           media_type='application/json',
           accept_media_type='application/json',
           out_file=None,
           organization=None, detect=None):
    # Send a request to one resource location of the resolved organization.
    #
    # NOTE: documented with comments rather than a docstring on purpose; knack can derive
    # command help text from a handler's docstring, so adding one could change CLI help output.
    #
    # Modes:
    #   * neither `area` nor `resource` given: no request is sent; the resource locations reported
    #     by every distinct service URL of the organization are collected and returned as a list.
    #   * both given: the matching location is looked up and the request is sent to it.
    #
    # Parameters:
    #   area / resource      names matched case-insensitively against the reported locations.
    #   route_parameters     iterable of 'param=value' strings, converted with stringToDict.
    #   query_parameters     iterable of 'param=value' strings, converted with stringToDict.
    #   api_version          sent to the service unchanged; its numeric part (apiVersionToFloat)
    #                        takes part in location selection.
    #   http_method          HTTP verb passed to the client.
    #   in_file / encoding   optional file whose content is parsed as JSON and sent as the body.
    #   media_type / accept_media_type
    #                        passed through to the client's send call.
    #   out_file             path that receives the response body; it must not exist yet.
    #   organization / detect
    #                        passed to resolve_instance to determine the organization.
    #
    # Returns:
    #   * list of resource locations in discovery mode;
    #   * the decoded JSON payload when the response content type contains 'json' and no
    #     `out_file` was given (a dict payload additionally gets a 'continuation_token' key);
    #   * None when the response has no content-type header or was written to `out_file`.
    #
    # Raises CLIError when only one of `area`/`resource` is given, when `in_file` does not exist,
    # when the area or the resource/version combination cannot be resolved, when a non-JSON
    # response arrives without `out_file`, or when `out_file` already exists.
    logger.info('route_parameter received is %s', route_parameters)

    # Supplying exactly one of the two used to crash later with AttributeError on None.lower();
    # reject it up front, before any network call is made.
    if bool(area) != bool(resource):
        raise CLIError('--area and --resource must be specified together')

    version = apiVersionToFloat(api_version)

    organization = resolve_instance(detect=detect, organization=organization)
    connection = get_connection(organization)

    request_body = None
    if in_file:
        from os import path
        if not path.exists(in_file):
            raise CLIError('--in-file does not point to a valid file location')
        from azext_devops.dev.common.utils import read_file_content
        in_file_content = read_file_content(file_path=in_file, encoding=encoding)
        import json
        request_body = json.loads(in_file_content)

    resource_areas = connection._get_resource_areas(force=True)

    if not area and not resource:
        print('Please wait a couple of seconds while we fetch all required information.')

        # Distinct service URLs, kept in first-seen order.
        service_list = []
        for x in resource_areas:
            if x.location_url not in service_list:
                service_list.append(x.location_url)

        resource_locations = []
        for service_url in service_list:
            try:
                logger.info('trying to get locations from %s', service_url)
                service_client = Client(service_url, connection._creds)
                resource_locations.extend(service_client._get_resource_locations(all_host_types=True))
            except Exception:  # pylint: disable=broad-except
                # Best effort: a failing service is skipped, but Ctrl+C / SystemExit still propagate.
                logger.info('Failed to get location for %s', service_url)

        return resource_locations

    client_url = ''
    if not resource_areas:
        # this is for on-prem
        client_url = connection.base_url

    area_name = area.lower()
    resource_name = resource.lower()

    # No early exit: if several areas share the name, the last one listed wins.
    for resource_area in resource_areas:
        if resource_area.name.lower() == area_name:
            client_url = resource_area.location_url

    if not client_url:
        raise CLIError('--area is not present in current organization')

    client = Client(client_url, connection._creds)

    # there can be multiple resource/ area with different version so this version comparison is needed
    # A candidate replaces the current pick when its max_version is higher than the current pick's
    # and the requested version is not below the current pick's max_version.
    location_id = ''
    current_version = 0.0
    resource_locations = client._get_resource_locations(all_host_types=True)
    for resource_location in resource_locations:
        if (resource_name == resource_location.resource_name.lower() and
                area_name == resource_location.area.lower()):
            candidate_max_version = float(resource_location.max_version)
            if candidate_max_version > current_version and version >= current_version:
                location_id = resource_location.id
                current_version = candidate_max_version

    if not location_id:
        raise CLIError('--resource and --api-version combination is not correct')

    route_values = stringToDict(route_parameters)
    query_values = stringToDict(query_parameters)

    response = client._send(http_method=http_method,
                            location_id=location_id,
                            version=api_version,
                            query_parameters=query_values,
                            route_values=route_values,
                            media_type=media_type,
                            accept_media_type=accept_media_type,
                            content=request_body)
    content_type = response.headers.get("content-type")
    logger.info('content type header')
    logger.info(content_type)

    if not content_type:
        # Without a content-type header there is nothing to return or write.
        logger.info('Content type header is None.')
        return None

    if 'json' in content_type and not out_file:
        response_dict = response.json()
        # A JSON payload is not necessarily an object (it may be an array, for instance);
        # only a dict can carry the extra key.
        if isinstance(response_dict, dict):
            response_dict["continuation_token"] = response.headers.get('X-MS-ContinuationToken')
        return response_dict

    # Anything else (non-JSON content, or JSON with an explicit --out-file) is written to disk.
    if not out_file:
        raise CLIError('Response is not json, you need to provide --out-file where it can be written')

    import os
    if os.path.exists(out_file):
        raise CLIError('Out file already exists, please give a new name.')

    # Opening in 'ab' mode creates the file, so no separate "touch" is needed.
    with open(out_file, 'ab') as f:
        for chunk in client._client.stream_download(response, callback=None):
            f.write(chunk)


def apiVersionToFloat(apiVersion):
    """Return the numeric part of an API version string as a float.

    The ``-preview`` marker is removed before conversion, together with a
    ``.N`` revision suffix that directly follows it, so ``'5.1'``,
    ``'5.1-preview'`` and ``'5.1-preview.2'`` all yield ``5.1``. Previously
    the revision suffix was left in place, which made ``'5.1-preview.2'``
    fail (``'5.1.2'`` is not a float) and ``'5-preview.1'`` parse as ``5.1``.

    :param apiVersion: version string such as ``'5.0'`` or ``'5.0-preview.1'``.
    :raises ValueError: if the remaining text is not a valid float literal.
    """
    # Function-scope import, in line with the other local imports in this module.
    import re

    numeric_part = re.sub(r'-preview(?:\.\d+)?', '', apiVersion)
    return float(numeric_part)


def stringToDict(inputList):
    """Convert an iterable of ``param=value`` strings into a dict.

    Each entry is split at its first ``=``; everything after it (including
    further ``=`` characters) becomes the value. When a key occurs more than
    once, the last occurrence wins. A falsy ``inputList`` gives an empty dict.

    :param inputList: iterable of ``param=value`` strings, or ``None``.
    :raises CLIError: if an entry contains no ``=``.
    """
    parsed = {}

    for entry in inputList or ():
        name, separator, text = entry.partition('=')
        if not separator:
            raise CLIError('%s is not valid it needs to be in format param=value' % (entry))
        parsed[name] = text

    return parsed