1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from copy import deepcopy
from typing import Any, Awaitable, Optional, TYPE_CHECKING, cast
from typing_extensions import Self
from azure.core.pipeline import policies
from azure.core.rest import AsyncHttpResponse, HttpRequest
from azure.core.settings import settings
from azure.mgmt.core import AsyncARMPipelineClient
from azure.mgmt.core.policies import AsyncARMAutoResourceProviderRegistrationPolicy
from azure.mgmt.core.tools import get_arm_endpoints
from .._utils.serialization import Deserializer, Serializer
from ._configuration import AzureStackHCIVmClientConfiguration
from .operations import (
AttestationStatusesOperations,
GalleryImagesOperations,
GuestAgentsOperations,
HybridIdentityMetadataOperations,
LogicalNetworksOperations,
MarketplaceGalleryImagesOperations,
NetworkInterfacesOperations,
NetworkSecurityGroupsOperations,
SecurityRulesOperations,
StorageContainersOperations,
VirtualHardDisksOperations,
VirtualMachineInstancesOperations,
)
if TYPE_CHECKING:
from azure.core.credentials_async import AsyncTokenCredential
class AzureStackHCIVmClient: # pylint: disable=too-many-instance-attributes
"""Azure Stack HCI management service.
:ivar gallery_images: GalleryImagesOperations operations
:vartype gallery_images: azure.mgmt.azurestackhcivm.aio.operations.GalleryImagesOperations
:ivar logical_networks: LogicalNetworksOperations operations
:vartype logical_networks: azure.mgmt.azurestackhcivm.aio.operations.LogicalNetworksOperations
:ivar marketplace_gallery_images: MarketplaceGalleryImagesOperations operations
:vartype marketplace_gallery_images:
azure.mgmt.azurestackhcivm.aio.operations.MarketplaceGalleryImagesOperations
:ivar network_interfaces: NetworkInterfacesOperations operations
:vartype network_interfaces:
azure.mgmt.azurestackhcivm.aio.operations.NetworkInterfacesOperations
:ivar network_security_groups: NetworkSecurityGroupsOperations operations
:vartype network_security_groups:
azure.mgmt.azurestackhcivm.aio.operations.NetworkSecurityGroupsOperations
:ivar security_rules: SecurityRulesOperations operations
:vartype security_rules: azure.mgmt.azurestackhcivm.aio.operations.SecurityRulesOperations
:ivar storage_containers: StorageContainersOperations operations
:vartype storage_containers:
azure.mgmt.azurestackhcivm.aio.operations.StorageContainersOperations
:ivar virtual_hard_disks: VirtualHardDisksOperations operations
:vartype virtual_hard_disks:
azure.mgmt.azurestackhcivm.aio.operations.VirtualHardDisksOperations
:ivar virtual_machine_instances: VirtualMachineInstancesOperations operations
:vartype virtual_machine_instances:
azure.mgmt.azurestackhcivm.aio.operations.VirtualMachineInstancesOperations
:ivar hybrid_identity_metadata: HybridIdentityMetadataOperations operations
:vartype hybrid_identity_metadata:
azure.mgmt.azurestackhcivm.aio.operations.HybridIdentityMetadataOperations
:ivar attestation_statuses: AttestationStatusesOperations operations
:vartype attestation_statuses:
azure.mgmt.azurestackhcivm.aio.operations.AttestationStatusesOperations
:ivar guest_agents: GuestAgentsOperations operations
:vartype guest_agents: azure.mgmt.azurestackhcivm.aio.operations.GuestAgentsOperations
:param credential: Credential used to authenticate requests to the service. Required.
:type credential: ~azure.core.credentials_async.AsyncTokenCredential
:param subscription_id: The ID of the target subscription. The value must be an UUID. Required.
:type subscription_id: str
:param base_url: Service host. Default value is None.
:type base_url: str
:keyword api_version: The API version to use for this operation. Default value is
"2025-06-01-preview". Note that overriding this default value may result in unsupported
behavior.
:paramtype api_version: str
:keyword int polling_interval: Default waiting time between two polls for LRO operations if no
Retry-After header is present.
"""
def __init__(
self, credential: "AsyncTokenCredential", subscription_id: str, base_url: Optional[str] = None, **kwargs: Any
) -> None:
_endpoint = "{endpoint}"
_cloud = kwargs.pop("cloud_setting", None) or settings.current.azure_cloud # type: ignore
_endpoints = get_arm_endpoints(_cloud)
if not base_url:
base_url = _endpoints["resource_manager"]
credential_scopes = kwargs.pop("credential_scopes", _endpoints["credential_scopes"])
self._config = AzureStackHCIVmClientConfiguration(
credential=credential,
subscription_id=subscription_id,
base_url=cast(str, base_url),
credential_scopes=credential_scopes,
**kwargs
)
_policies = kwargs.pop("policies", None)
if _policies is None:
_policies = [
policies.RequestIdPolicy(**kwargs),
self._config.headers_policy,
self._config.user_agent_policy,
self._config.proxy_policy,
policies.ContentDecodePolicy(**kwargs),
AsyncARMAutoResourceProviderRegistrationPolicy(),
self._config.redirect_policy,
self._config.retry_policy,
self._config.authentication_policy,
self._config.custom_hook_policy,
self._config.logging_policy,
policies.DistributedTracingPolicy(**kwargs),
policies.SensitiveHeaderCleanupPolicy(**kwargs) if self._config.redirect_policy else None,
self._config.http_logging_policy,
]
self._client: AsyncARMPipelineClient = AsyncARMPipelineClient(
base_url=cast(str, _endpoint), policies=_policies, **kwargs
)
self._serialize = Serializer()
self._deserialize = Deserializer()
self._serialize.client_side_validation = False
self.gallery_images = GalleryImagesOperations(self._client, self._config, self._serialize, self._deserialize)
self.logical_networks = LogicalNetworksOperations(
self._client, self._config, self._serialize, self._deserialize
)
self.marketplace_gallery_images = MarketplaceGalleryImagesOperations(
self._client, self._config, self._serialize, self._deserialize
)
self.network_interfaces = NetworkInterfacesOperations(
self._client, self._config, self._serialize, self._deserialize
)
self.network_security_groups = NetworkSecurityGroupsOperations(
self._client, self._config, self._serialize, self._deserialize
)
self.security_rules = SecurityRulesOperations(self._client, self._config, self._serialize, self._deserialize)
self.storage_containers = StorageContainersOperations(
self._client, self._config, self._serialize, self._deserialize
)
self.virtual_hard_disks = VirtualHardDisksOperations(
self._client, self._config, self._serialize, self._deserialize
)
self.virtual_machine_instances = VirtualMachineInstancesOperations(
self._client, self._config, self._serialize, self._deserialize
)
self.hybrid_identity_metadata = HybridIdentityMetadataOperations(
self._client, self._config, self._serialize, self._deserialize
)
self.attestation_statuses = AttestationStatusesOperations(
self._client, self._config, self._serialize, self._deserialize
)
self.guest_agents = GuestAgentsOperations(self._client, self._config, self._serialize, self._deserialize)
def send_request(
self, request: HttpRequest, *, stream: bool = False, **kwargs: Any
) -> Awaitable[AsyncHttpResponse]:
"""Runs the network request through the client's chained policies.
>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest("GET", "https://www.example.org/")
<HttpRequest [GET], url: 'https://www.example.org/'>
>>> response = await client.send_request(request)
<AsyncHttpResponse: 200 OK>
For more information on this code flow, see https://aka.ms/azsdk/dpcodegen/python/send_request
:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.AsyncHttpResponse
"""
request_copy = deepcopy(request)
path_format_arguments = {
"endpoint": self._serialize.url("self._config.base_url", self._config.base_url, "str", skip_quote=True),
}
request_copy.url = self._client.format_url(request_copy.url, **path_format_arguments)
return self._client.send_request(request_copy, stream=stream, **kwargs) # type: ignore
async def close(self) -> None:
await self._client.close()
async def __aenter__(self) -> Self:
await self._client.__aenter__()
return self
async def __aexit__(self, *exc_details: Any) -> None:
await self._client.__aexit__(*exc_details)
|