# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.

"""Fetches album art."""

from __future__ import annotations

import os
import re
from abc import ABC, abstractmethod
from collections import OrderedDict
from contextlib import closing
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, AnyStr, ClassVar, Literal

import confuse
import requests
from mediafile import image_mime_type

from beets import config, importer, plugins, ui, util
from beets.util import bytestring_path, get_temp_filename, sorted_walk, syspath
from beets.util.artresizer import ArtResizer
from beets.util.config import sanitize_pairs

if TYPE_CHECKING:
    from collections.abc import Iterable, Iterator, Sequence

    from beets.importer import ImportSession, ImportTask
    from beets.library import Album, Library
    from beets.logging import BeetsLogger as Logger

try:
    from bs4 import BeautifulSoup, Tag

    HAS_BEAUTIFUL_SOUP = True
except ImportError:
    HAS_BEAUTIFUL_SOUP = False


# Supported image MIME types, each mapped to its file extensions (as bytes).
# The first extension in a list is the one used to name a downloaded file
# of that type.
CONTENT_TYPES = {"image/jpeg": [b"jpg", b"jpeg"], "image/png": [b"png"]}
# Flat list of every extension that appears in CONTENT_TYPES.
IMAGE_EXTENSIONS = [ext for exts in CONTENT_TYPES.values() for ext in exts]


class ImageAction(Enum):
    """Indicates whether an image is usable or requires post-processing."""

    # The image cannot be used at all.
    BAD = 0
    # The image can be used as-is.
    EXACT = 1
    # The image is wider than the configured `maxwidth` and must be rescaled.
    DOWNSCALE = 2
    # The file exceeds the configured `max_filesize` and must be resized.
    DOWNSIZE = 3
    # The image must be deinterlaced.
    DEINTERLACE = 4
    # The image must be converted to the configured cover format.
    REFORMAT = 5


class MetadataMatch(Enum):
    """Indicates whether a `Candidate` matches the search criteria exactly."""

    # The candidate was found through a source's most specific lookup
    # (e.g. a release ID or an exact artist/album comparison).
    EXACT = 0
    # The candidate was found through a looser lookup (e.g. the release
    # group instead of the release, or simply the first search result).
    FALLBACK = 1


# Where an art source gets its images from; the type of `ArtSource.LOC`.
SourceLocation = Literal["local", "remote"]


class Candidate:
    """Holds information about a matching artwork, deals with validation of
    dimension restrictions and resizing.

    A candidate is identified by a local `path`, a remote `url`, or both
    (once a remote image has been downloaded). The result of the most recent
    call to `validate` is remembered in `_check`.
    """

    def __init__(
        self,
        log: Logger,
        source_name: str,
        path: None | bytes = None,
        url: None | str = None,
        match: None | MetadataMatch = None,
        size: None | tuple[int, int] = None,
    ):
        """Create a candidate.

        `log` is the logger used for all diagnostics. `source_name` names
        the art source that produced the candidate. `path` is the image file
        on disk, if any, and `url` the remote location, if any. `match`
        records how closely the candidate matched the search criteria.
        `size` is the image size as a pair whose first element is treated as
        the width; when it is not given, it is looked up from the file the
        first time the candidate is validated.
        """
        self._log = log
        self.path = path
        self.url = url
        self.source_name = source_name
        # Outcome of the most recent `validate` call; None until then.
        self._check: None | ImageAction = None
        self.match = match
        self.size = size

    def _validate(
        self,
        plugin: FetchArtPlugin,
        skip_check_for: None | list[ImageAction] = None,
    ) -> ImageAction:
        """Determine whether the candidate artwork is valid based on
        its dimensions (width and ratio).

        `skip_check_for` is a check or list of checks to skip. This is used to
        avoid redundant checks when the candidate has already been
        validated for a particular operation without changing
        plugin configuration.

        Only one action is reported per call; when several apply, the first
        one in the order DOWNSCALE, REFORMAT, DEINTERLACE, DOWNSIZE that is
        not listed in `skip_check_for` wins.

        Return `ImageAction.BAD` if the file is unusable.
        Return `ImageAction.EXACT` if the file is usable as-is.
        Return `ImageAction.DOWNSCALE` if the file must be rescaled.
        Return `ImageAction.DOWNSIZE` if the file must be resized, and possibly
            also rescaled.
        Return `ImageAction.DEINTERLACE` if the file must be deinterlaced.
        Return `ImageAction.REFORMAT` if the file has to be converted.
        """
        # Without a file on disk there is nothing to inspect.
        if not self.path:
            return ImageAction.BAD

        # No restriction is configured at all: accept without inspecting.
        if not (
            plugin.enforce_ratio
            or plugin.minwidth
            or plugin.maxwidth
            or plugin.max_filesize
            or plugin.deinterlace
            or plugin.cover_format
        ):
            return ImageAction.EXACT

        # get_size returns None if no local imaging backend is available
        if not self.size:
            self.size = ArtResizer.shared.get_size(self.path)
        self._log.debug("image size: {.size}", self)

        # The size is still unknown: none of the checks below are run and
        # the image is accepted as-is.
        if not self.size:
            self._log.warning(
                "Could not get size of image (please see "
                "documentation for dependencies). "
                "The configuration options `minwidth`, "
                "`enforce_ratio` and `max_filesize` "
                "may be violated."
            )
            return ImageAction.EXACT

        short_edge = min(self.size)
        long_edge = max(self.size)

        # Check minimum dimension.
        if plugin.minwidth and self.size[0] < plugin.minwidth:
            self._log.debug(
                "image too small ({} < {.minwidth})", self.size[0], plugin
            )
            return ImageAction.BAD

        # Check aspect ratio.
        edge_diff = long_edge - short_edge
        if plugin.enforce_ratio:
            # An absolute pixel margin takes precedence over a relative one.
            if plugin.margin_px:
                if edge_diff > plugin.margin_px:
                    self._log.debug(
                        "image is not close enough to being "
                        "square, ({} - {} > {.margin_px})",
                        long_edge,
                        short_edge,
                        plugin,
                    )
                    return ImageAction.BAD
            elif plugin.margin_percent:
                # The relative margin is a multiple of the long edge.
                margin_px = plugin.margin_percent * long_edge
                if edge_diff > margin_px:
                    self._log.debug(
                        "image is not close enough to being "
                        "square, ({} - {} > {})",
                        long_edge,
                        short_edge,
                        margin_px,
                    )
                    return ImageAction.BAD
            elif edge_diff:
                # also reached for margin_px == 0 and margin_percent == 0.0
                self._log.debug(
                    "image is not square ({} != {})", self.size[0], self.size[1]
                )
                return ImageAction.BAD

        # Check maximum dimension.
        downscale = False
        if plugin.maxwidth and self.size[0] > plugin.maxwidth:
            self._log.debug(
                "image needs rescaling ({} > {.maxwidth})", self.size[0], plugin
            )
            downscale = True

        # Check filesize.
        downsize = False
        if plugin.max_filesize:
            filesize = os.stat(syspath(self.path)).st_size
            if filesize > plugin.max_filesize:
                self._log.debug(
                    "image needs resizing ({}B > {.max_filesize}B)",
                    filesize,
                    plugin,
                )
                downsize = True

        # Check image format
        reformat = False
        if plugin.cover_format:
            fmt = ArtResizer.shared.get_format(self.path)
            reformat = fmt != plugin.cover_format
            if reformat:
                self._log.debug(
                    "image needs reformatting: {} -> {.cover_format}",
                    fmt,
                    plugin,
                )

        skip_check_for = skip_check_for or []

        # Report the first outstanding action that the caller has not asked
        # to skip.
        if downscale and (ImageAction.DOWNSCALE not in skip_check_for):
            return ImageAction.DOWNSCALE
        if reformat and (ImageAction.REFORMAT not in skip_check_for):
            return ImageAction.REFORMAT
        if plugin.deinterlace and (
            ImageAction.DEINTERLACE not in skip_check_for
        ):
            return ImageAction.DEINTERLACE
        if downsize and (ImageAction.DOWNSIZE not in skip_check_for):
            return ImageAction.DOWNSIZE
        return ImageAction.EXACT

    def validate(
        self,
        plugin: FetchArtPlugin,
        skip_check_for: None | list[ImageAction] = None,
    ) -> ImageAction:
        """Run `_validate`, remember its result in `self._check` and
        return it.
        """
        self._check = self._validate(plugin, skip_check_for)
        return self._check

    def resize(self, plugin: FetchArtPlugin) -> None:
        """Resize the candidate artwork according to the plugin's
        configuration until it is valid or no further resizing is
        possible.
        """
        # validate the candidate in case it hasn't been done yet
        current_check = self.validate(plugin)
        # Actions already applied; they are skipped on re-validation so that
        # each one is attempted at most once and the loop terminates.
        checks_performed = []

        # we don't want to resize the image if it's valid or bad
        while current_check not in [ImageAction.BAD, ImageAction.EXACT]:
            self._resize(plugin, current_check)
            checks_performed.append(current_check)
            current_check = self.validate(
                plugin, skip_check_for=checks_performed
            )

    def _resize(
        self, plugin: FetchArtPlugin, check: None | ImageAction = None
    ) -> None:
        """Resize the candidate artwork according to the plugin's
        configuration and the specified check.

        `self.path` is replaced by whatever path the `ArtResizer` operation
        returns. Any `check` other than DOWNSCALE, DOWNSIZE, DEINTERLACE or
        REFORMAT leaves the candidate untouched.
        """
        # This must only be called when _validate returned something other than
        # ImageAction.BAD or ImageAction.EXACT; then path and size are known.
        assert self.path is not None
        assert self.size is not None

        # NOTE(review): `self.size` is not refreshed after the file is
        # rewritten, so later checks keep using the original dimensions --
        # confirm that this is intended.
        if check == ImageAction.DOWNSCALE:
            self.path = ArtResizer.shared.resize(
                plugin.maxwidth,
                self.path,
                quality=plugin.quality,
                max_filesize=plugin.max_filesize,
            )
        elif check == ImageAction.DOWNSIZE:
            # dimensions are correct, so maxwidth is set to maximum dimension
            self.path = ArtResizer.shared.resize(
                max(self.size),
                self.path,
                quality=plugin.quality,
                max_filesize=plugin.max_filesize,
            )
        elif check == ImageAction.DEINTERLACE:
            self.path = ArtResizer.shared.deinterlace(self.path)
        elif check == ImageAction.REFORMAT:
            self.path = ArtResizer.shared.reformat(
                self.path,
                # TODO: fix this gnarly logic to remove the need for type ignore
                plugin.cover_format,  # type: ignore[arg-type]
                deinterlaced=plugin.deinterlace,
            )


