1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
"""Module containing the connection wrapper for the AVR interface."""
import asyncio
import logging
from typing import Callable
from .protocol import AVR
__all__ = ["Connection"]
class Connection:
    """Connection handler to maintain network connection for AVR Protocol.

    Instances are normally built with the :meth:`create` coroutine, which
    fills in the host/port, instantiates the protocol object and (when
    auto-reconnect is enabled) opens the connection. A bare ``Connection()``
    has no protocol attached; ``close()``, ``halt()`` and ``transport`` are
    safe to use on such an instance.
    """

    def __init__(self):
        """Instantiate the Connection object."""
        self.log = logging.getLogger(__name__)
        self.host = ""
        self.port = 0
        self._loop: asyncio.AbstractEventLoop = None
        self._retry_interval = 1
        self._closed = False
        self._closing = False
        self._halted = False
        self._auto_reconnect = False
        self.protocol: asyncio.Protocol = None

    @classmethod
    async def create(
        cls,
        host: str = "localhost",
        port: int = 14999,
        auto_reconnect: bool = True,
        loop: asyncio.AbstractEventLoop = None,
        protocol_class: asyncio.Protocol = AVR,
        update_callback: Callable[[str], None] = None,
    ):
        """Initiate a connection to a specific device.

        Here is where we supply the host and port and callback callables we
        expect for this AVR class object.

        :param host:
            Hostname or IP address of the device
        :param port:
            TCP port number of the device
        :param auto_reconnect:
            Should the Connection try to automatically reconnect if needed?
        :param loop:
            asyncio.loop for async operation; defaults to
            ``asyncio.get_event_loop()`` when not supplied
        :param protocol_class:
            Callable used to build the protocol object; it is invoked with
            the keyword arguments ``connection_lost_callback``, ``loop`` and
            ``update_callback``
        :param update_callback:
            This function is called whenever AVR state data changes

        :type host:
            str
        :type port:
            int
        :type auto_reconnect:
            boolean
        :type loop:
            asyncio.loop
        :type update_callback:
            callable

        :returns:
            The configured ``Connection`` instance
        :raises AssertionError:
            If ``port`` is negative
        """
        if port < 0:
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped under ``python -O``; the exception type is unchanged.
            raise AssertionError(f"Invalid port value: {port}")

        conn = cls()

        conn.host = host
        conn.port = port
        conn._loop = loop or asyncio.get_event_loop()
        conn._retry_interval = 1
        conn._closed = False
        conn._closing = False
        conn._halted = False
        conn._auto_reconnect = auto_reconnect

        async def connection_lost():
            """Reconnect after the protocol reports a lost connection."""
            # A deliberate close() must not trigger a reconnect.
            if conn._auto_reconnect and not conn._closing:
                await conn.reconnect()

        conn.protocol = protocol_class(
            connection_lost_callback=connection_lost,
            loop=conn._loop,
            update_callback=update_callback,
        )

        # NOTE(review): when auto_reconnect is False no connection attempt is
        # made here; the caller has to await reconnect() itself. Confirm this
        # is intended.
        if auto_reconnect:
            await conn.reconnect()

        return conn

    @property
    def transport(self):
        """Return pointer to the transport object.

        Use this property to obtain passthrough access to the underlying
        Protocol properties and methods. Returns ``None`` when no protocol
        is attached or the protocol has no ``transport`` attribute.
        """
        return getattr(self.protocol, "transport", None)

    def _get_retry_interval(self):
        """Return the current delay (in seconds) between connect attempts."""
        return self._retry_interval

    def _reset_retry_interval(self):
        """Reset the retry delay to its initial value of one second."""
        self._retry_interval = 1

    def _increase_retry_interval(self):
        """Grow the retry delay by a factor of 1.5, capped at 300 seconds."""
        self._retry_interval = min(300, 1.5 * self._retry_interval)

    def _close_transport(self):
        """Close the protocol's transport if one is currently available."""
        transport = self.transport
        if transport:
            transport.close()

    async def reconnect(self):
        """Connect to the host and keep the connection open.

        Loops until a connection has been established. While the connection
        is halted no attempt is made; the loop just sleeps two seconds per
        iteration. A failed attempt is retried after a growing delay as long
        as auto-reconnect is enabled and close() has not been called.

        :raises OSError:
            If connecting fails while auto-reconnect is disabled or the
            connection is being closed
        """
        while True:
            try:
                if self._halted:
                    await asyncio.sleep(2)
                else:
                    self.log.debug(
                        "Connecting to Anthem AVR at %s:%d", self.host, self.port
                    )
                    await self._loop.create_connection(
                        lambda: self.protocol, self.host, self.port
                    )
                    self._reset_retry_interval()
                    return

            except OSError:
                # Decide whether we retry *before* announcing a retry, so the
                # log never promises an attempt that will not happen.
                if not self._auto_reconnect or self._closing:
                    raise
                self._increase_retry_interval()
                interval = self._get_retry_interval()
                self.log.warning("Connecting failed, retrying in %i seconds", interval)
                await asyncio.sleep(interval)

            # State may have changed while we were sleeping above.
            if not self._auto_reconnect or self._closing:
                break

    def close(self):
        """Close the AVR device connection and don't try to reconnect."""
        self.log.debug("Closing connection to AVR")
        self._closing = True
        self._close_transport()

    def halt(self):
        """Close the AVR device connection and wait for a resume() request."""
        self.log.warning("Halting connection to AVR")
        self._halted = True
        self._close_transport()

    def resume(self):
        """Resume the AVR device connection if we have been halted."""
        self.log.warning("Resuming connection to AVR")
        self._halted = False

    @property
    def dump_conndata(self):
        """Developer tool for debugging forensics.

        Returns every instance attribute as a single
        ``"name: value, name: value"`` string.
        """
        return ", ".join(f"{name}: {value!s}" for name, value in vars(self).items())
|