1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
# -*- coding: utf-8 -*-
"""async_upnp_client.advertisement module."""
import asyncio
import logging
import socket
from asyncio.events import AbstractEventLoop
from asyncio.transports import BaseTransport, DatagramTransport
from typing import Any, Callable, Coroutine, Optional
from async_upnp_client.const import AddressTupleVXType, NotificationSubType, SsdpSource
from async_upnp_client.ssdp import (
SSDP_DISCOVER,
SsdpProtocol,
determine_source_target,
get_ssdp_socket,
)
from async_upnp_client.utils import CaseInsensitiveDict
_LOGGER = logging.getLogger(__name__)
class SsdpAdvertisementListener:
"""SSDP Advertisement listener."""
# pylint: disable=too-many-instance-attributes
def __init__(
self,
async_on_alive: Optional[
Callable[[CaseInsensitiveDict], Coroutine[Any, Any, None]]
] = None,
async_on_byebye: Optional[
Callable[[CaseInsensitiveDict], Coroutine[Any, Any, None]]
] = None,
async_on_update: Optional[
Callable[[CaseInsensitiveDict], Coroutine[Any, Any, None]]
] = None,
on_alive: Optional[Callable[[CaseInsensitiveDict], None]] = None,
on_byebye: Optional[Callable[[CaseInsensitiveDict], None]] = None,
on_update: Optional[Callable[[CaseInsensitiveDict], None]] = None,
source: Optional[AddressTupleVXType] = None,
target: Optional[AddressTupleVXType] = None,
loop: Optional[AbstractEventLoop] = None,
) -> None:
"""Initialize."""
# pylint: disable=too-many-arguments,too-many-positional-arguments
assert (
async_on_alive
or async_on_byebye
or async_on_update
or on_alive
or on_byebye
or on_update
), "Provide at least one callback"
self.async_on_alive = async_on_alive
self.async_on_byebye = async_on_byebye
self.async_on_update = async_on_update
self.on_alive = on_alive
self.on_byebye = on_byebye
self.on_update = on_update
self.source, self.target = determine_source_target(source, target)
self.loop: AbstractEventLoop = loop or asyncio.get_event_loop()
self._transport: Optional[BaseTransport] = None
def _on_data(self, request_line: str, headers: CaseInsensitiveDict) -> None:
"""Handle data."""
if headers.get_lower("man") == SSDP_DISCOVER:
# Ignore discover packets.
return
notification_sub_type = headers.get_lower("nts")
if notification_sub_type is None:
_LOGGER.debug("Got non-advertisement packet: %s, %s", request_line, headers)
return
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug(
"Received advertisement, _remote_addr: %s, NT: %s, NTS: %s, USN: %s, location: %s",
headers.get_lower("_remote_addr", ""),
headers.get_lower("nt", "<no NT>"),
headers.get_lower("nts", "<no NTS>"),
headers.get_lower("usn", "<no USN>"),
headers.get_lower("location", ""),
)
headers["_source"] = SsdpSource.ADVERTISEMENT
if notification_sub_type == NotificationSubType.SSDP_ALIVE:
if self.async_on_alive:
coro = self.async_on_alive(headers)
self.loop.create_task(coro)
if self.on_alive:
self.on_alive(headers)
elif notification_sub_type == NotificationSubType.SSDP_BYEBYE:
if self.async_on_byebye:
coro = self.async_on_byebye(headers)
self.loop.create_task(coro)
if self.on_byebye:
self.on_byebye(headers)
elif notification_sub_type == NotificationSubType.SSDP_UPDATE:
if self.async_on_update:
coro = self.async_on_update(headers)
self.loop.create_task(coro)
if self.on_update:
self.on_update(headers)
def _on_connect(self, transport: DatagramTransport) -> None:
sock: Optional[socket.socket] = transport.get_extra_info("socket")
_LOGGER.debug("On connect, transport: %s, socket: %s", transport, sock)
self._transport = transport
async def async_start(self) -> None:
"""Start listening for advertisements."""
_LOGGER.debug("Start listening for advertisements")
# Construct a socket for use with this pairs of endpoints.
sock, _source, _target = get_ssdp_socket(self.source, self.target)
# Bind to address.
address = ("", self.target[1])
_LOGGER.debug("Binding socket, socket: %s, address: %s", sock, address)
sock.bind(address)
# Create protocol and send discovery packet.
await self.loop.create_datagram_endpoint(
lambda: SsdpProtocol(
self.loop,
on_connect=self._on_connect,
on_data=self._on_data,
),
sock=sock,
)
async def async_stop(self) -> None:
"""Stop listening for advertisements."""
_LOGGER.debug("Stop listening for advertisements")
if self._transport:
self._transport.close()
|