def _logged_get(log: Logger, *args, **kwargs) -> requests.Response:
    """Like `requests.get`, but logs the effective URL to the specified
    `log` at the `DEBUG` level.

    Use the optional `message` parameter to specify what to log before
    the URL. By default, the string is "getting URL".

    The keyword arguments `stream`, `verify`, `proxies`, `cert` and
    `timeout` are applied when the request is sent; `timeout` defaults to
    10 seconds. All remaining arguments are passed to `requests.Request`.

    Also sets the User-Agent header to indicate beets.
    """
    # Use some arguments with the `send` call but most with the
    # `Request` construction. This is a cheap, magic-filled way to
    # emulate `requests.get` or, more pertinently,
    # `requests.Session.request`.
    send_kwargs = {
        arg: kwargs.pop(arg)
        for arg in ("stream", "verify", "proxies", "cert", "timeout")
        if arg in kwargs
    }
    send_kwargs.setdefault("timeout", 10)

    # Our special logging message parameter.
    message = kwargs.pop("message", "getting URL")

    req = requests.Request("GET", *args, **kwargs)

    with requests.Session() as s:
        s.headers = {"User-Agent": "beets"}
        prepped = s.prepare_request(req)
        # Merge the caller's settings with the environment and session
        # defaults, as `requests.Session.request` does. Merging placeholder
        # values instead would let the defaults overwrite an explicit
        # `stream`, `verify`, `proxies` or `cert` in the update below.
        settings = s.merge_environment_settings(
            prepped.url,
            send_kwargs.get("proxies") or {},
            send_kwargs.get("stream"),
            send_kwargs.get("verify"),
            send_kwargs.get("cert"),
        )
        send_kwargs.update(settings)
        log.debug("{}: {.url}", message, prepped)
        return s.send(prepped, **send_kwargs)


class RequestMixin:
    """Mixin that gives a class a logging HTTP GET helper.

    The host class has to provide its logger as `self._log`.
    """

    _log: Logger

    def request(self, *args, **kwargs) -> requests.Response:
        """Perform a GET request through `_logged_get`, logging to
        `self._log`.

        All arguments are forwarded unchanged; see `_logged_get` for the
        ones that are treated specially.
        """
        logger = self._log
        return _logged_get(logger, *args, **kwargs)


# ART SOURCES ################################################################


