# -*- coding: utf-8 -*-
"""async_upnp_client.client module."""

from __future__ import annotations

import io
import logging
import urllib.parse
from abc import ABC
from datetime import datetime, timezone
from types import TracebackType
from typing import Any, Callable, Generic, Mapping, Sequence, Type, TypeVar
from xml.etree import ElementTree as ET
from xml.parsers import expat
from xml.sax.saxutils import escape

import defusedxml.ElementTree as DET
import voluptuous as vol

from async_upnp_client.const import (
    NS,
    ActionArgumentInfo,
    ActionInfo,
    DeviceIcon,
    DeviceInfo,
    HttpRequest,
    HttpResponse,
    ServiceInfo,
    StateVariableInfo,
)
from async_upnp_client.exceptions import (
    UpnpActionError,
    UpnpActionResponseError,
    UpnpError,
    UpnpResponseError,
    UpnpValueError,
    UpnpXmlParseError,
)
from async_upnp_client.utils import CaseInsensitiveDict

# pylint: disable=too-many-lines


_LOGGER = logging.getLogger(__name__)


EventCallbackType = Callable[["UpnpService", Sequence["UpnpStateVariable"]], None]


def default_on_pre_receive_device_spec(request: HttpRequest) -> HttpRequest:
    """Pre-receive device specification hook."""
    # pylint: disable=unused-argument
    return request


def default_on_post_receive_device_spec(response: HttpResponse) -> HttpResponse:
    """Post-receive device specification hook."""
    # pylint: disable=unused-argument
    fixed_body = (response.body or "").rstrip(" \t\r\n\0")
    return HttpResponse(response.status_code, response.headers, fixed_body)


def default_on_pre_receive_service_spec(request: HttpRequest) -> HttpRequest:
    """Pre-receive service specification hook."""
    # pylint: disable=unused-argument
    return request


def default_on_post_receive_service_spec(response: HttpResponse) -> HttpResponse:
    """Post-receive service specification hook."""
    # pylint: disable=unused-argument
    fixed_body = (response.body or "").rstrip(" \t\r\n\0")
    return HttpResponse(response.status_code, response.headers, fixed_body)


def default_on_pre_call_action(
    action: "UpnpAction", args: Mapping[str, Any], request: HttpRequest
) -> HttpRequest:
    """Pre-action call hook."""
    # pylint: disable=unused-argument
    return request


def default_on_post_call_action(
    action: "UpnpAction", response: HttpResponse
) -> HttpResponse:
    """Post-action call hook."""
    # pylint: disable=unused-argument
    fixed_body = (response.body or "").rstrip(" \t\r\n\0")
    return HttpResponse(response.status_code, response.headers, fixed_body)


