File: test_pipeline_agents.py

package info (click to toggle)
azure-devops-cli-extension 1.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,384 kB
  • sloc: python: 160,782; xml: 198; makefile: 56; sh: 51
file content (132 lines) | stat: -rw-r--r-- 8,277 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import unittest

try:
    # Attempt to load mock (works on Python 3.3 and above)
    from unittest.mock import patch
except ImportError:
    # Attempt to load mock (works on Python version below 3.3)
    from mock import patch

from azext_devops.dev.common.services import clear_connection_cache
from azext_devops.tests.utils.authentication import AuthenticatedTests
from azext_devops.devops_sdk.v5_1.task_agent.task_agent_client import TaskAgentClient
from azext_devops.dev.pipelines.agent_pool_queue import list_pools, show_pool, list_agents, show_agent, list_queues, show_queue

class TestPipelinesAgentPoolQueueMethods(AuthenticatedTests):
    """Unit tests for the agent pool, agent and queue command functions.

    Every test swaps a single ``TaskAgentClient`` method for a mock, calls the
    command function under test, and asserts the exact keyword arguments that
    were forwarded to that client method. No request leaves the process
    through the mocked method.
    """

    _TEST_DEVOPS_ORGANIZATION = 'https://someorganization.visualstudio.com'
    _TEST_DEVOPS_PROJECT = 'MyProject'

    def setUp(self):
        """Run the inherited authentication steps and patch ``Connection.get_client``.

        The patched ``get_client`` returns a ``TaskAgentClient`` built for the
        test organization, and the connection cache is cleared so each test
        starts from a clean state.
        """
        self.authentication_setup()
        self.authenticate()

        self.get_client_patcher = patch('azext_devops.devops_sdk.connection.Connection.get_client')
        # tearDown is not invoked when setUp raises, so register the cleanup
        # before starting the patcher; otherwise a failure below would leave
        # Connection.get_client patched for every following test.
        self.addCleanup(patch.stopall)
        # Start the patchers.
        self.mock_get_client = self.get_client_patcher.start()
        # Set return values which will be the same across tests.
        self.mock_get_client.return_value = TaskAgentClient(base_url=self._TEST_DEVOPS_ORGANIZATION)
        # Clear the connection cache before running each test.
        clear_connection_cache()

    def tearDown(self):
        """Stop every patcher that was started with ``start()``."""
        # Kept alongside the addCleanup registration in setUp; calling
        # stopall more than once is harmless.
        patch.stopall()

    def _patch_client_method(self, method_name):
        """Return a patcher that replaces ``TaskAgentClient.<method_name>`` with a mock."""
        return patch.object(TaskAgentClient, method_name)

    def test_pool_list_with_defaults(self):
        # Unset pool filters must be forwarded to get_agent_pools as None.
        with self._patch_client_method('get_agent_pools') as mock_get_pools:
            list_pools(pool_name=None, pool_type=None, action=None,
                       organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_pools.assert_called_once_with(
                pool_name=None, properties=None, pool_type=None, action_filter=None)

    def test_pool_list_without_defaults(self):
        # Explicit pool filters must be forwarded unchanged; action maps to action_filter.
        with self._patch_client_method('get_agent_pools') as mock_get_pools:
            list_pools(pool_name='PoolName', pool_type='PoolType', action='ActionUse',
                       organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_pools.assert_called_once_with(
                pool_name='PoolName', properties=None, pool_type='PoolType', action_filter='ActionUse')

    def test_pool_show_with_defaults(self):
        # Showing a pool without an action forwards action_filter=None.
        with self._patch_client_method('get_agent_pool') as mock_get_pool:
            show_pool(pool_id='123', action=None, organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_pool.assert_called_once_with(pool_id='123', properties=None, action_filter=None)

    def test_pool_show_without_defaults(self):
        # Showing a pool with an action forwards it as action_filter.
        with self._patch_client_method('get_agent_pool') as mock_get_pool:
            show_pool(pool_id='123', action='ActionUse', organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_pool.assert_called_once_with(pool_id='123', properties=None, action_filter='ActionUse')

    def test_agent_list_with_defaults(self):
        # Unset agent filters must be forwarded to get_agents as None.
        with self._patch_client_method('get_agents') as mock_get_agents:
            list_agents(pool_id='123', agent_name=None, include_capabilities=None,
                        include_assigned_request=None, include_last_completed_request=None,
                        demands=None, organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_agents.assert_called_once_with(
                pool_id='123', agent_name=None, include_capabilities=None,
                include_last_completed_request=None, include_assigned_request=None,
                property_filters=None, demands=None)

    def test_agent_list_without_defaults(self):
        # The comma-separated demands string is expected to reach the client as a list.
        with self._patch_client_method('get_agents') as mock_get_agents:
            list_agents(pool_id='123', agent_name='myagent', include_capabilities=True,
                        include_assigned_request=True, include_last_completed_request=True,
                        demands='demand1,demand2', organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_agents.assert_called_once_with(
                pool_id='123', agent_name='myagent', include_capabilities=True,
                include_last_completed_request=True, include_assigned_request=True,
                property_filters=None, demands=['demand1', 'demand2'])

    def test_agent_show_with_defaults(self):
        # Unset include flags must be forwarded to get_agent as None.
        with self._patch_client_method('get_agent') as mock_get_agent:
            show_agent(pool_id='123', agent_id='1234', include_capabilities=None,
                       include_assigned_request=None, include_last_completed_request=None,
                       organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_agent.assert_called_once_with(
                pool_id='123', agent_id='1234', include_capabilities=None,
                include_assigned_request=None, include_last_completed_request=None,
                property_filters=None)

    def test_agent_show_without_defaults(self):
        # Explicit include flags must be forwarded to get_agent unchanged.
        with self._patch_client_method('get_agent') as mock_get_agent:
            show_agent(pool_id='123', agent_id='1234', include_capabilities=True,
                       include_assigned_request=True, include_last_completed_request=True,
                       organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_agent.assert_called_once_with(
                pool_id='123', agent_id='1234', include_capabilities=True,
                include_assigned_request=True, include_last_completed_request=True,
                property_filters=None)

    def test_queue_list_with_defaults(self):
        # Unset queue filters must be forwarded to get_agent_queues as None.
        with self._patch_client_method('get_agent_queues') as mock_get_queues:
            list_queues(queue_name=None, action=None, project=self._TEST_DEVOPS_PROJECT,
                        organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_queues.assert_called_once_with(
                project=self._TEST_DEVOPS_PROJECT, queue_name=None, action_filter=None)

    def test_queue_list_without_defaults(self):
        # Explicit queue filters must be forwarded; action maps to action_filter.
        with self._patch_client_method('get_agent_queues') as mock_get_queues:
            list_queues(queue_name='Ubuntu', action='ActionUse', project=self._TEST_DEVOPS_PROJECT,
                        organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_queues.assert_called_once_with(
                project=self._TEST_DEVOPS_PROJECT, queue_name='Ubuntu', action_filter='ActionUse')

    def test_queue_show_with_defaults(self):
        # Showing a queue without an action forwards action_filter=None.
        with self._patch_client_method('get_agent_queue') as mock_get_queue:
            show_queue(queue_id='123', action=None, project=self._TEST_DEVOPS_PROJECT,
                       organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_queue.assert_called_once_with(
                project=self._TEST_DEVOPS_PROJECT, queue_id='123', action_filter=None)

    def test_queue_show_without_defaults(self):
        # Showing a queue with an action forwards it as action_filter.
        with self._patch_client_method('get_agent_queue') as mock_get_queue:
            show_queue(queue_id='123', action='ActionUse', project=self._TEST_DEVOPS_PROJECT,
                       organization=self._TEST_DEVOPS_ORGANIZATION)

            mock_get_queue.assert_called_once_with(
                project=self._TEST_DEVOPS_PROJECT, queue_id='123', action_filter='ActionUse')