class ArtSource(RequestMixin, ABC):
    """Common interface of everything that can supply album art.

    Concrete sources set the class-level constants below and implement
    `get` and `fetch_image`.
    """

    # Whether the images of this source are local or remote
    LOC: ClassVar[SourceLocation]
    # Supported ways of matching metadata, most accurate first
    VALID_MATCHING_CRITERIA: ClassVar[list[str]] = ["default"]
    # Name of the art source as shown to humans
    NAME: ClassVar[str]
    # Key that selects the art source in the config; the same value is
    # stored in the database.
    ID: ClassVar[str]

    def __init__(
        self,
        log: Logger,
        config: confuse.ConfigView,
        match_by: None | list[str] = None,
    ) -> None:
        """Remember the logger and configuration view. `match_by` restricts
        the matching criteria; when it is empty or omitted, all of
        `VALID_MATCHING_CRITERIA` are used.
        """
        self.match_by = match_by if match_by else self.VALID_MATCHING_CRITERIA
        self._config = config
        self._log = log

    @cached_property
    def description(self) -> str:
        """The source ID followed by the active matching criteria."""
        criteria = ", ".join(self.match_by)
        return f"{self.ID}[{criteria}]"

    @staticmethod
    def add_default_config(config: confuse.ConfigView) -> None:
        """Hook for registering source-specific defaults on `config`.

        The base implementation adds nothing.
        """

    @classmethod
    def available(cls, log: Logger, config: confuse.ConfigView) -> bool:
        """Tell whether every dependency of the art source is satisfied so
        that it can actually be used. The base implementation always says
        yes.
        """
        return True

    @abstractmethod
    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Produce the artwork candidates this source finds for `album`.

        `plugin` gives access to the plugin's settings.
        NOTE(review): `paths` is presumably the set of locations that local
        sources search -- confirm against the callers.
        """

    def _candidate(self, **kwargs) -> Candidate:
        """Build a `Candidate` tagged with this source's ID and logger;
        `kwargs` are handed on to the `Candidate` constructor.
        """
        return Candidate(source_name=self.ID, log=self._log, **kwargs)

    @abstractmethod
    def fetch_image(self, candidate: Candidate, plugin: FetchArtPlugin) -> None:
        """Make sure the image exists as a local file, downloading it to a
        temporary file if necessary.

        Afterwards `Candidate.path` holds the image path on success and
        `None` on failure.
        """

    def cleanup(self, candidate: Candidate) -> None:
        """Hook for releasing whatever `fetch_image` created for
        `candidate`. The base implementation does nothing.
        """


class LocalArtSource(ArtSource):
    """Base class for art sources whose `LOC` is "local"."""

    LOC = "local"

    def fetch_image(self, candidate: Candidate, plugin: FetchArtPlugin) -> None:
        """Do nothing: no download step is performed for local sources."""
        return None


class RemoteArtSource(ArtSource):
    """Base class for art sources that download their images."""

    LOC = "remote"

    def fetch_image(self, candidate: Candidate, plugin: FetchArtPlugin) -> None:
        """Download the candidate's URL to a temporary file, provided the
        data really looks like a supported image.

        On success `candidate.path` is set to the temporary file. On any
        failure it is left as `None` and the reason is logged (except when
        the server sends fewer than 32 bytes, which is dropped silently).
        """
        # Only candidates fresh from self.get may be passed in: they carry
        # a url but, not having been downloaded yet, no path.
        assert candidate.path is None
        assert candidate.url is not None

        if plugin.maxwidth:
            candidate.url = ArtResizer.shared.proxy_url(
                plugin.maxwidth, candidate.url
            )

        try:
            response = self.request(
                candidate.url, stream=True, message="downloading image"
            )
            with closing(response) as resp:
                declared_type = resp.headers.get("Content-Type")

                # Servers (notably fanart.tv) are known to send a wrong
                # Content-Type for images uploaded with a bad extension, so
                # the type is taken from the file magic first and the
                # extension is derived from that.
                chunks = resp.iter_content(chunk_size=1024)
                magic = b""
                for chunk in chunks:
                    magic += chunk
                    # imghdr looks at 32 bytes at most, and the extra
                    # detection in mediafile at even fewer.
                    if len(magic) >= 32:
                        break
                if len(magic) < 32:
                    # the stream ended early, i.e. a corrupt image
                    return

                detected_type = image_mime_type(magic)
                if detected_type is None:
                    # The file magic was inconclusive; trust the server's
                    # Content-Type instead.
                    # Is our type detection failsafe enough to drop this?
                    detected_type = declared_type

                if detected_type not in CONTENT_TYPES:
                    self._log.debug(
                        "not a supported image: {}",
                        detected_type or "unknown content type",
                    )
                    return

                suffix = b"." + CONTENT_TYPES[detected_type][0]
                if detected_type != declared_type:
                    self._log.warning(
                        "Server specified {}, but returned a {} image. "
                        "Correcting the extension to {}",
                        declared_type,
                        detected_type,
                        suffix,
                    )

                tmp_name = get_temp_filename(__name__, suffix=suffix.decode())
                with open(tmp_name, "wb") as out:
                    # first the part that was read for type detection ...
                    out.write(magic)
                    # ... then whatever is left in the stream
                    out.writelines(chunks)
                self._log.debug(
                    "downloaded art to: {}", util.displayable_path(tmp_name)
                )
                candidate.path = util.bytestring_path(tmp_name)
        except (OSError, requests.RequestException, TypeError) as exc:
            # TypeError is caught because of a urllib3 bug:
            # https://github.com/shazow/urllib3/issues/556
            self._log.debug("error fetching art: {}", exc)

    def cleanup(self, candidate: Candidate) -> None:
        """Delete the candidate's downloaded file, if there is one.

        A failure to delete is logged and otherwise ignored.
        """
        if not candidate.path:
            return
        try:
            util.remove(path=candidate.path)
        except util.FilesystemError as exc:
            self._log.debug("error cleaning up tmp art: {}", exc)


class CoverArtArchive(RemoteArtSource):
    """Front covers from the Cover Art Archive, looked up by MusicBrainz
    release ID and/or release group ID.
    """

    NAME = "Cover Art Archive"
    ID = "coverart"
    VALID_MATCHING_CRITERIA: ClassVar[list[str]] = ["release", "releasegroup"]
    VALID_THUMBNAIL_SIZES: ClassVar[list[int]] = [250, 500, 1200]

    URL = "https://coverartarchive.org/release/{mbid}"
    GROUP_URL = "https://coverartarchive.org/release-group/{mbid}"

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Yield candidates for the album's front covers.

        Images listed for the MusicBrainz release come first and count as
        exact matches; images listed for the release group follow as
        fallbacks. Either lookup is only made when it is enabled in
        `match_by` and the album carries the corresponding ID.
        """

        def front_images(
            endpoint: str,
            thumb_key: None | str = None,
        ) -> Iterator[str]:
            # Yield the URL of each image typed "Front" in the listing at
            # `endpoint`, preferring the thumbnail named `thumb_key`.
            try:
                reply = self.request(endpoint)
            except requests.RequestException:
                self._log.debug("{.NAME}: error receiving response", self)
                return

            try:
                listing = reply.json()
            except ValueError:
                self._log.debug(
                    "{.NAME}: error loading response: {.text}", self, reply
                )
                return

            for entry in listing.get("images", []):
                try:
                    if "Front" not in entry["types"]:
                        continue

                    # Take the pre-sized thumbnail when one of the wanted
                    # size exists, the raw image otherwise.
                    chosen: str = entry["image"]
                    if thumb_key is not None and isinstance(
                        entry.get("thumbnails"), dict
                    ):
                        chosen = entry["thumbnails"].get(thumb_key, chosen)
                except KeyError:
                    # entries lacking the expected fields are skipped
                    continue
                yield chosen

        by_release = self.URL.format(mbid=album.mb_albumid)
        by_group = self.GROUP_URL.format(mbid=album.mb_releasegroupid)

        # The archive serves thumbnails in a few fixed widths. When maxwidth
        # is one of them, that thumbnail is requested directly, which avoids
        # downloading the full image only to shrink it.
        thumb_key = (
            str(plugin.maxwidth)
            if plugin.maxwidth in self.VALID_THUMBNAIL_SIZES
            else None
        )

        lookups = (
            ("release", album.mb_albumid, by_release, MetadataMatch.EXACT),
            (
                "releasegroup",
                album.mb_releasegroupid,
                by_group,
                MetadataMatch.FALLBACK,
            ),
        )
        for criterion, mbid, endpoint, quality in lookups:
            if criterion in self.match_by and mbid:
                for url in front_images(endpoint, thumb_key):
                    yield self._candidate(url=url, match=quality)


