# This file is part of beets.
# Copyright 2016, Thomas Scholtes.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
from __future__ import annotations

import fnmatch
import os.path
import re
import sys
import unittest
from typing import TYPE_CHECKING

import pytest
from mediafile import MediaFile

from beets import util
from beets.library import Item
from beets.test import _common
from beets.test.helper import (
    AsIsImporterMixin,
    ImportHelper,
    IOMixin,
    PluginTestCase,
    capture_log,
)
from beetsplug import convert

if TYPE_CHECKING:
    from pathlib import Path


def shell_quote(text):
    import shlex

    return shlex.quote(text)


class ConvertMixin:
    def tagged_copy_cmd(self, tag):
        """Return a conversion command that copies files and appends
        `tag` to the copy.
        """
        if re.search("[^a-zA-Z0-9]", tag):
            raise ValueError(
                f"tag '{tag}' must only contain letters and digits"
            )

        # A Python script that copies the file and appends a tag.
        stub = os.path.join(_common.RSRC, b"convert_stub.py").decode("utf-8")
        return f"{shell_quote(sys.executable)} {shell_quote(stub)} $source $dest {tag}"

    def file_endswith(self, path: Path, tag: str):
        """Check the path is a file and if its content ends with `tag`."""
        assert path.exists()
        assert path.is_file()
        return path.read_bytes().endswith(tag.encode("utf-8"))


class ConvertTestCase(IOMixin, ConvertMixin, PluginTestCase):
    db_on_disk = True
    plugin = "convert"


@_common.slow_test()
class ImportConvertTest(AsIsImporterMixin, ImportHelper, ConvertTestCase):
    def setUp(self):
        super().setUp()
        self.config["convert"] = {
            "dest": os.path.join(self.temp_dir, b"convert"),
            "command": self.tagged_copy_cmd("convert"),
            # Enforce running convert
            "max_bitrate": 1,
            "auto": True,
            "quiet": False,
        }

    def test_import_converted(self):
        self.run_asis_importer()
        item = self.lib.items().get()
        assert self.file_endswith(item.filepath, "convert")

    # FIXME: fails on windows
    @unittest.skipIf(sys.platform == "win32", "win32")
    def test_import_original_on_convert_error(self):
        # `false` exits with non-zero code
        self.config["convert"]["command"] = "false"
        self.run_asis_importer()

        item = self.lib.items().get()
        assert item is not None
        assert item.filepath.is_file()

    def test_delete_originals(self):
        self.config["convert"]["delete_originals"] = True
        self.run_asis_importer()
        for path in self.importer.paths:
            for root, dirnames, filenames in os.walk(path):
                assert len(fnmatch.filter(filenames, "*.mp3")) == 0, (
                    f"Non-empty import directory {util.displayable_path(path)}"
                )

    def get_count_of_import_files(self):
        import_file_count = 0

        for path in self.importer.paths:
            for root, _, filenames in os.walk(path):
                import_file_count += len(filenames)

        return import_file_count


class ConvertCommand:
    """A mixin providing a utility method to run the `convert`command
    in tests.
    """

    def run_convert_path(self, item, *args):
        """Run the `convert` command on a given path."""
        return self.run_command("convert", *args, f"path:{item.filepath}")

    def run_convert(self, *args):
        """Run the `convert` command on `self.item`."""
        return self.run_convert_path(self.item, *args)


