"""Asynchronous client helpers for the mutesync HTTP API (port 8249)."""
from functools import partial
import json
import logging
from typing import Dict
import aiohttp
import async_timeout
_LOGGER = logging.getLogger(__name__)
async def authenticate(session: aiohttp.ClientSession, host: str) -> str:
    """Authenticate against a host and return its API token.

    Sends a GET request to ``http://<host>:8249/authenticate`` and reads the
    ``token`` field from the JSON reply.

    Args:
        session: aiohttp client session used to issue the request.
        host: Hostname or IP address to contact; port 8249 is appended.

    Returns:
        The value stored under the ``token`` key of the JSON response.

    Raises:
        asyncio.TimeoutError: If the exchange takes longer than 10 seconds.
        aiohttp.ClientResponseError: If the server answers with an error
            status (the request is made with ``raise_for_status=True``).
        KeyError: If the JSON response has no ``token`` key.
    """
    # ``async with`` is the supported form: the synchronous ``with`` variant
    # of async_timeout.timeout() is deprecated/removed in current releases.
    async with async_timeout.timeout(10):
        resp = await session.request(
            "get",
            f"http://{host}:8249/authenticate",
            data={},
            raise_for_status=True,
        )
        # Read the body under the same deadline so a stalled response
        # cannot hang past the timeout.
        data = await resp.json()
    return data["token"]
class PyMutesync:
    """Thin async client for the mutesync HTTP API served on port 8249.

    Every request carries the token in an ``Authorization: Token ...``
    header together with ``x-mutesync-api-version: 1``.
    """

    def __init__(self, token, host, session: aiohttp.ClientSession):
        """Store the connection details used by later requests.

        Args:
            token: API token sent in the ``Authorization`` header.
            host: Hostname or IP address; port 8249 is appended per request.
            session: aiohttp client session used to issue requests.
        """
        self.token = token
        self.host = host
        self.session = session

    async def request(self, method, path, data=None) -> Dict:
        """Issue an authenticated request and return the ``data`` payload.

        Args:
            method: HTTP method name passed through to aiohttp. The exact
                string ``"get"`` selects query-string encoding for ``data``.
            path: Path appended to ``http://<host>:8249/``.
            data: Optional payload. Sent as query parameters for ``"get"``
                and as a JSON body otherwise. Falsy values are not sent.

        Returns:
            The value stored under the ``data`` key of the JSON response.

        Raises:
            asyncio.TimeoutError: If the exchange takes longer than 10 seconds.
            aiohttp.ClientResponseError: If the server answers with an error
                status (the request is made with ``raise_for_status=True``).
            KeyError: If the JSON response has no ``data`` key.
        """
        url = f"http://{self.host}:8249/{path}"
        kwargs = {}
        if data:
            # aiohttp takes query-string values via ``params``; it has no
            # ``query`` keyword, so passing one raises TypeError.
            payload_key = "params" if method == "get" else "json"
            kwargs[payload_key] = data
        # ``async with`` is the supported form: the synchronous ``with``
        # variant of async_timeout.timeout() is deprecated/removed.
        async with async_timeout.timeout(10):
            resp = await self.session.request(
                method,
                url,
                headers={
                    "Authorization": f"Token {self.token}",
                    "x-mutesync-api-version": "1",
                },
                raise_for_status=True,
                **kwargs,
            )
            # Read the body under the same deadline so a stalled response
            # cannot hang past the timeout.
            response = await resp.json()
        return response["data"]

    async def get_state(self):
        """Return the ``data`` payload of a GET request to the ``state`` path."""
        return await self.request("get", "state")
|