class Amazon(RemoteArtSource):
    """Cover images addressed directly by Amazon ID (ASIN)."""

    NAME = "Amazon"
    ID = "amazon"
    URL = "https://images.amazon.com/images/P/{}.{:02d}.LZZZZZZZ.jpg"
    INDICES = (1, 2)

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Yield one candidate URL per entry of `INDICES`, built from the
        album's ASIN. Albums without an ASIN yield nothing.
        """
        if not album.asin:
            return

        for index in self.INDICES:
            image_url = self.URL.format(album.asin, index)
            yield self._candidate(url=image_url, match=MetadataMatch.EXACT)


class AlbumArtOrg(RemoteArtSource):
    """Cover images scraped from the AlbumArt.org detail page of an ASIN."""

    NAME = "AlbumArt.org scraper"
    ID = "albumart"
    URL = "https://www.albumart.org/index_detail.php"
    PAT = r'href\s*=\s*"([^>"]*)"[^>]*title\s*=\s*"View larger image"'

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ):
        """Yield the "View larger image" link found on the AlbumArt.org page
        for the album's ASIN, if there is one.
        """
        if not album.asin:
            return

        # Fetch the detail page for this ASIN.
        try:
            resp = self.request(self.URL, params={"asin": album.asin})
            self._log.debug("scraped art URL: {.url}", resp)
        except requests.RequestException:
            self._log.debug("error scraping art page")
            return

        # Look for the link to the large image in the page source.
        found = re.search(self.PAT, resp.text)
        if found is None:
            self._log.debug("no image found on page")
            return

        yield self._candidate(url=found.group(1), match=MetadataMatch.EXACT)


class GoogleImages(RemoteArtSource):
    """Art found through an image search on a Google Custom Search Engine.

    The source needs the `google_key` option to be set; the search engine
    is selected with `google_engine`.
    """

    NAME = "Google Images"
    ID = "google"
    URL = "https://www.googleapis.com/customsearch/v1"

    def __init__(self, *args, **kwargs) -> None:
        """Read the API key and search engine ID from the configuration."""
        super().__init__(*args, **kwargs)
        # Store plain values. Wrapping them in parentheses with a trailing
        # comma would create 1-tuples, which only work as query parameters
        # because requests happens to expand sequences.
        self.key = self._config["google_key"].get()
        self.cx = self._config["google_engine"].get()

    @staticmethod
    def add_default_config(config: confuse.ConfigView) -> None:
        """Register the `google_key` and `google_engine` options and mark
        both as redacted.
        """
        config.add(
            {
                "google_key": None,
                "google_engine": "001442825323518660753:hrh5ch1gjzm",
            }
        )
        config["google_key"].redact = True
        config["google_engine"].redact = True

    @classmethod
    def available(cls, log: Logger, config: confuse.ConfigView) -> bool:
        """Return whether an API key is configured; log at debug level when
        it is not.
        """
        has_key = bool(config["google_key"].get())
        if not has_key:
            log.debug("google: Disabling art source due to missing key")
        return has_key

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Yield the image links returned by the custom search engine for
        the album's artist and title.

        Nothing is yielded when the album lacks an artist or a title, when
        the request fails, or when the service reports an error. Result
        items without a `link` are logged and skipped.
        """
        if not (album.albumartist and album.album):
            return
        search_string = f"{album.albumartist},{album.album}".encode()

        try:
            response = self.request(
                self.URL,
                params={
                    "key": self.key,
                    "cx": self.cx,
                    "q": search_string,
                    "searchType": "image",
                },
            )
        except requests.RequestException:
            self._log.debug("google: error receiving response")
            return

        # Get results using JSON.
        try:
            data = response.json()
        except ValueError:
            self._log.debug("google: error loading response: {.text}", response)
            return

        if "error" in data:
            # Report the first error's reason; if the payload does not have
            # the expected shape, report the whole error object rather than
            # raising out of the generator.
            try:
                reason = data["error"]["errors"][0]["reason"]
            except (KeyError, IndexError, TypeError):
                reason = data["error"]
            self._log.debug("google fetchart error: {}", reason)
            return

        for item in data.get("items", []):
            try:
                link = item["link"]
            except KeyError as e:
                self._log.debug(
                    "google: malformed result: {} not found in {}",
                    e,
                    list(item.keys()),
                )
                continue
            yield self._candidate(url=link, match=MetadataMatch.EXACT)


