| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 
 | # -*- coding: utf-8 -*-
"""async_upnp_client.advertisement module."""
import asyncio
import logging
import socket
from asyncio.events import AbstractEventLoop
from asyncio.transports import BaseTransport, DatagramTransport
from typing import Any, Callable, Coroutine, Optional
from async_upnp_client.const import AddressTupleVXType, NotificationSubType, SsdpSource
from async_upnp_client.ssdp import (
    SSDP_DISCOVER,
    SsdpProtocol,
    determine_source_target,
    get_ssdp_socket,
)
from async_upnp_client.utils import CaseInsensitiveDict
_LOGGER = logging.getLogger(__name__)
class SsdpAdvertisementListener:
    """SSDP Advertisement listener."""
    # pylint: disable=too-many-instance-attributes
    def __init__(
        self,
        async_on_alive: Optional[
            Callable[[CaseInsensitiveDict], Coroutine[Any, Any, None]]
        ] = None,
        async_on_byebye: Optional[
            Callable[[CaseInsensitiveDict], Coroutine[Any, Any, None]]
        ] = None,
        async_on_update: Optional[
            Callable[[CaseInsensitiveDict], Coroutine[Any, Any, None]]
        ] = None,
        on_alive: Optional[Callable[[CaseInsensitiveDict], None]] = None,
        on_byebye: Optional[Callable[[CaseInsensitiveDict], None]] = None,
        on_update: Optional[Callable[[CaseInsensitiveDict], None]] = None,
        source: Optional[AddressTupleVXType] = None,
        target: Optional[AddressTupleVXType] = None,
        loop: Optional[AbstractEventLoop] = None,
    ) -> None:
        """Initialize."""
        # pylint: disable=too-many-arguments,too-many-positional-arguments
        assert (
            async_on_alive
            or async_on_byebye
            or async_on_update
            or on_alive
            or on_byebye
            or on_update
        ), "Provide at least one callback"
        self.async_on_alive = async_on_alive
        self.async_on_byebye = async_on_byebye
        self.async_on_update = async_on_update
        self.on_alive = on_alive
        self.on_byebye = on_byebye
        self.on_update = on_update
        self.source, self.target = determine_source_target(source, target)
        self.loop: AbstractEventLoop = loop or asyncio.get_event_loop()
        self._transport: Optional[BaseTransport] = None
    def _on_data(self, request_line: str, headers: CaseInsensitiveDict) -> None:
        """Handle data."""
        if headers.get_lower("man") == SSDP_DISCOVER:
            # Ignore discover packets.
            return
        notification_sub_type = headers.get_lower("nts")
        if notification_sub_type is None:
            _LOGGER.debug("Got non-advertisement packet: %s, %s", request_line, headers)
            return
        if _LOGGER.isEnabledFor(logging.DEBUG):
            _LOGGER.debug(
                "Received advertisement, _remote_addr: %s, NT: %s, NTS: %s, USN: %s, location: %s",
                headers.get_lower("_remote_addr", ""),
                headers.get_lower("nt", "<no NT>"),
                headers.get_lower("nts", "<no NTS>"),
                headers.get_lower("usn", "<no USN>"),
                headers.get_lower("location", ""),
            )
        headers["_source"] = SsdpSource.ADVERTISEMENT
        if notification_sub_type == NotificationSubType.SSDP_ALIVE:
            if self.async_on_alive:
                coro = self.async_on_alive(headers)
                self.loop.create_task(coro)
            if self.on_alive:
                self.on_alive(headers)
        elif notification_sub_type == NotificationSubType.SSDP_BYEBYE:
            if self.async_on_byebye:
                coro = self.async_on_byebye(headers)
                self.loop.create_task(coro)
            if self.on_byebye:
                self.on_byebye(headers)
        elif notification_sub_type == NotificationSubType.SSDP_UPDATE:
            if self.async_on_update:
                coro = self.async_on_update(headers)
                self.loop.create_task(coro)
            if self.on_update:
                self.on_update(headers)
    def _on_connect(self, transport: DatagramTransport) -> None:
        sock: Optional[socket.socket] = transport.get_extra_info("socket")
        _LOGGER.debug("On connect, transport: %s, socket: %s", transport, sock)
        self._transport = transport
    async def async_start(self) -> None:
        """Start listening for advertisements."""
        _LOGGER.debug("Start listening for advertisements")
        # Construct a socket for use with this pairs of endpoints.
        sock, _source, _target = get_ssdp_socket(self.source, self.target)
        # Bind to address.
        address = ("", self.target[1])
        _LOGGER.debug("Binding socket, socket: %s, address: %s", sock, address)
        sock.bind(address)
        # Create protocol and send discovery packet.
        await self.loop.create_datagram_endpoint(
            lambda: SsdpProtocol(
                self.loop,
                on_connect=self._on_connect,
                on_data=self._on_data,
            ),
            sock=sock,
        )
    async def async_stop(self) -> None:
        """Stop listening for advertisements."""
        _LOGGER.debug("Stop listening for advertisements")
        if self._transport:
            self._transport.close()
 |