@_common.slow_test()
class ConvertCliTest(ConvertTestCase, ConvertCommand):
    def setUp(self):
        super().setUp()
        self.album = self.add_album_fixture(ext="ogg")
        self.item = self.album.items()[0]

        self.convert_dest = self.temp_dir_path / "convert_dest"
        self.converted_mp3 = self.convert_dest / "converted.mp3"
        self.config["convert"] = {
            "dest": str(self.convert_dest),
            "paths": {"default": "converted"},
            "format": "mp3",
            "formats": {
                "mp3": self.tagged_copy_cmd("mp3"),
                "ogg": self.tagged_copy_cmd("ogg"),
                "opus": {
                    "command": self.tagged_copy_cmd("opus"),
                    "extension": "ops",
                },
            },
        }

    def test_convert(self):
        self.io.addinput("y")
        self.run_convert()
        assert self.file_endswith(self.converted_mp3, "mp3")

    def test_convert_with_auto_confirmation(self):
        self.run_convert("--yes")
        assert self.file_endswith(self.converted_mp3, "mp3")

    def test_reject_confirmation(self):
        self.io.addinput("n")
        self.run_convert()
        assert not self.converted_mp3.exists()

    def test_convert_keep_new(self):
        assert os.path.splitext(self.item.path)[1] == b".ogg"

        self.io.addinput("y")
        self.run_convert("--keep-new")

        self.item.load()
        assert os.path.splitext(self.item.path)[1] == b".mp3"

    def test_format_option(self):
        self.io.addinput("y")
        self.run_convert("--format", "opus")
        assert self.file_endswith(self.convert_dest / "converted.ops", "opus")

    def test_embed_album_art(self):
        self.config["convert"]["embed"] = True
        image_path = os.path.join(_common.RSRC, b"image-2x3.jpg")
        self.album.artpath = image_path
        self.album.store()
        with open(os.path.join(image_path), "rb") as f:
            image_data = f.read()

        self.io.addinput("y")
        self.run_convert()
        mediafile = MediaFile(self.converted_mp3)
        assert mediafile.images[0].data == image_data

    def test_skip_existing(self):
        converted = self.converted_mp3
        self.touch(converted, content="XXX")
        self.run_convert("--yes")
        with open(converted) as f:
            assert f.read() == "XXX"

    def test_pretend(self):
        self.run_convert("--pretend")
        assert not self.converted_mp3.exists()

    def test_empty_query(self):
        with capture_log("beets.convert") as logs:
            self.run_convert("An impossible query")
        assert logs[0] == "convert: Empty query result."

    def test_no_transcode_when_maxbr_set_high_and_different_formats(self):
        self.config["convert"]["max_bitrate"] = 5000
        self.io.addinput("y")
        self.run_convert()
        assert self.file_endswith(self.converted_mp3, "mp3")

    def test_transcode_when_maxbr_set_low_and_different_formats(self):
        self.config["convert"]["max_bitrate"] = 5
        self.io.addinput("y")
        self.run_convert()
        assert self.file_endswith(self.converted_mp3, "mp3")

    def test_transcode_when_maxbr_set_to_none_and_different_formats(self):
        self.io.addinput("y")
        self.run_convert()
        assert self.file_endswith(self.converted_mp3, "mp3")

    def test_no_transcode_when_maxbr_set_high_and_same_formats(self):
        self.config["convert"]["max_bitrate"] = 5000
        self.config["convert"]["format"] = "ogg"
        self.io.addinput("y")
        self.run_convert()
        assert not self.file_endswith(
            self.convert_dest / "converted.ogg", "ogg"
        )

    def test_force_overrides_max_bitrate_and_same_formats(self):
        self.config["convert"]["max_bitrate"] = 5000
        self.config["convert"]["format"] = "ogg"

        self.io.addinput("y")
        self.run_convert("--force")

        converted = self.convert_dest / "converted.ogg"
        assert self.file_endswith(converted, "ogg")

    def test_transcode_when_maxbr_set_low_and_same_formats(self):
        self.config["convert"]["max_bitrate"] = 5
        self.config["convert"]["format"] = "ogg"
        self.io.addinput("y")
        self.run_convert()
        assert self.file_endswith(self.convert_dest / "converted.ogg", "ogg")

    def test_transcode_when_maxbr_set_to_none_and_same_formats(self):
        self.config["convert"]["format"] = "ogg"
        self.io.addinput("y")
        self.run_convert()
        assert not self.file_endswith(
            self.convert_dest / "converted.ogg", "ogg"
        )

    def test_playlist(self):
        self.io.addinput("y")
        self.run_convert("--playlist", "playlist.m3u8")
        assert (self.convert_dest / "playlist.m3u8").exists()

    def test_playlist_pretend(self):
        self.run_convert("--playlist", "playlist.m3u8", "--pretend")
        assert not (self.convert_dest / "playlist.m3u8").exists()

    def test_force_overrides_no_convert(self):
        self.config["convert"]["formats"]["opus"] = {
            "command": self.tagged_copy_cmd("opus"),
            "extension": "ops",
        }
        self.config["convert"]["no_convert"] = "format:ogg"

        [item] = self.add_item_fixtures(ext="ogg")

        self.io.addinput("y")
        self.run_convert_path(item, "--format", "opus", "--force")

        converted = self.convert_dest / "converted.ops"
        assert self.file_endswith(converted, "opus")

    def assert_playlist_entry(self, expected_entry, *args):
        self.io.addinput("y")
        self.run_convert(*args, "--playlist", "playlist.m3u8")
        lines = (self.convert_dest / "playlist.m3u8").read_text().splitlines()
        assert lines[0] == "#EXTM3U"
        assert lines[1] == expected_entry

    def test_playlist_entry_uses_config_format(self):
        self.assert_playlist_entry("converted.mp3")

    def test_playlist_entry_uses_cli_format(self):
        self.assert_playlist_entry("converted.ops", "--format", "opus")

    def test_playlist_entry_keeps_original_extension_when_not_transcoded(self):
        self.config["convert"]["no_convert"] = "format:ogg"
        self.assert_playlist_entry("converted.ogg")

    def test_playlist_entry_keep_new_points_to_destination_file(self):
        self.assert_playlist_entry("converted.ogg", "--keep-new")