class FanartTV(RemoteArtSource):
    """Album covers requested from the fanart.tv API"""

    NAME = "fanart.tv"
    ID = "fanarttv"
    API_URL = "https://webservice.fanart.tv/v3/"
    API_ALBUMS = f"{API_URL}music/albums/"
    PROJECT_KEY = "61a7d0ab4e67162b7a0c7c35915cd48e"

    def __init__(self, *args, **kwargs):
        """Read the user's personal client key from the configuration."""
        super().__init__(*args, **kwargs)
        self.client_key = self._config["fanarttv_key"].get()

    @staticmethod
    def add_default_config(config: confuse.ConfigView):
        """Register the `fanarttv_key` option and mark it as redacted."""
        config.add({"fanarttv_key": None})
        config["fanarttv_key"].redact = True

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Yield the album covers that fanart.tv lists for the album's
        MusicBrainz release group, most liked first.

        Nothing is yielded when the album has no release group ID, when the
        request fails, or when the service answers with an error status.
        """
        release_group = album.mb_releasegroupid
        if not release_group:
            return

        credentials = {
            "api-key": self.PROJECT_KEY,
            "client-key": self.client_key,
        }
        try:
            response = self.request(
                f"{self.API_ALBUMS}{release_group}", headers=credentials
            )
        except requests.RequestException:
            self._log.debug("fanart.tv: error receiving response")
            return

        try:
            data = response.json()
        except ValueError:
            self._log.debug(
                "fanart.tv: error loading response: {.text}", response
            )
            return

        if "status" in data and data["status"] == "error":
            error_text = data["error message"]
            lowered = error_text.lower()
            if "not found" in lowered:
                self._log.debug("fanart.tv: no image found")
            elif "api key" in lowered:
                self._log.warning(
                    "fanart.tv: Invalid API key given, please "
                    "enter a valid one in your config file."
                )
            else:
                self._log.debug("fanart.tv: error on request: {}", error_text)
            return

        covers = []
        # can there be more than one releasegroupid per response?
        for mbid, art in data.get("albums", {}).items():
            # A successful response may reference other art (e.g. cdart)
            # without containing any albumcover at all.
            if release_group == mbid and "albumcover" in art:
                covers.extend(art["albumcover"])
            # can this actually occur?
            else:
                self._log.debug(
                    "fanart.tv: unexpected mb_releasegroupid in response!"
                )

        ranked = sorted(
            covers, key=lambda cover: int(cover["likes"]), reverse=True
        )
        for cover in ranked:
            # The size is given up front because fanart.tv enforces a strict
            # size requirement on uploaded album art.
            yield self._candidate(
                url=cover["url"], match=MetadataMatch.EXACT, size=(1000, 1000)
            )


class ITunesStore(RemoteArtSource):
    """Album covers found through the iTunes Store search API."""

    NAME = "iTunes Store"
    ID = "itunes"
    API_URL = "https://itunes.apple.com/search"

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Return art URL from iTunes Store given an album title.

        Search results whose artist and collection names equal the album's
        are yielded as exact matches. Afterwards the first search result is
        yielded as a fallback match, whether or not it was already yielded
        as an exact one. Nothing is yielded when the album lacks an artist
        or a title, when the request fails, or when there are no results.
        """
        if not (album.albumartist and album.album):
            return

        payload = {
            "term": f"{album.albumartist} {album.album}",
            "entity": "album",
            "media": "music",
            "limit": 200,
        }
        try:
            r = self.request(self.API_URL, params=payload)
            r.raise_for_status()
        except requests.RequestException as e:
            self._log.debug("iTunes search failed: {}", e)
            return

        try:
            candidates = r.json()["results"]
        except ValueError as e:
            self._log.debug("Could not decode json response: {}", e)
            return
        except KeyError as e:
            self._log.debug(
                "{} not found in json. Fields are {} ", e, list(r.json().keys())
            )
            return

        if not candidates:
            self._log.debug(
                "iTunes search for {!r} got no results", payload["term"]
            )
            return

        # The artwork URLs in the results contain the size "100x100bb";
        # it is swapped for the size we actually want.
        if self._config["high_resolution"]:
            image_suffix = "100000x100000-999"
        else:
            image_suffix = "1200x1200bb"

        for c in candidates:
            try:
                if (
                    c["artistName"] == album.albumartist
                    and c["collectionName"] == album.album
                ):
                    art_url = c["artworkUrl100"]
                    art_url = art_url.replace("100x100bb", image_suffix)
                    yield self._candidate(
                        url=art_url, match=MetadataMatch.EXACT
                    )
            except KeyError as e:
                self._log.debug(
                    "Malformed itunes candidate: {} not found in {}",
                    e,
                    list(c.keys()),
                )

        # Fall back to the first search result.
        first = candidates[0]
        try:
            fallback_art_url = first["artworkUrl100"]
            fallback_art_url = fallback_art_url.replace(
                "100x100bb", image_suffix
            )
            yield self._candidate(
                url=fallback_art_url, match=MetadataMatch.FALLBACK
            )
        except KeyError as e:
            # Report the fields of the result that was actually inspected,
            # not those of the loop variable left over from above.
            self._log.debug(
                "Malformed itunes candidate: {} not found in {}",
                e,
                list(first.keys()),
            )


