1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
import asyncio
import ssl
import aiohttp
from . import CloudStack, CloudStackApiException, CloudStackException
from .client import PENDING, SUCCESS, transform
class AIOCloudStack(CloudStack):
def __getattr__(self, command):
def handler(**kwargs):
return self._request(command, **kwargs)
return handler
async def _request(
self,
command,
json=True,
opcode_name="command",
fetch_list=False,
headers=None,
**params
):
fetch_result = params.pop("fetch_result", self.fetch_result)
kwarg, kwargs = self._prepare_request(
command, json, opcode_name, fetch_list, **params
)
ssl_context = None
if self.cert:
ssl_context = ssl.create_default_context(cafile=self.cert)
connector = aiohttp.TCPConnector(
verify_ssl=self.verify, ssl_context=ssl_context
)
async with aiohttp.ClientSession(
read_timeout=self.timeout,
conn_timeout=self.timeout,
connector=connector,
) as session:
handler = getattr(session, self.method)
done = False
final_data = []
page = 1
while not done:
if fetch_list:
kwargs["page"] = page
transform(kwargs)
kwargs.pop("signature", None)
self._sign(kwargs)
response = await handler(
self.endpoint, headers=headers, **{kwarg: kwargs}
)
ctype = response.headers["content-type"].split(";")[0]
try:
data = await response.json(content_type=ctype)
except ValueError as e:
msg = "Make sure endpoint URL {!r} is correct.".format(
self.endpoint
)
raise CloudStackException(
"HTTP {0} response from CloudStack".format(
response.status
),
"{}. {}".format(e, msg),
response=response,
)
[key] = data.keys()
data = data[key]
if response.status != 200:
raise CloudStackApiException(
"HTTP {0} response from CloudStack".format(
response.status
),
error=data,
response=response,
)
if fetch_list:
try:
[key] = [k for k in data.keys() if k != "count"]
except ValueError:
done = True
else:
final_data.extend(data[key])
page += 1
elif fetch_result and "jobid" in data:
try:
final_data = await asyncio.wait_for(
self._jobresult(data["jobid"], response),
self.job_timeout,
)
except asyncio.TimeoutError:
raise CloudStackException(
"Timeout waiting for async job result",
data["jobid"],
response=response,
)
done = True
else:
final_data = data
done = True
return final_data
async def _jobresult(self, jobid, response):
failures = 0
while True:
try:
j = await self.queryAsyncJobResult(
jobid=jobid, fetch_result=False
)
failures = 0
if j["jobstatus"] != PENDING:
if j["jobresultcode"] != 0 or j["jobstatus"] != SUCCESS:
raise CloudStackApiException(
"Job failure",
j,
error=j["jobresult"],
response=response,
)
if "jobresult" not in j:
raise CloudStackException(
"Unknown job result", j, response=response
)
return j["jobresult"]
except CloudStackException:
raise
except Exception:
failures += 1
if failures > 10:
raise
await asyncio.sleep(self.poll_interval)
|