# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Generic & dependency free Bumble (reference) device."""

from __future__ import annotations

from typing import Any

from bumble import transport
from bumble.core import (
    BT_GENERIC_AUDIO_SERVICE,
    BT_HANDSFREE_SERVICE,
    BT_L2CAP_PROTOCOL_ID,
    BT_RFCOMM_PROTOCOL_ID,
)
from bumble.device import Device, DeviceConfiguration
from bumble.host import Host
from bumble.sdp import (
    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
    SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
    DataElement,
    ServiceAttribute,
)

# Default rootcanal HCI TCP address
ROOTCANAL_HCI_ADDRESS = "localhost:6402"


class PandoraDevice:
    """
    Small wrapper around a Bumble device and it's HCI transport.
    Notes:
      - The Bumble device is idle by default.
      - Repetitive calls to `open`/`close` will result on new Bumble device instances.
    """

    # Bumble device instance & configuration.
    device: Device
    config: dict[str, Any]

    # HCI transport name & instance.
    _hci_name: str
    _hci: transport.Transport | None  # type: ignore[name-defined]

    def __init__(self, config: dict[str, Any]) -> None:
        self.config = config
        self.device = _make_device(config)
        self._hci_name = config.get(
            'transport', f"tcp-client:{config.get('tcp', ROOTCANAL_HCI_ADDRESS)}"
        )
        self._hci = None

    @property
    def idle(self) -> bool:
        return self._hci is None

    async def open(self) -> None:
        if self._hci is not None:
            return

        # open HCI transport & set device host.
        self._hci = await transport.open_transport(self._hci_name)
        self.device.host = Host(
            controller_source=self._hci.source, controller_sink=self._hci.sink
        )  # type: ignore[no-untyped-call]

        # power-on.
        await self.device.power_on()

    async def close(self) -> None:
        if self._hci is None:
            return

        # flush & re-initialize device.
        await self.device.host.flush()
        self.device.host = None  # type: ignore[assignment]
        self.device = _make_device(self.config)

        # close HCI transport.
        await self._hci.close()
        self._hci = None

    async def reset(self) -> None:
        await self.close()
        await self.open()

    def info(self) -> dict[str, str] | None:
        return {
            'public_bd_address': str(self.device.public_address),
            'random_address': str(self.device.random_address),
        }


def _make_device(config: dict[str, Any]) -> Device:
    """Initialize an idle Bumble device instance."""

    # initialize bumble device.
    device_config = DeviceConfiguration()
    device_config.load_from_dict(config)
    device = Device(config=device_config, host=None)

    # Add fake a2dp service to avoid Android disconnect
    device.sdp_service_records = _make_sdp_records(1)

    return device


# TODO(b/267540823): remove when Pandora A2dp is supported
def _make_sdp_records(rfcomm_channel: int) -> dict[int, list[ServiceAttribute]]:
    return {
        0x00010001: [
            ServiceAttribute(
                SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
                DataElement.unsigned_integer_32(0x00010001),
            ),
            ServiceAttribute(
                SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
                DataElement.sequence(
                    [
                        DataElement.uuid(BT_HANDSFREE_SERVICE),
                        DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
                    ]
                ),
            ),
            ServiceAttribute(
                SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                DataElement.sequence(
                    [
                        DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
                        DataElement.sequence(
                            [
                                DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
                                DataElement.unsigned_integer_8(rfcomm_channel),
                            ]
                        ),
                    ]
                ),
            ),
            ServiceAttribute(
                SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                DataElement.sequence(
                    [
                        DataElement.sequence(
                            [
                                DataElement.uuid(BT_HANDSFREE_SERVICE),
                                DataElement.unsigned_integer_16(0x0105),
                            ]
                        )
                    ]
                ),
            ),
        ]
    }