class Wikipedia(RemoteArtSource):
    """Art source that asks DBpedia for an album's cover filename and then
    resolves that filename to an image URL through the Wikipedia API.
    """

    NAME = "Wikipedia (queried through DBpedia)"
    ID = "wikipedia"
    DBPEDIA_URL = "https://dbpedia.org/sparql"
    WIKIPEDIA_URL = "https://en.wikipedia.org/w/api.php"
    # Filled in with `str.format`; doubled braces become literal braces.
    SPARQL_QUERY = """PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
                 PREFIX dbpprop: <http://dbpedia.org/property/>
                 PREFIX owl: <http://dbpedia.org/ontology/>
                 PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
                 PREFIX foaf: <http://xmlns.com/foaf/0.1/>

                 SELECT DISTINCT ?pageId ?coverFilename WHERE {{
                   ?subject owl:wikiPageID ?pageId .
                   ?subject dbpprop:name ?name .
                   ?subject rdfs:label ?label .
                   {{ ?subject dbpprop:artist ?artist }}
                     UNION
                   {{ ?subject owl:artist ?artist }}
                   {{ ?artist foaf:name "{artist}"@en }}
                     UNION
                   {{ ?artist dbpprop:name "{artist}"@en }}
                   ?subject rdf:type <http://dbpedia.org/ontology/Album> .
                   ?subject dbpprop:cover ?coverFilename .
                   FILTER ( regex(?name, "{album}", "i") )
                  }}
                 Limit 1"""

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Yield candidates for the cover image Wikipedia has for `album`.

        Nothing is yielded when the album lacks an artist or title, when a
        request fails, or when a response cannot be interpreted.
        """
        if not album.albumartist or not album.album:
            return

        # Step 1: ask DBpedia for the cover filename and the page ID.
        sparql = self.SPARQL_QUERY.format(
            artist=album.albumartist.title(), album=album.album
        )
        try:
            dbpedia_response = self.request(
                self.DBPEDIA_URL,
                params={
                    "format": "application/sparql-results+json",
                    "timeout": 2500,
                    "query": sparql,
                },
                headers={"content-type": "application/json"},
            )
        except requests.RequestException:
            self._log.debug("dbpedia: error receiving response")
            return

        cover_filename = None
        page_id = None
        try:
            bindings = dbpedia_response.json()["results"]["bindings"]
            if bindings:
                cover_filename = f"File:{bindings[0]['coverFilename']['value']}"
                page_id = bindings[0]["pageId"]["value"]
            else:
                self._log.debug("wikipedia: album not found on dbpedia")
        except (ValueError, KeyError, IndexError):
            self._log.debug(
                "wikipedia: error scraping dbpedia response: {.text}",
                dbpedia_response,
            )

        # Without both values there is nothing to look up on Wikipedia.
        if not cover_filename or not page_id:
            return

        # Step 2: DBPedia sometimes provides an incomplete cover_filename,
        # recognisable by a space before the extension, e.g., 'foo .bar'.
        # Listing the images on the Wikipedia page can recover the real
        # filename. This may be removed once the DBPedia issue is resolved:
        # https://github.com/dbpedia/extraction-framework/issues/396
        if " ." in cover_filename and "." not in cover_filename.split(" .")[-1]:
            self._log.debug(
                "wikipedia: dbpedia provided incomplete cover_filename"
            )
            lpart, rpart = cover_filename.rsplit(" .", 1)
            title_pattern = re.compile(
                rf"{re.escape(lpart)}.*?\.{re.escape(rpart)}"
            )

            try:
                images_response = self.request(
                    self.WIKIPEDIA_URL,
                    params={
                        "format": "json",
                        "action": "query",
                        "continue": "",
                        "prop": "images",
                        "pageids": page_id,
                    },
                    headers={"content-type": "application/json"},
                )
            except requests.RequestException:
                self._log.debug("wikipedia: error receiving response")
                return

            # Adopt the first page image whose title completes the
            # truncated filename.
            try:
                payload = images_response.json()
                page_images = payload["query"]["pages"][page_id]["images"]
                for image in page_images:
                    if title_pattern.match(image["title"]):
                        cover_filename = image["title"]
                        break
            except (ValueError, KeyError):
                self._log.debug(
                    "wikipedia: failed to retrieve a cover_filename"
                )
                return

        # Step 3: resolve the filename to an absolute image URL.
        try:
            info_response = self.request(
                self.WIKIPEDIA_URL,
                params={
                    "format": "json",
                    "action": "query",
                    "continue": "",
                    "prop": "imageinfo",
                    "iiprop": "url",
                    "titles": cover_filename.encode("utf-8"),
                },
                headers={"content-type": "application/json"},
            )
        except requests.RequestException:
            self._log.debug("wikipedia: error receiving response")
            return

        try:
            pages = info_response.json()["query"]["pages"]
            for page in pages.values():
                image_url = page["imageinfo"][0]["url"]
                yield self._candidate(url=image_url, match=MetadataMatch.EXACT)
        except (ValueError, KeyError, IndexError):
            self._log.debug("wikipedia: error scraping imageinfo")
            return


class FileSystem(LocalArtSource):
    """Art source that picks image files out of the given directories."""

    NAME = "Filesystem"
    ID = "filesystem"

    @staticmethod
    def filename_priority(
        filename: AnyStr, cover_names: Sequence[AnyStr]
    ) -> list[int]:
        """Build the sort key used to rank image filenames.

        The key is the list of positions in `cover_names` whose keyword
        occurs in `filename`, in ascending order.
        """
        positions = []
        for position, keyword in enumerate(cover_names):
            if keyword in filename:
                positions.append(position)
        return positions

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Yield candidates for image files found under each of `paths`.

        For every directory, well-named images come first (as exact
        matches), then the configured fallback image, then, unless the
        plugin is cautious, the first remaining image.
        """
        if not paths:
            return

        keywords = [util.bytestring_path(name) for name in plugin.cover_names]
        # A keyword counts only when delimited by word boundaries or "_".
        cover_pat = rb"(\b|_)(" + b"|".join(keywords) + rb")(\b|_)"
        image_suffixes = tuple(b"." + ext for ext in IMAGE_EXTENSIONS)

        for path in paths:
            if not os.path.isdir(syspath(path)):
                continue

            # Collect every file that looks like an image.
            found = []
            ignore = config["ignore"].as_str_seq()
            ignore_hidden = config["ignore_hidden"].get(bool)
            walker = sorted_walk(
                path, ignore=ignore, ignore_hidden=ignore_hidden
            )
            for _, _, files in walker:
                for entry in files:
                    name = bytestring_path(entry)
                    if name.lower().endswith(
                        image_suffixes
                    ) and os.path.isfile(syspath(os.path.join(path, name))):
                        found.append(name)

            # Rank the images, then split off the well-named ones.
            found.sort(key=lambda name: self.filename_priority(name, keywords))
            remaining = []
            for name in found:
                stem = os.path.splitext(name)[0]
                if not re.search(cover_pat, stem, re.I):
                    remaining.append(name)
                    continue
                self._log.debug(
                    "using well-named art file {}",
                    util.displayable_path(name),
                )
                yield self._candidate(
                    path=os.path.join(path, name), match=MetadataMatch.EXACT
                )

            # Next in line: the image configured as a fallback.
            if plugin.fallback:
                self._log.debug(
                    "using fallback art file {}",
                    util.displayable_path(plugin.fallback),
                )
                yield self._candidate(
                    path=plugin.fallback, match=MetadataMatch.FALLBACK
                )

            # Last resort: any other image in the folder.
            if remaining and not plugin.cautious:
                first_other = remaining[0]
                self._log.debug(
                    "using fallback art file {}",
                    util.displayable_path(first_other),
                )
                yield self._candidate(
                    path=os.path.join(path, first_other),
                    match=MetadataMatch.FALLBACK,
                )


class LastFM(RemoteArtSource):
    """Art source that queries the Last.fm `album.getinfo` API using the
    album's MusicBrainz ID.
    """

    NAME = "Last.fm"
    ID = "lastfm"

    # Sizes in priority order.
    SIZES: ClassVar[dict[str, tuple[int, int]]] = OrderedDict(
        [
            ("mega", (300, 300)),
            ("extralarge", (300, 300)),
            ("large", (174, 174)),
            ("medium", (64, 64)),
            ("small", (34, 34)),
        ]
    )

    API_URL = "https://ws.audioscrobbler.com/2.0"

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Keep the API key as the plain configured value; it is sent as the
        # `api_key` query parameter.
        self.key = self._config["lastfm_key"].get()

    @staticmethod
    def add_default_config(config: confuse.ConfigView) -> None:
        """Register the `lastfm_key` option and mark it as redacted."""
        config.add(
            {
                "lastfm_key": None,
            }
        )
        config["lastfm_key"].redact = True

    @classmethod
    def available(cls, log: Logger, config: confuse.ConfigView) -> bool:
        """Return whether an API key is configured, logging when it is not."""
        has_key = bool(config["lastfm_key"].get())
        if not has_key:
            log.debug("lastfm: Disabling art source due to missing key")
        return has_key

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Yield one candidate per image size Last.fm reports, in the
        priority order given by `SIZES`.

        Nothing is yielded when the album has no MusicBrainz ID, the request
        fails, Last.fm reports an error, or the response cannot be parsed.
        """
        if not album.mb_albumid:
            return

        try:
            response = self.request(
                self.API_URL,
                params={
                    "method": "album.getinfo",
                    "api_key": self.key,
                    "mbid": album.mb_albumid,
                    "format": "json",
                },
            )
        except requests.RequestException:
            self._log.debug("lastfm: error receiving response")
            return

        try:
            data = response.json()
        except ValueError:
            self._log.debug("lastfm: error loading response: {.text}", response)
            return

        try:
            if "error" in data:
                if data["error"] == 6:
                    self._log.debug(
                        "lastfm: no results for {.mb_albumid}", album
                    )
                else:
                    self._log.error(
                        "lastfm: failed to get album info: {} ({})",
                        data.get("message"),
                        data["error"],
                    )
                return

            images = {
                image["size"]: image["#text"]
                for image in data["album"]["image"]
            }
        except (KeyError, TypeError):
            # Valid JSON that lacks the expected structure must not crash
            # the art search; treat it as "no art from this source".
            self._log.debug(
                "lastfm: unexpected response format: {.text}", response
            )
            return

        # Provide candidates in order of size, skipping sizes for which the
        # response carries no URL.
        for size, dimensions in self.SIZES.items():
            url = images.get(size)
            if url:
                yield self._candidate(url=url, size=dimensions)


class Spotify(RemoteArtSource):
    """Art source that reads the `og:image` meta tag from an album's
    public Spotify page.
    """

    NAME = "Spotify"
    ID = "spotify"

    SPOTIFY_ALBUM_URL = "https://open.spotify.com/album/"

    @classmethod
    def available(cls, log: Logger, config: confuse.ConfigView) -> bool:
        """Return whether BeautifulSoup could be imported, logging a hint
        when it could not.
        """
        if not HAS_BEAUTIFUL_SOUP:
            log.debug(
                "To use Spotify as an album art source, "
                "you must install the beautifulsoup4 module. See "
                "the documentation for further details."
            )
        return HAS_BEAUTIFUL_SOUP

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Yield a candidate for the image advertised on the album's
        Spotify page.

        Nothing is yielded when no Spotify album ID is available, the page
        cannot be fetched, or it carries no usable `og:image` tag.
        """
        try:
            url = f"{self.SPOTIFY_ALBUM_URL}{album.items().get().spotify_album_id}"
        except AttributeError:
            self._log.debug("Fetchart: no Spotify album ID found")
            return

        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
        except requests.RequestException as e:
            self._log.debug("Error: {!s}", e)
            return

        try:
            html = response.text
            soup = BeautifulSoup(html, "html.parser")
        except ValueError:
            self._log.debug(
                "Spotify: error loading response: {.text}", response
            )
            return

        tag = soup.find("meta", attrs={"property": "og:image"})
        # The isinstance check also covers `None` (no such tag).
        if not isinstance(tag, Tag):
            self._log.debug(
                "Spotify: Unexpected response, og:image tag missing"
            )
            return

        # A meta tag without a `content` attribute would raise KeyError on
        # item access, so look it up defensively.
        image_url = tag.get("content")
        if not image_url:
            self._log.debug(
                "Spotify: Unexpected response, og:image tag has no content"
            )
            return

        yield self._candidate(url=image_url, match=MetadataMatch.EXACT)