@_common.slow_test()
class NeverConvertLossyFilesTest(ConvertTestCase, ConvertCommand):
    """Test the effect of the `never_convert_lossy_files` option."""

    def setUp(self):
        super().setUp()

        self.convert_dest = self.temp_dir_path / "convert_dest"
        self.config["convert"] = {
            "dest": str(self.convert_dest),
            "paths": {"default": "converted"},
            "never_convert_lossy_files": True,
            "format": "mp3",
            "formats": {
                "mp3": self.tagged_copy_cmd("mp3"),
            },
        }

    def test_transcode_from_lossless(self):
        [item] = self.add_item_fixtures(ext="flac")
        self.io.addinput("y")
        self.run_convert_path(item)
        converted = self.convert_dest / "converted.mp3"
        assert self.file_endswith(converted, "mp3")

    def test_transcode_from_lossy(self):
        self.config["convert"]["never_convert_lossy_files"] = False
        [item] = self.add_item_fixtures(ext="ogg")
        self.io.addinput("y")
        self.run_convert_path(item)
        converted = self.convert_dest / "converted.mp3"
        assert self.file_endswith(converted, "mp3")

    def test_transcode_from_lossy_prevented(self):
        [item] = self.add_item_fixtures(ext="ogg")
        self.io.addinput("y")
        self.run_convert_path(item)
        converted = self.convert_dest / "converted.ogg"
        assert not self.file_endswith(converted, "mp3")

    def test_force_overrides_never_convert_lossy_files(self):
        self.config["convert"]["formats"]["opus"] = {
            "command": self.tagged_copy_cmd("opus"),
            "extension": "ops",
        }
        [item] = self.add_item_fixtures(ext="ogg")

        self.io.addinput("y")
        self.run_convert_path(item, "--format", "opus", "--force")

        converted = self.convert_dest / "converted.ops"
        assert self.file_endswith(converted, "opus")


class TestNoConvert:
    """Test the effect of the `no_convert` option."""

    @pytest.mark.parametrize(
        "config_value, should_skip",
        [
            ("", False),
            ("bitrate:320", False),
            ("bitrate:320 format:ogg", False),
            ("bitrate:320 , format:ogg", True),
        ],
    )
    def test_no_convert_skip(self, config_value, should_skip):
        item = Item(format="ogg", bitrate=256)
        convert.config["convert"]["no_convert"] = config_value
        assert convert.in_no_convert(item) == should_skip
