1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
|
# Copyright 2022 Scott K Logan
# Licensed under the Apache License, Version 2.0
import asyncio
from asyncio.events import AbstractEventLoop
from asyncio.events import get_event_loop
from asyncio.tasks import Task
from asyncio.transports import WriteTransport
from contextlib import AbstractAsyncContextManager
from typing import Any
from typing import Dict
from typing import Optional
from typing import Tuple
import xml.etree.ElementTree as Et
from aioraven.data import RAVEnData
from aioraven.device import RAVEnBaseDevice
from aioraven.device import RAVEnConnectionError
from aioraven.device import RAVEnNotOpenError
from aioraven.protocols import RAVEnReaderProtocol
from aioraven.reader import RAVEnReader
class RAVEnWriter:
"""Write commands to a RAVEn device."""
def __init__(
self,
transport: WriteTransport,
protocol: RAVEnReaderProtocol
) -> None:
"""
Construct a RAVEnWriter.
:param transport: The transport instance to wrap.
:param protocol: The reader protocol for the connection.
"""
self._transport = transport
self._protocol = protocol
def __repr__(self) -> str:
info = [self.__class__.__name__]
info.append('transport=%r' % self._transport)
return '<%s>' % ' '.join(info)
def write_cmd(
self,
cmd_name: str,
args: Optional[Dict[str, str]] = None,
) -> None:
element_cmd = Et.Element('Command')
element_name = Et.SubElement(element_cmd, 'Name')
element_name.text = cmd_name
for k, v in (args or {}).items():
element_arg = Et.SubElement(element_cmd, k)
element_arg.text = v
tree = Et.ElementTree(element_cmd)
tree.write(self._transport, encoding='ASCII', xml_declaration=False)
def abort(self) -> None:
self._transport.abort()
def close(self) -> None:
self._transport.close()
async def wait_closed(self) -> None:
await self._protocol._get_close_waiter(self)
async def open_connection(
host: str,
port: int,
*,
loop: Optional[AbstractEventLoop] = None,
) -> Tuple[RAVEnReader, RAVEnWriter]:
"""
Establish a network connection to a RAVEn device.
Additional optional keyword arguments are passed to
`AbstractEventLoop.create_connection()`.
:param host: The hostname or IP address to connect to.
:param port: The TCP port number to connect to.
:param loop: The event loop instance to use.
"""
if loop is None:
loop = get_event_loop()
reader = RAVEnReader(loop=loop)
protocol = RAVEnReaderProtocol(reader, loop=loop)
try:
transport, _ = await loop.create_connection(
lambda: protocol, host=host, port=port)
except OSError as ex:
raise RAVEnConnectionError(f'{ex}') from ex
writer = RAVEnWriter(transport, protocol)
return reader, writer
class RAVEnStreamDevice(
RAVEnBaseDevice,
AbstractAsyncContextManager['RAVEnStreamDevice']
):
"""Read and write coordination for stream-based RAVEn devices."""
_reader: Optional[RAVEnReader] = None
_writer: Optional[RAVEnWriter] = None
async def open(self) -> None:
raise NotImplementedError()
async def abort(self) -> None:
if self._writer:
self._writer.abort()
try:
await self._writer.wait_closed()
except IOError:
pass
self._reader = None
self._writer = None
async def close(self) -> None:
if self._writer:
self._writer.close()
try:
await self._writer.wait_closed()
except IOError:
pass
self._reader = None
self._writer = None
async def synchronize(
self, *, retries: int = 2, timeout: float = 1.0,
) -> None:
await asyncio.sleep(0.05)
for _try in range(retries, -1, -1):
try:
# Try a few times to communicate with the device,
# allowing any data already in the buffer to flush.
await asyncio.wait_for(self.get_meter_list(), timeout)
except (RAVEnConnectionError, asyncio.TimeoutError):
if not _try:
raise
else:
break
async def _query(
self,
cmd_name: str,
res_name: Optional[str] = None,
args: Optional[Dict[str, str]] = None,
) -> Optional[RAVEnData]:
if not self._reader or not self._writer:
raise RAVEnNotOpenError()
waiter: Optional[Task[Optional[RAVEnData]]] = None
if res_name:
waiter = asyncio.create_task(self._reader.read_tag(res_name))
await asyncio.sleep(0)
try:
self._writer.write_cmd(cmd_name, args)
return await waiter if waiter is not None else None
except (Et.ParseError, IOError) as ex:
raise RAVEnConnectionError(f'{ex}') from ex
async def __aenter__(self) -> 'RAVEnStreamDevice':
await self.open()
return self
async def __aexit__(
self,
exc_type: Any,
exc_value: Any,
traceback: Any
) -> None:
await self.close()
class RAVEnNetworkDevice(RAVEnStreamDevice):
"""A network-connected RAVEn device."""
def __init__(
self,
host: str,
port: int,
*,
loop: Optional[AbstractEventLoop] = None,
) -> None:
"""
Construct a RAVEnNetworkDevice.
Additional optional keyword arguments are passed to
`AbstractEventLoop.create_connection()`.
:param host: The hostname or IP address to connect to.
:param port: The TCP port number to connect to.
:param loop: The event loop instance to use.
"""
self._host = host
self._port = port
self._loop = loop
def __repr__(self) -> str:
info = [self.__class__.__name__]
info.append('host=%s' % self._host)
info.append('port=%s' % self._port)
return '<%s>' % ' '.join(info)
async def open(self) -> None:
"""Open the connection to the RAVEn device."""
if self._reader or self._writer:
return
self._reader, self._writer = await open_connection(
host=self._host, port=self._port, loop=self._loop)
|