class CoverArtUrl(RemoteArtSource):
    # Meant to be combined with a plugin that fills in the cover_art_url
    # field on albums or tracks; the field can also be set by hand with the
    # "set" command. Whatever URL is stored there is offered as the image.

    NAME = "Cover Art URL"
    ID = "cover_art_url"

    def get(
        self,
        album: Album,
        plugin: FetchArtPlugin,
        paths: None | Sequence[bytes],
    ) -> Iterator[Candidate]:
        """Yield a candidate for the URL in `cover_art_url`, read from the
        album or, failing that, from its first track.
        """
        image_url = None
        try:
            # Prefer the album-level field, then the first track's.
            image_url = (
                album.cover_art_url
                if album.get("cover_art_url")
                else album.items().get().cover_art_url
            )
            self._log.debug("Cover art URL {} found for {}", image_url, album)
        except (AttributeError, TypeError):
            self._log.debug("Cover art URL not found for {}", album)
            return

        if not image_url:
            self._log.debug("Cover art URL not found for {}", album)
            return

        yield self._candidate(url=image_url, match=MetadataMatch.EXACT)


# All art source classes known to the plugin. This is an unordered set: the
# order in which sources are tried is taken from the `sources` config option,
# not from this collection.
ART_SOURCES: set[type[ArtSource]] = {
    FileSystem,
    CoverArtArchive,
    ITunesStore,
    AlbumArtOrg,
    Amazon,
    Wikipedia,
    GoogleImages,
    FanartTV,
    LastFM,
    Spotify,
    CoverArtUrl,
}


# PLUGIN LOGIC ###############################################################