class DisableXmlNamespaces:
    """Context manager to disable XML namespace handling."""

    def __enter__(self) -> None:
        """Enter context manager."""
        # pylint: disable=attribute-defined-outside-init
        self._old_parser_create = expat.ParserCreate

        def expat_parser_create(
            encoding: str | None = None,
            namespace_separator: str | None = None,
            intern: dict[str, Any] | None = None,
        ) -> expat.XMLParserType:
            # pylint: disable=unused-argument
            return self._old_parser_create(encoding, None, intern)

        expat.ParserCreate = expat_parser_create

    def __exit__(
        self,
        exc_type: Type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Exit context manager."""
        expat.ParserCreate = self._old_parser_create


class UpnpRequester(ABC):
    """
    Abstract base class used for performing async HTTP requests.

    Implement method async_do_http_request() in your concrete class.
    """

    # pylint: disable=too-few-public-methods

    async def async_http_request(
        self,
        http_request: HttpRequest,
    ) -> HttpResponse:
        """Do a HTTP request."""
        raise NotImplementedError()


class UpnpDevice:
    """UPnP Device representation."""

    # pylint: disable=too-many-public-methods,too-many-instance-attributes

    def __init__(
        self,
        requester: UpnpRequester,
        device_info: DeviceInfo,
        services: Sequence["UpnpService"],
        embedded_devices: Sequence["UpnpDevice"],
        on_pre_receive_device_spec: Callable[
            [HttpRequest], HttpRequest
        ] = default_on_pre_receive_device_spec,
        on_post_receive_device_spec: Callable[
            [HttpResponse], HttpResponse
        ] = default_on_post_receive_device_spec,
    ) -> None:
        """Initialize."""
        # pylint: disable=too-many-arguments,too-many-positional-arguments
        self.requester = requester
        self.device_info = device_info
        self.services = {service.service_type: service for service in services}
        self.embedded_devices = {
            embedded_device.device_type: embedded_device
            for embedded_device in embedded_devices
        }
        self.on_pre_receive_device_spec = on_pre_receive_device_spec
        self.on_post_receive_device_spec = on_post_receive_device_spec

        self._parent_device: "UpnpDevice" | None = None

        # bind services to ourselves
        for service in services:
            service.device = self

        # bind devices to ourselves
        for embedded_device in embedded_devices:
            embedded_device.parent_device = self

        # SSDP headers.
        self.ssdp_headers: CaseInsensitiveDict = CaseInsensitiveDict()

        # Just initialized, mark available.
        self.available = True

    @property
    def parent_device(self) -> "UpnpDevice" | None:
        """Get parent UpnpDevice, if any."""
        return self._parent_device

    @parent_device.setter
    def parent_device(self, parent_device: "UpnpDevice") -> None:
        """Set parent UpnpDevice."""
        if self._parent_device is not None:
            raise UpnpError("UpnpDevice already bound to UpnpDevice")

        self._parent_device = parent_device

    @property
    def root_device(self) -> "UpnpDevice":
        """Get the root device, or self if self is the root device."""
        if self._parent_device is None:
            return self

        return self._parent_device.root_device

    def find_device(self, device_type: str) -> "UpnpDevice" | None:
        """Find a (embedded) device with the given device_type."""
        if self.device_type == device_type:
            return self

        for embedded_device in self.embedded_devices.values():
            device = embedded_device.find_device(device_type)
            if device:
                return device

        return None

    def find_service(self, service_type: str) -> "UpnpService" | None:
        """Find a service with the give service_type."""
        if service_type in self.services:
            return self.services[service_type]

        for embedded_device in self.embedded_devices.values():
            service = embedded_device.find_service(service_type)
            if service:
                return service

        return None

    @property
    def all_devices(self) -> list["UpnpDevice"]:
        """Get all devices, self and embedded."""
        devices = [self]

        for embedded_device in self.embedded_devices.values():
            devices += embedded_device.all_devices

        return devices

    def get_devices_matching_udn(self, udn: str) -> list["UpnpDevice"]:
        """Get all devices matching udn."""
        devices: list["UpnpDevice"] = []

        if self.udn.lower() == udn:
            devices.append(self)

        for embedded_device in self.embedded_devices.values():
            devices += embedded_device.get_devices_matching_udn(udn)

        return devices

    @property
    def all_services(self) -> list["UpnpService"]:
        """Get all services, from self and embedded devices."""
        services: list["UpnpService"] = []

        for device in self.all_devices:
            services += device.services.values()

        return services

    def reinit(self, new_device: "UpnpDevice") -> None:
        """Reinitialize self from another device."""
        if self.device_type != new_device.device_type:
            raise UpnpError(
                f"Mismatch in device_type: {self.device_type} vs {new_device.device_type}"
            )

        self.device_info = new_device.device_info

        # reinit embedded devices
        for device_type, embedded_device in self.embedded_devices.items():
            new_embedded_device = new_device.embedded_devices[device_type]
            embedded_device.reinit(new_embedded_device)

    @property
    def name(self) -> str:
        """Get the name of this device."""
        return self.device_info.friendly_name

    @property
    def friendly_name(self) -> str:
        """Get the friendly name of this device, alias for name."""
        return self.device_info.friendly_name

    @property
    def manufacturer(self) -> str:
        """Get the manufacturer of this device."""
        return self.device_info.manufacturer

    @property
    def manufacturer_url(self) -> str | None:
        """Get the manufacturer URL of this device."""
        return self.device_info.manufacturer_url

    @property
    def model_description(self) -> str | None:
        """Get the model description of this device."""
        return self.device_info.model_description

    @property
    def model_name(self) -> str:
        """Get the model name of this device."""
        return self.device_info.model_name

    @property
    def model_number(self) -> str | None:
        """Get the model number of this device."""
        return self.device_info.model_number

    @property
    def model_url(self) -> str | None:
        """Get the model URL of this device."""
        return self.device_info.model_url

    @property
    def serial_number(self) -> str | None:
        """Get the serial number of this device."""
        return self.device_info.serial_number

    @property
    def udn(self) -> str:
        """Get UDN of this device."""
        return self.device_info.udn

    @property
    def upc(self) -> str | None:
        """Get UPC of this device."""
        return self.device_info.upc

    @property
    def presentation_url(self) -> str | None:
        """Get presentationURL of this device."""
        return self.device_info.presentation_url

    @property
    def device_url(self) -> str:
        """Get the URL of this device."""
        return self.device_info.url

    @property
    def device_type(self) -> str:
        """Get the device type of this device."""
        return self.device_info.device_type

    @property
    def icons(self) -> Sequence[DeviceIcon]:
        """Get the icons for this device."""
        return self.device_info.icons

    @property
    def xml(self) -> ET.Element:
        """Get the XML description for this device."""
        return self.device_info.xml

    def has_service(self, service_type: str) -> bool:
        """Check if service by service_type is available."""
        return service_type in self.services

    def service(self, service_type: str) -> "UpnpService":
        """Get service by service_type."""
        return self.services[service_type]

    def service_id(self, service_id: str) -> "UpnpService" | None:
        """Get service by service_id."""
        for service in self.services.values():
            if service.service_id == service_id:
                return service
        return None

    async def async_ping(self) -> None:
        """Ping the device."""
        bare_request = HttpRequest("GET", self.device_url, {}, None)
        request = self.on_pre_receive_device_spec(bare_request)
        await self.requester.async_http_request(request)

    def __str__(self) -> str:
        """To string."""
        return f"<UpnpDevice({self.udn})>"


class UpnpService:
    """UPnP Service representation."""

    # pylint: disable=too-many-instance-attributes

    def __init__(
        self,
        requester: UpnpRequester,
        service_info: ServiceInfo,
        state_variables: Sequence["UpnpStateVariable"],
        actions: Sequence["UpnpAction"],
        on_pre_call_action: Callable[
            ["UpnpAction", Mapping[str, Any], HttpRequest], HttpRequest
        ] = default_on_pre_call_action,
        on_post_call_action: Callable[
            ["UpnpAction", HttpResponse], HttpResponse
        ] = default_on_post_call_action,
    ) -> None:
        """Initialize."""
        # pylint: disable=too-many-arguments,too-many-positional-arguments
        self.requester = requester
        self._service_info = service_info
        self.state_variables = {sv.name: sv for sv in state_variables}
        self.actions = {ac.name: ac for ac in actions}
        self.on_pre_call_action = on_pre_call_action
        self.on_post_call_action = on_post_call_action

        self.on_event: EventCallbackType | None = None
        self._device: UpnpDevice | None = None

        # bind state variables to ourselves
        for state_var in state_variables:
            state_var.service = self

        # bind actions to ourselves
        for action in actions:
            action.service = self

    @property
    def device(self) -> UpnpDevice:
        """Get parent UpnpDevice."""
        if not self._device:
            raise UpnpError("UpnpService not bound to UpnpDevice")

        return self._device

    @device.setter
    def device(self, device: UpnpDevice) -> None:
        """Set parent UpnpDevice."""
        self._device = device

    @property
    def service_type(self) -> str:
        """Get service type for this UpnpService."""
        return self._service_info.service_type

    @property
    def service_id(self) -> str:
        """Get service ID for this UpnpService."""
        return self._service_info.service_id

    @property
    def scpd_url(self) -> str:
        """Get full SCPD-url for this UpnpService."""
        url: str = urllib.parse.urljoin(
            self.device.device_url, self._service_info.scpd_url
        )
        return url

    @property
    def control_url(self) -> str:
        """Get full control-url for this UpnpService."""
        url: str = urllib.parse.urljoin(
            self.device.device_url, self._service_info.control_url
        )
        return url

    @property
    def event_sub_url(self) -> str:
        """Get full event sub-url for this UpnpService."""
        url: str = urllib.parse.urljoin(
            self.device.device_url, self._service_info.event_sub_url
        )
        return url

    @property
    def xml(self) -> ET.Element:
        """Get the XML description for this service."""
        return self._service_info.xml

    def has_state_variable(self, name: str) -> bool:
        """Check if self has state variable called name."""
        if name not in self.state_variables and "}" in name:
            # possibly messed up namespaces, try again without namespace
            name = name.split("}")[1]

        return name in self.state_variables

    def state_variable(self, name: str) -> "UpnpStateVariable":
        """Get UPnpStateVariable by name."""
        state_var = self.state_variables.get(name, None)

        # possibly messed up namespaces, try again without namespace
        if not state_var and "}" in name:
            name = name.split("}")[1]
            state_var = self.state_variables.get(name, None)

        if state_var is None:
            raise KeyError(name)

        return state_var

    def has_action(self, name: str) -> bool:
        """Check if self has action called name."""
        return name in self.actions

    def action(self, name: str) -> "UpnpAction":
        """Get UPnpAction by name."""
        return self.actions[name]

    async def async_call_action(
        self, action: "UpnpAction" | str, **kwargs: Any
    ) -> Mapping[str, Any]:
        """
        Call a UpnpAction.

        Parameters are in Python-values and coerced automatically to UPnP values.
        """
        if isinstance(action, str):
            action = self.actions[action]

        result = await action.async_call(**kwargs)
        return result

    def notify_changed_state_variables(self, changes: Mapping[str, str]) -> None:
        """Do callback on UpnpStateVariable.value changes."""
        changed_state_variables = []

        for name, value in changes.items():
            if not self.has_state_variable(name):
                _LOGGER.debug("State variable %s does not exist, ignoring", name)
                continue

            state_var = self.state_variable(name)
            try:
                state_var.upnp_value = value
                changed_state_variables.append(state_var)
            except UpnpValueError:
                _LOGGER.error("Got invalid value for %s: %s", state_var, value)

        if self.on_event:
            # pylint: disable=not-callable
            self.on_event(self, changed_state_variables)

    def __str__(self) -> str:
        """To string."""
        udn = "unbound"
        if self._device:
            udn = self._device.udn
        return f"<UpnpService({self.service_id}, {udn})>"

    def __repr__(self) -> str:
        """To repr."""
        udn = "unbound"
        if self._device:
            udn = self._device.udn
        return f"<UpnpService({self.service_id}, {udn})>"


class UpnpAction:
    """Representation of an Action."""

    class Argument:
        """Representation of an Argument of an Action."""

        def __init__(
            self, argument_info: ActionArgumentInfo, state_variable: "UpnpStateVariable"
        ) -> None:
            """Initialize."""
            self._argument_info = argument_info
            self._related_state_variable = state_variable
            self._value = None
            self.raw_upnp_value: str | None = None

        def validate_value(self, value: Any) -> None:
            """Validate value against related UpnpStateVariable."""
            self.related_state_variable.validate_value(value)

        @property
        def name(self) -> str:
            """Get the name."""
            return self._argument_info.name

        @property
        def direction(self) -> str:
            """Get the direction."""
            return self._argument_info.direction

        @property
        def related_state_variable(self) -> "UpnpStateVariable":
            """Get the related state variable."""
            return self._related_state_variable

        @property
        def xml(self) -> ET.Element:
            """Get the XML description for this device."""
            return self._argument_info.xml

        @property
        def value(self) -> Any:
            """Get Python value for this argument."""
            return self._value

        @value.setter
        def value(self, value: Any) -> None:
            """Set Python value for this argument."""
            self.validate_value(value)
            self._value = value

        @property
        def upnp_value(self) -> str:
            """Get UPnP value for this argument."""
            return self.coerce_upnp(self.value)

        @upnp_value.setter
        def upnp_value(self, upnp_value: str) -> None:
            """Set UPnP value for this argument."""
            self._value = self.coerce_python(upnp_value)

        def coerce_python(self, upnp_value: str) -> Any:
            """Coerce UPnP value to Python."""
            return self.related_state_variable.coerce_python(upnp_value)

        def coerce_upnp(self, value: Any) -> str:
            """Coerce Python value to UPnP value."""
            return self.related_state_variable.coerce_upnp(value)

        def __repr__(self) -> str:
            """To repr."""
            return f"<UpnpAction.Argument({self.name}, {self.direction})>"

    def __init__(
        self,
        action_info: ActionInfo,
        arguments: list["UpnpAction.Argument"],
        non_strict: bool = False,
    ) -> None:
        """Initialize."""
        self._action_info = action_info
        self._arguments = arguments
        self._service: UpnpService | None = None
        self._non_strict = non_strict

    @property
    def name(self) -> str:
        """Get the name."""
        return self._action_info.name

    @property
    def arguments(self) -> list["UpnpAction.Argument"]:
        """Get the arguments."""
        return self._arguments

    @property
    def xml(self) -> ET.Element:
        """Get the XML for this action."""
        return self._action_info.xml

    @property
    def service(self) -> UpnpService:
        """Get parent UpnpService."""
        if not self._service:
            raise UpnpError("UpnpAction not bound to UpnpService")

        return self._service

    @service.setter
    def service(self, service: UpnpService) -> None:
        """Set parent UpnpService."""
        self._service = service

    def __str__(self) -> str:
        """To string."""
        return f"<UpnpAction({self.name})>"

    def __repr__(self) -> str:
        """To repr."""
        return f"<UpnpAction({self.name})({self.in_arguments()}) -> {self.out_arguments()}>"

    def validate_arguments(self, **kwargs: Any) -> None:
        """
        Validate arguments against in-arguments of self.

        The python type is expected.
        """
        for arg in self.in_arguments():
            if arg.name not in kwargs:
                raise UpnpError(f"Missing argument: {arg.name}")

            value = kwargs[arg.name]
            arg.validate_value(value)

    def in_arguments(self) -> list["UpnpAction.Argument"]:
        """Get all in-arguments."""
        return [arg for arg in self.arguments if arg.direction == "in"]

    def out_arguments(self) -> list["UpnpAction.Argument"]:
        """Get all out-arguments."""
        return [arg for arg in self.arguments if arg.direction == "out"]

    def argument(
        self, name: str, direction: str | None = None
    ) -> "UpnpAction.Argument" | None:
        """Get an UpnpAction.Argument by name (and possibliy direction)."""
        for arg in self.arguments:
            if arg.name != name:
                continue
            if direction is not None and arg.direction != direction:
                continue

            return arg
        return None

    async def async_call(self, **kwargs: Any) -> Mapping[str, Any]:
        """Call an action with arguments."""
        # do request
        _LOGGER.debug("Calling action: %s, args: %s", self.name, kwargs)
        bare_request = self.create_request(**kwargs)
        request = self.service.on_pre_call_action(self, kwargs, bare_request)
        bare_response = await self.service.requester.async_http_request(request)
        response = self.service.on_post_call_action(self, bare_response)
        if not isinstance(response.body, str):
            raise UpnpError(
                f"Did not receive a body when calling action: {self.name}, args: {kwargs}"
            )

        if response.status_code != 200:
            try:
                xml = DET.fromstring(response.body)
            except ET.ParseError:
                pass
            else:
                self._parse_fault(xml, response.status_code, response.headers)

            # Couldn't parse body for fault details, raise generic response error
            _LOGGER.debug(
                "Error calling action, no information, action: %s, args: %s",
                self.name,
                kwargs,
            )
            raise UpnpResponseError(
                status=response.status_code,
                headers=response.headers,
                message=f"Error during async_call(), "
                f"action: {self.name}, "
                f"args: {kwargs}, "
                f"status: {response.status_code}, "
                f"body: {response.body}",
            )

        # parse body
        response_args = self.parse_response(self.service.service_type, response)
        _LOGGER.debug(
            "Called action: %s, args: %s, response_args: %s",
            self.name,
            kwargs,
            response_args,
        )
        return response_args

    def create_request(self, **kwargs: Any) -> HttpRequest:
        """Create HTTP request for this to-be-called UpnpAction."""
        # build URL
        control_url = self.service.control_url

        # construct SOAP body
        service_type = self.service.service_type
        soap_args = self._format_request_args(**kwargs)
        body = (
            f'<?xml version="1.0"?>'
            f'<s:Envelope s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"'
            f' xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">'
            f"<s:Body>"
            f'<u:{self.name} xmlns:u="{service_type}">'
            f"{soap_args}"
            f"</u:{self.name}>"
            f"</s:Body>"
            f"</s:Envelope>"
        )

        # construct SOAP header
        soap_action = f"{service_type}#{self.name}"
        headers = {
            "SOAPAction": f'"{soap_action}"',
            "Host": urllib.parse.urlparse(control_url).netloc,
            "Content-Type": 'text/xml; charset="utf-8"',
        }

        return HttpRequest("POST", control_url, headers, body)

    def _format_request_args(self, **kwargs: Any) -> str:
        self.validate_arguments(**kwargs)
        arg_strs = [
            f"<{arg.name}>{escape(arg.coerce_upnp(kwargs[arg.name]))}</{arg.name}>"
            for arg in self.in_arguments()
        ]
        return "\n".join(arg_strs)

    def parse_response(
        self, service_type: str, http_response: HttpResponse
    ) -> Mapping[str, Any]:
        """Parse response from called Action."""
        # pylint: disable=unused-argument
        stripped_response_body = http_response.body
        try:
            xml = DET.fromstring(stripped_response_body)
        except ET.ParseError as err:
            if self._non_strict:
                # Try again ignoring namespaces.
                try:
                    with DisableXmlNamespaces():
                        parser = DET.XMLParser()

                    source = io.StringIO(stripped_response_body)
                    it = DET.iterparse(source, parser=parser)
                    for _, el in it:
                        _, _, el.tag = el.tag.rpartition(":")  # Strip namespace.
                    it_root = it.root  # type: ET.Element
                    xml = it_root
                except ET.ParseError as err2:
                    _LOGGER.debug(
                        "Unable to parse XML: %s\nXML:\n%s", err2, http_response.body
                    )
                    raise UpnpXmlParseError(err2) from err2
            else:
                _LOGGER.debug(
                    "Unable to parse XML: %s\nXML:\n%s", err, http_response.body
                )
                raise UpnpXmlParseError(err) from err

        # Check if a SOAP fault occurred. It should have been caught earlier, by
        # the device sending an HTTP 500 status, but not all devices do.
        self._parse_fault(xml)

        try:
            return self._parse_response_args(service_type, xml)
        except AttributeError:
            _LOGGER.debug("Could not parse response: %s", http_response.body)
            raise

    def _parse_response_args(
        self, service_type: str, xml: ET.Element
    ) -> Mapping[str, Any]:
        """Parse response arguments."""
        args = {}
        query = f".//{{{service_type}}}{self.name}Response"
        response = xml.find(query, NS)

        # If no response was found, do a search ignoring namespaces when in non-strict mode.
        if self._non_strict:
            if response is None:
                query = f".//{{*}}{self.name}Response"
                response = xml.find(query, NS)

            # Perhaps namespaces were removed/ignored, try searching again.
            if response is None:
                query = ".//*Response"
                response = xml.find(query)

        if response is None:
            xml_str = ET.tostring(xml, encoding="unicode")
            raise UpnpError(f"Invalid response: {xml_str}")

        for arg_xml in response.findall("./"):
            name = arg_xml.tag
            arg = self.argument(name, "out")
            if not arg:
                if self._non_strict:
                    continue

                xml_str = ET.tostring(xml, encoding="unicode")
                raise UpnpError(
                    f"Invalid response, unknown argument: {name}, {xml_str}"
                )

            arg.raw_upnp_value = arg_xml.text
            arg.upnp_value = arg_xml.text or ""
            args[name] = arg.value

        return args

    def _parse_fault(
        self,
        xml: ET.Element,
        status_code: int | None = None,
        response_headers: Mapping | None = None,
    ) -> None:
        """Parse SOAP fault and raise appropriate exception."""
        # pylint: disable=too-many-branches
        fault = xml.find(".//soap_envelope:Body/soap_envelope:Fault", NS)
        if self._non_strict:
            if fault is None:
                fault = xml.find(".//{{*}}Body/{{*}}Fault", NS)

            if fault is None:
                fault = xml.find(".//{{*}}Body/{{*}}Fault")

        if fault is None:
            return

        error_code_str = fault.findtext(".//control:errorCode", None, NS)
        if self._non_strict:
            if not error_code_str:
                error_code_str = fault.findtext(".//{{*}}:errorCode", None, NS)

            if not error_code_str:
                error_code_str = fault.findtext(".//errorCode")

        if error_code_str:
            error_code: int | None = int(error_code_str)
        else:
            error_code = None

        error_desc = fault.findtext(".//control:errorDescription", None, NS)
        if self._non_strict:
            if not error_desc:
                error_desc = fault.findtext(".//{{*}}:errorDescription", None, NS)

            if not error_desc:
                error_desc = fault.findtext(".//errorDescription")
        _LOGGER.debug(
            "Error calling action: %s, error code: %s, error desc: %s",
            self.name,
            error_code,
            error_desc,
        )

        if status_code is not None:
            raise UpnpActionResponseError(
                error_code=error_code,
                error_desc=error_desc,
                status=status_code,
                headers=response_headers,
                message=f"Error during async_call(), "
                f"action: {self.name}, "
                f"status: {status_code}, "
                f"upnp error: {error_code} ({error_desc})",
            )

        raise UpnpActionError(
            error_code=error_code,
            error_desc=error_desc,
            message=f"Error during async_call(), "
            f"action: {self.name}, "
            f"upnp error: {error_code} ({error_desc})",
        )


T = TypeVar("T")  # pylint: disable=invalid-name

_UNDEFINED = object()


class UpnpStateVariable(Generic[T]):
    """Representation of a State Variable."""

    # pylint: disable=too-many-instance-attributes

    UPNP_VALUE_ERROR = object()

    def __init__(
        self, state_variable_info: StateVariableInfo, schema: vol.Schema
    ) -> None:
        """Initialize."""
        self._state_variable_info = state_variable_info
        self._schema = schema

        self._service: UpnpService | None = None
        self._value: Any | None = None  # None, T or UPNP_VALUE_ERROR
        self._updated_at: datetime | None = None

        # When py3.12 is the minimum version, we can switch
        # these to be @cached_property
        self._min_value: T | None = _UNDEFINED  # type: ignore[assignment]
        self._max_value: T | None = _UNDEFINED  # type: ignore[assignment]
        self._step_value: T | None = _UNDEFINED  # type: ignore[assignment]
        self._allowed_values: set[T] = _UNDEFINED  # type: ignore[assignment]
        self._normalized_allowed_values: set[str] = _UNDEFINED  # type: ignore[assignment]

    @property
    def service(self) -> UpnpService:
        """Get parent UpnpService."""
        if not self._service:
            raise UpnpError("UpnpStateVariable not bound to UpnpService")

        return self._service

    @service.setter
    def service(self, service: UpnpService) -> None:
        """Set parent UpnpService."""
        self._service = service

    @property
    def xml(self) -> ET.Element:
        """Get the XML for this State Variable."""
        return self._state_variable_info.xml

    @property
    def data_type_mapping(self) -> Mapping[str, Callable]:
        """Get the data type (coercer) for this State Variable."""
        return self._state_variable_info.type_info.data_type_mapping

    @property
    def data_type_python(self) -> Callable[[str], Any]:
        """Get the Python data type for this State Variable."""
        return self.data_type_mapping["type"]

    @property
    def min_value(self) -> T | None:
        """Min value for this UpnpStateVariable, if defined."""
        if self._min_value is _UNDEFINED:
            min_ = self._state_variable_info.type_info.allowed_value_range.get("min")
            if min_ is not None:
                self._min_value = self.coerce_python(min_)
            else:
                self._min_value = None
        return self._min_value

    @property
    def max_value(self) -> T | None:
        """Max value for this UpnpStateVariable, if defined."""
        if self._max_value is _UNDEFINED:
            max_ = self._state_variable_info.type_info.allowed_value_range.get("max")
            if max_ is not None:
                self._max_value = self.coerce_python(max_)
            else:
                self._max_value = None
        return self._max_value

    @property
    def step_value(self) -> T | None:
        """Step value for this UpnpStateVariable, if defined."""
        if self._step_value is _UNDEFINED:
            step = self._state_variable_info.type_info.allowed_value_range.get("step")
            if step is not None:
                self._step_value = self.coerce_python(step)
            else:
                self._step_value = None
        return self._step_value

    @property
    def allowed_values(self) -> set[T]:
        """Set with allowed values for this UpnpStateVariable, if defined."""
        if self._allowed_values is _UNDEFINED:
            allowed_values = self._state_variable_info.type_info.allowed_values or []
            self._allowed_values = {
                self.coerce_python(allowed_value) for allowed_value in allowed_values
            }
        return self._allowed_values

    @property
    def normalized_allowed_values(self) -> set[str]:
        """Set with normalized allowed values for this UpnpStateVariable, if defined."""
        if self._normalized_allowed_values is _UNDEFINED:
            self._normalized_allowed_values = {
                str(allowed_value).lower().strip()
                for allowed_value in self.allowed_values
            }
        return self._normalized_allowed_values

    @property
    def send_events(self) -> bool:
        """Check if this UpnpStatevariable send events."""
        return self._state_variable_info.send_events

    @property
    def name(self) -> str:
        """Name of the UpnpStatevariable."""
        return self._state_variable_info.name

    @property
    def data_type(self) -> str:
        """UPNP data type of UpnpStateVariable."""
        return self._state_variable_info.type_info.data_type

    @property
    def default_value(self) -> T | None:
        """Get default value for UpnpStateVariable, if defined."""
        type_info = self._state_variable_info.type_info
        default_value = type_info.default_value
        if default_value is not None:
            value: T = self.coerce_python(default_value)
            return value

        return None

    def validate_value(self, value: T) -> None:
        """Validate value."""
        try:
            self._schema(value)
        except vol.error.MultipleInvalid as ex:
            raise UpnpValueError(self.name, value) from ex

    @property
    def value(self) -> T | None:
        """
        Get the value, python typed.

        Invalid values are returned as None.
        """
        if self._value is UpnpStateVariable.UPNP_VALUE_ERROR:
            return None

        return self._value

    @value.setter
    def value(self, value: Any) -> None:
        """Set value, python typed."""
        self.validate_value(value)
        self._value = value
        self._updated_at = datetime.now(timezone.utc)

    @property
    def value_unchecked(self) -> T | None:
        """
        Get the value, python typed.

        If an event was received with an invalid value for this StateVariable
        (e.g., 'abc' for a 'ui4' StateVariable), then this will return
        UpnpStateVariable.UPNP_VALUE_ERROR instead of None.
        """
        return self._value

    @property
    def upnp_value(self) -> str:
        """Get the value, UPnP typed."""
        return self.coerce_upnp(self.value)

    @upnp_value.setter
    def upnp_value(self, upnp_value: str) -> None:
        """Set the value, UPnP typed."""
        try:
            self.value = self.coerce_python(upnp_value)
        except ValueError as err:
            _LOGGER.debug('Error setting upnp_value "%s", error: %s', upnp_value, err)
            self._value = UpnpStateVariable.UPNP_VALUE_ERROR

    def coerce_python(self, upnp_value: str) -> Any:
        """Coerce value from UPNP to python."""
        coercer = self.data_type_mapping["in"]
        return coercer(upnp_value)

    def coerce_upnp(self, value: Any) -> str:
        """Coerce value from python to UPNP."""
        coercer = self.data_type_mapping["out"]
        coerced_value: str = coercer(value)
        return coerced_value

    @property
    def updated_at(self) -> datetime | None:
        """
        Get timestamp at which this UpnpStateVariable was updated.

        Return time in UTC.
        """
        return self._updated_at

    def __str__(self) -> str:
        """To string."""
        return f"<UpnpStateVariable({self.name}, {self.data_type})>"

    def __repr__(self) -> str:
        """To repr."""
        return f"<UpnpStateVariable({self.name}: {self.data_type} = {self.value!r})>"
