1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
|
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import Any, AsyncIterable, Callable, Dict, Generic, Optional, TypeVar
import warnings
from azure.core.async_paging import AsyncItemPaged, AsyncList
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest
from azure.mgmt.core.exceptions import ARMErrorFormat
from ... import models as _models
T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]
class AlertsOperations:
"""AlertsOperations async operations.
You should not instantiate this class directly. Instead, you should create a Client instance that
instantiates it for you and attaches it as an attribute.
:ivar models: Alias to model classes used in this operation group.
:type models: ~azure.mgmt.databoxedge.v2020_12_01.models
:param client: Client for service requests.
:param config: Configuration of service client.
:param serializer: An object model serializer.
:param deserializer: An object model deserializer.
"""
models = _models
def __init__(self, client, config, serializer, deserializer) -> None:
self._client = client
self._serialize = serializer
self._deserialize = deserializer
self._config = config
def list_by_data_box_edge_device(
self,
device_name: str,
resource_group_name: str,
**kwargs
) -> AsyncIterable["_models.AlertList"]:
"""Gets all the alerts for a Data Box Edge/Data Box Gateway device.
:param device_name: The device name.
:type device_name: str
:param resource_group_name: The resource group name.
:type resource_group_name: str
:keyword callable cls: A custom type or function that will be passed the direct response
:return: An iterator like instance of either AlertList or the result of cls(response)
:rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.mgmt.databoxedge.v2020_12_01.models.AlertList]
:raises: ~azure.core.exceptions.HttpResponseError
"""
cls = kwargs.pop('cls', None) # type: ClsType["_models.AlertList"]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
api_version = "2020-12-01"
accept = "application/json"
def prepare_request(next_link=None):
# Construct headers
header_parameters = {} # type: Dict[str, Any]
header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
if not next_link:
# Construct URL
url = self.list_by_data_box_edge_device.metadata['url'] # type: ignore
path_format_arguments = {
'deviceName': self._serialize.url("device_name", device_name, 'str'),
'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
}
url = self._client.format_url(url, **path_format_arguments)
# Construct parameters
query_parameters = {} # type: Dict[str, Any]
query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
request = self._client.get(url, query_parameters, header_parameters)
else:
url = next_link
query_parameters = {} # type: Dict[str, Any]
request = self._client.get(url, query_parameters, header_parameters)
return request
async def extract_data(pipeline_response):
deserialized = self._deserialize('AlertList', pipeline_response)
list_of_elem = deserialized.value
if cls:
list_of_elem = cls(list_of_elem)
return deserialized.next_link or None, AsyncList(list_of_elem)
async def get_next(next_link=None):
request = prepare_request(next_link)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
return pipeline_response
return AsyncItemPaged(
get_next, extract_data
)
list_by_data_box_edge_device.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataBoxEdge/dataBoxEdgeDevices/{deviceName}/alerts'} # type: ignore
async def get(
self,
device_name: str,
name: str,
resource_group_name: str,
**kwargs
) -> "_models.Alert":
"""Gets an alert by name.
Gets an alert by name.
:param device_name: The device name.
:type device_name: str
:param name: The alert name.
:type name: str
:param resource_group_name: The resource group name.
:type resource_group_name: str
:keyword callable cls: A custom type or function that will be passed the direct response
:return: Alert, or the result of cls(response)
:rtype: ~azure.mgmt.databoxedge.v2020_12_01.models.Alert
:raises: ~azure.core.exceptions.HttpResponseError
"""
cls = kwargs.pop('cls', None) # type: ClsType["_models.Alert"]
error_map = {
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))
api_version = "2020-12-01"
accept = "application/json"
# Construct URL
url = self.get.metadata['url'] # type: ignore
path_format_arguments = {
'deviceName': self._serialize.url("device_name", device_name, 'str'),
'name': self._serialize.url("name", name, 'str'),
'subscriptionId': self._serialize.url("self._config.subscription_id", self._config.subscription_id, 'str'),
'resourceGroupName': self._serialize.url("resource_group_name", resource_group_name, 'str'),
}
url = self._client.format_url(url, **path_format_arguments)
# Construct parameters
query_parameters = {} # type: Dict[str, Any]
query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')
# Construct headers
header_parameters = {} # type: Dict[str, Any]
header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')
request = self._client.get(url, query_parameters, header_parameters)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response, error_format=ARMErrorFormat)
deserialized = self._deserialize('Alert', pipeline_response)
if cls:
return cls(pipeline_response, deserialized, {})
return deserialized
get.metadata = {'url': '/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataBoxEdge/dataBoxEdgeDevices/{deviceName}/alerts/{name}'} # type: ignore
|