# This file is part of beets.
# Copyright 2019, Rahul Ahuja.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.

"""Adds Deezer release and track search support to the autotagger"""

from __future__ import annotations

import collections
import time
from typing import TYPE_CHECKING, Literal, Sequence

import requests

from beets import ui
from beets.autotag import AlbumInfo, TrackInfo
from beets.dbcore import types
from beets.metadata_plugins import (
    IDResponse,
    SearchApiMetadataSourcePlugin,
    SearchFilter,
)

if TYPE_CHECKING:
    from beets.library import Item, Library

    from ._typing import JSONDict


class DeezerPlugin(SearchApiMetadataSourcePlugin[IDResponse]):
    item_types = {
        "deezer_track_rank": types.INTEGER,
        "deezer_track_id": types.INTEGER,
        "deezer_updated": types.DATE,
    }
    # Base URLs for the Deezer API
    # Documentation: https://developers.deezer.com/api/
    search_url = "https://api.deezer.com/search/"
    album_url = "https://api.deezer.com/album/"
    track_url = "https://api.deezer.com/track/"

    def __init__(self) -> None:
        super().__init__()

    def commands(self):
        """Add beet UI commands to interact with Deezer."""
        deezer_update_cmd = ui.Subcommand(
            "deezerupdate", help=f"Update {self.data_source} rank"
        )

        def func(lib: Library, opts, args):
            items = lib.items(args)
            self.deezerupdate(list(items), ui.should_write())

        deezer_update_cmd.func = func

        return [deezer_update_cmd]

    def album_for_id(self, album_id: str) -> AlbumInfo | None:
        """Fetch an album by its Deezer ID or URL."""
        if not (deezer_id := self._extract_id(album_id)):
            return None

        album_url = f"{self.album_url}{deezer_id}"
        if not (album_data := self.fetch_data(album_url)):
            return None

        contributors = album_data.get("contributors")
        if contributors is not None:
            artist, artist_id = self.get_artist(contributors)
        else:
            artist, artist_id = None, None

        release_date = album_data["release_date"]
        date_parts = [int(part) for part in release_date.split("-")]
        num_date_parts = len(date_parts)

        if num_date_parts == 3:
            year, month, day = date_parts
        elif num_date_parts == 2:
            year, month = date_parts
            day = None
        elif num_date_parts == 1:
            year = date_parts[0]
            month = None
            day = None
        else:
            raise ui.UserError(
                f"Invalid `release_date` returned by {self.data_source} API: "
                f"{release_date!r}"
            )
        tracks_obj = self.fetch_data(f"{self.album_url}{deezer_id}/tracks")
        if tracks_obj is None:
            return None
        try:
            tracks_data = tracks_obj["data"]
        except KeyError:
            self._log.debug("Error fetching album tracks for {}", deezer_id)
            tracks_data = None
        if not tracks_data:
            return None
        while "next" in tracks_obj:
            tracks_obj = requests.get(
                tracks_obj["next"],
                timeout=10,
            ).json()
            tracks_data.extend(tracks_obj["data"])

        tracks = []
        medium_totals: dict[int | None, int] = collections.defaultdict(int)
        for i, track_data in enumerate(tracks_data, start=1):
            track = self._get_track(track_data)
            track.index = i
            medium_totals[track.medium] += 1
            tracks.append(track)
        for track in tracks:
            track.medium_total = medium_totals[track.medium]

        return AlbumInfo(
            album=album_data["title"],
            album_id=deezer_id,
            deezer_album_id=deezer_id,
            artist=artist,
            artist_credit=self.get_artist([album_data["artist"]])[0],
            artist_id=artist_id,
            tracks=tracks,
            albumtype=album_data["record_type"],
            va=(
                len(album_data["contributors"]) == 1
                and (artist or "").lower() == "various artists"
            ),
            year=year,
            month=month,
            day=day,
            label=album_data["label"],
            mediums=max(filter(None, medium_totals.keys())),
            data_source=self.data_source,
            data_url=album_data["link"],
            cover_art_url=album_data.get("cover_xl"),
        )

    def track_for_id(self, track_id: str) -> None | TrackInfo:
        """Fetch a track by its Deezer ID or URL and return a
        TrackInfo object or None if the track is not found.

        :param track_id: (Optional) Deezer ID or URL for the track. Either
            ``track_id`` or ``track_data`` must be provided.

        """
        if not (deezer_id := self._extract_id(track_id)):
            self._log.debug("Invalid Deezer track_id: {}", track_id)
            return None

        if not (track_data := self.fetch_data(f"{self.track_url}{deezer_id}")):
            self._log.debug("Track not found: {}", track_id)
            return None

        track = self._get_track(track_data)

        # Get album's tracks to set `track.index` (position on the entire
        # release) and `track.medium_total` (total number of tracks on
        # the track's disc).
        if not (
            album_tracks_obj := self.fetch_data(
                f"{self.album_url}{track_data['album']['id']}/tracks"
            )
        ):
            return None

        try:
            album_tracks_data = album_tracks_obj["data"]
        except KeyError:
            self._log.debug(
                "Error fetching album tracks for {}", track_data["album"]["id"]
            )
            return None
        medium_total = 0
        for i, track_data in enumerate(album_tracks_data, start=1):
            if track_data["disk_number"] == track.medium:
                medium_total += 1
                if track_data["id"] == track.track_id:
                    track.index = i
        track.medium_total = medium_total
        return track

    def _get_track(self, track_data: JSONDict) -> TrackInfo:
        """Convert a Deezer track object dict to a TrackInfo object.

        :param track_data: Deezer Track object dict
        """
        artist, artist_id = self.get_artist(
            track_data.get("contributors", [track_data["artist"]])
        )
        return TrackInfo(
            title=track_data["title"],
            track_id=track_data["id"],
            deezer_track_id=track_data["id"],
            isrc=track_data.get("isrc"),
            artist=artist,
            artist_id=artist_id,
            length=track_data["duration"],
            index=track_data.get("track_position"),
            medium=track_data.get("disk_number"),
            deezer_track_rank=track_data.get("rank"),
            medium_index=track_data.get("track_position"),
            data_source=self.data_source,
            data_url=track_data["link"],
            deezer_updated=time.time(),
        )

    def _search_api(
        self,
        query_type: Literal[
            "album",
            "track",
            "artist",
            "history",
            "playlist",
            "podcast",
            "radio",
            "user",
        ],
        filters: SearchFilter,
        query_string: str = "",
    ) -> Sequence[IDResponse]:
        """Query the Deezer Search API for the specified ``query_string``, applying
        the provided ``filters``.

        :param filters: Field filters to apply.
        :param query_string: Additional query to include in the search.
        :return: JSON data for the class:`Response <Response>` object or None
            if no search results are returned.
        """
        query = self._construct_search_query(
            query_string=query_string, filters=filters
        )
        self._log.debug("Searching {.data_source} for '{}'", self, query)
        try:
            response = requests.get(
                f"{self.search_url}{query_type}",
                params={
                    "q": query,
                    "limit": self.config["search_limit"].get(),
                },
                timeout=10,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            self._log.error(
                "Error fetching data from {.data_source} API\n Error: {}",
                self,
                e,
            )
            return ()
        response_data: Sequence[IDResponse] = response.json().get("data", [])
        self._log.debug(
            "Found {} result(s) from {.data_source} for '{}'",
            len(response_data),
            self,
            query,
        )
        return response_data

    def deezerupdate(self, items: Sequence[Item], write: bool):
        """Obtain rank information from Deezer."""
        for index, item in enumerate(items, start=1):
            self._log.info(
                "Processing {}/{} tracks - {} ", index, len(items), item
            )
            try:
                deezer_track_id = item.deezer_track_id
            except AttributeError:
                self._log.debug("No deezer_track_id present for: {}", item)
                continue
            try:
                rank = self.fetch_data(
                    f"{self.track_url}{deezer_track_id}"
                ).get("rank")
                self._log.debug(
                    "Deezer track: {} has {} rank", deezer_track_id, rank
                )
            except Exception as e:
                self._log.debug("Invalid Deezer track_id: {}", e)
                continue
            item.deezer_track_rank = int(rank)
            item.store()
            item.deezer_updated = time.time()
            if write:
                item.try_write()

    def fetch_data(self, url: str):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            self._log.error("Error fetching data from {}\n Error: {}", url, e)
            return None
        if "error" in data:
            self._log.debug("Deezer API error: {}", data["error"]["message"])
            return None
        return data