class FetchArtPlugin(plugins.BeetsPlugin, RequestMixin):
    """Plugin that fetches album art, either automatically during import or
    on demand through the `fetchart` command.
    """

    # Accepted forms of a string-valued `enforce_ratio` option:
    # a margin in whole pixels, e.g. "10px" ...
    PAT_PX = r"(0|[1-9][0-9]*)px"
    # ... or a percentage from 0 to 100 with up to two decimals, e.g. "5.5%".
    PAT_PERCENT = r"(100(\.00?)?|[1-9]?[0-9](\.[0-9]{1,2})?)%"

    def __init__(self) -> None:
        """Register config defaults, read the options and instantiate the
        configured art sources.
        """
        super().__init__()

        # Candidates for downloaded images, kept from the moment they are
        # fetched until they are placed in the filesystem.
        self.art_candidates: dict[ImportTask, Candidate] = {}

        defaults = {
            "auto": True,
            "minwidth": 0,
            "maxwidth": 0,
            "quality": 0,
            "max_filesize": 0,
            "enforce_ratio": False,
            "cautious": False,
            "cover_names": ["cover", "front", "art", "album", "folder"],
            "fallback": None,
            "sources": [
                "filesystem",
                "coverart",
                "itunes",
                "amazon",
                "albumart",
                "cover_art_url",
            ],
            "store_source": False,
            "high_resolution": False,
            "deinterlace": False,
            "cover_format": None,
        }
        self.config.add(defaults)
        for source_cls in ART_SOURCES:
            source_cls.add_default_config(self.config)

        self.minwidth = self.config["minwidth"].get(int)
        self.maxwidth = self.config["maxwidth"].get(int)
        self.max_filesize = self.config["max_filesize"].get(int)
        self.quality = self.config["quality"].get(int)

        # `enforce_ratio` is a bool, or a margin given in pixels or percent.
        ratio_template = confuse.OneOf[bool | str](
            [
                bool,
                confuse.String(pattern=self.PAT_PX),
                confuse.String(pattern=self.PAT_PERCENT),
            ]
        )
        self.enforce_ratio = self.config["enforce_ratio"].get(ratio_template)
        self.margin_px = None
        self.margin_percent = None
        self.deinterlace = self.config["deinterlace"].get(bool)

        ratio = self.enforce_ratio
        if isinstance(ratio, str):
            # A string carries the margin; store it and reduce the option
            # itself to a plain flag.
            if ratio[-1] == "%":
                self.margin_percent = float(ratio[:-1]) / 100
            elif ratio[-2:] == "px":
                self.margin_px = int(ratio[:-2])
            else:
                # shouldn't happen
                raise confuse.ConfigValueError()
            self.enforce_ratio = True

        self.cover_names = [
            util.bytestring_path(name)
            for name in self.config["cover_names"].as_str_seq()
        ]
        self.cautious = self.config["cautious"].get(bool)
        fallback_template = confuse.Optional(confuse.Filename())
        self.fallback = self.config["fallback"].get(fallback_template)
        self.store_source = self.config["store_source"].get(bool)

        format_template = confuse.Optional(str)
        self.cover_format = self.config["cover_format"].get(format_template)

        if self.config["auto"]:
            # Automatic fetching hooks into the importer in two places.
            self.import_stages = [self.fetch_art]
            self.register_listener("import_task_files", self.assign_art)

        # Every (source ID, matching criterion) pair of the usable sources.
        available_sources = []
        for source_cls in ART_SOURCES:
            if not source_cls.available(self._log, self.config):
                continue
            for criterion in source_cls.VALID_MATCHING_CRITERIA:
                available_sources.append((source_cls.ID, criterion))

        sources = sanitize_pairs(
            self.config["sources"].as_pairs(default_value="*"),
            available_sources,
        )

        if "remote_priority" in self.config:
            self._log.warning(
                "The `fetch_art.remote_priority` configuration option has "
                "been deprecated. Instead, place `filesystem` at the end of "
                "your `sources` list."
            )
            if self.config["remote_priority"].get(bool):
                # Move the filesystem entries behind all the others while
                # keeping the relative order within each group.
                pairs = list(sources)
                remote_pairs = [
                    (s, c) for s, c in pairs if s != "filesystem"
                ]
                filesystem_pairs = [
                    (s, c) for s, c in pairs if s == "filesystem"
                ]
                sources = remote_pairs + filesystem_pairs

        sources_by_name = {}
        for source_cls in ART_SOURCES:
            sources_by_name[source_cls.ID] = source_cls

        # One source instance per configured (source, criterion) pair.
        self.sources = [
            sources_by_name[source_id](
                self._log, self.config, match_by=[criterion]
            )
            for source_id, criterion in sources
        ]

    @staticmethod
    def _is_source_file_removal_enabled() -> bool:
        """Tell whether the `import.delete` or `import.move` option is set."""
        import_config = config["import"]
        if import_config["delete"].get(bool):
            return True
        return import_config["move"].get(bool)

    # Asynchronous; after music is added to the library.
    def fetch_art(self, session: ImportSession, task: ImportTask) -> None:
        """Import stage: look for art for the album being imported and
        remember the winning candidate for `assign_art`.
        """
        # Art is only fetched for full albums.
        if not task.is_album:
            return

        existing_art = task.album.artpath
        if existing_art and os.path.isfile(syspath(existing_art)):
            # The album already has art (probably a re-import).
            return

        choice = task.choice_flag
        if choice == importer.Action.ASIS:
            # As-is imports do not search Web sources.
            local = True
        elif choice in (importer.Action.APPLY, importer.Action.RETAG):
            # Search every source.
            local = False
        else:
            # Any other choice (e.g., TRACKS) gets no art.
            return

        candidate = self.art_for_album(task.album, task.paths, local)
        if candidate:
            self.art_candidates[task] = candidate

    def _set_art(
        self, album: Album, candidate: Candidate, delete: bool = False
    ) -> None:
        """Attach the candidate's image to `album` and persist the album.

        `delete` is forwarded unchanged as the second argument of
        `album.set_art`. When the `store_source` option is on, the
        candidate's source name is also recorded on the album.
        """
        album.set_art(candidate.path, delete)
        if self.store_source:
            # store the source of the chosen artwork in a flexible field
            self._log.debug(
                "Storing art_source for {0.albumartist} - {0.album}", album
            )
            album.art_source = candidate.source_name
        # Persist the art (and, if set, the art_source field).
        album.store()

    # Synchronous; after music files are put in place.
    def assign_art(self, session: ImportSession, task: ImportTask):
        """Attach the art found earlier for `task` to its album."""
        if task not in self.art_candidates:
            return

        candidate = self.art_candidates.pop(task)
        removal_enabled = self._is_source_file_removal_enabled()

        # Pass `delete=True` only when the import leaves source files alone.
        keep_sources = not removal_enabled
        self._set_art(task.album, candidate, keep_sources)

        if removal_enabled:
            task.prune(candidate.path)

    # Manual album art fetching.
    def commands(self) -> list[ui.Subcommand]:
        """Build the `fetchart` subcommand with its `--force` and `--quiet`
        flags.
        """
        fetchart_cmd = ui.Subcommand("fetchart", help="download album art")
        parser = fetchart_cmd.parser
        parser.add_option(
            "-f",
            "--force",
            dest="force",
            action="store_true",
            default=False,
            help="re-download art when already present",
        )
        parser.add_option(
            "-q",
            "--quiet",
            dest="quiet",
            action="store_true",
            default=False,
            help="quiet mode: do not output albums that already have artwork",
        )

        def func(lib: Library, opts, args) -> None:
            # Fetch art for every album matching the query arguments.
            matching_albums = lib.albums(args)
            self.batch_fetch_art(lib, matching_albums, opts.force, opts.quiet)

        fetchart_cmd.func = func
        return [fetchart_cmd]

    # Utilities converted from functions to methods on logging overhaul

    def art_for_album(
        self,
        album: Album,
        paths: None | Sequence[bytes],
        local_only: bool = False,
    ) -> None | Candidate:
        """Return the first acceptable art candidate for `album`, or None
        when no source provides one.

        Sources are tried in their configured order; with `local_only`,
        sources that are not local are skipped. The chosen candidate is
        resized before it is returned.
        """
        chosen = None

        for source in self.sources:
            if local_only and source.LOC != "local":
                continue

            self._log.debug(
                "trying source {0.description}"
                " for album {1.albumartist} - {1.album}",
                source,
                album,
            )
            # A candidate's URL may turn out to be invalid, or its image may
            # not meet the requirements, so each one is fetched and checked.
            for candidate in source.get(album, self, paths):
                source.fetch_image(candidate, self)
                if candidate.validate(self) == ImageAction.BAD:
                    # Remove temporary files for invalid candidates.
                    source.cleanup(candidate)
                    continue

                chosen = candidate
                assert chosen.path is not None  # help mypy
                self._log.debug(
                    "using {.LOC} image {.path}", source, chosen
                )
                break

            if chosen:
                break

        if chosen:
            chosen.resize(self)

        return chosen

    def batch_fetch_art(
        self,
        lib: Library,
        albums: Iterable[Album],
        force: bool,
        quiet: bool,
    ) -> None:
        """Fetch art for every album in `albums` and report each outcome;
        this is the body of the manual `fetchart` command.

        Albums whose art file already exists are left alone unless `force`
        is set, and are reported only when `quiet` is off.
        """
        for album in albums:
            already_has_art = (
                album.artpath
                and not force
                and os.path.isfile(syspath(album.artpath))
            )
            if already_has_art:
                if not quiet:
                    message = ui.colorize(
                        "text_highlight_minor", "has album art"
                    )
                    ui.print_(f"{album}: {message}")
                continue

            # An ordinary run offers the album directory to the filesystem
            # source; a forced run passes no local paths, so only the other
            # sources can provide art.
            local_paths = None if force else [album.path]

            candidate = self.art_for_album(album, local_paths)
            if candidate:
                self._set_art(album, candidate)
                message = ui.colorize("text_success", "found album art")
            else:
                message = ui.colorize("text_error", "no art found")
            ui.print_(f"{album}: {message}")
