import unittest
import os
import shutil
import overrides_hack

from utils import create_sparse_tempfile, create_lio_device, delete_lio_device, fake_utils, TestTags, tag_test, required_plugins, setup_scsi_debug, clean_scsi_debug

import gi
gi.require_version('GLib', '2.0')
gi.require_version('BlockDev', '3.0')
from gi.repository import BlockDev, GLib


@required_plugins(("smartmontools",))
class SmartmontoolsTest(unittest.TestCase):

    # dumps from real drives, both HDD and SSD
    ATA_JSON_DUMPS = ["TOSHIBA_THNSNH128GBST", "Hitachi_HDS5C3020ALA632", "HGST_HMS5C4040BLE640",
                      "HGST_HUS726060ALA640", "INTEL_SSDSC2BB120G4L", "WDC_WD10EFRX-68PJCN0"]
    SCSI_JSON_DUMPS = ["WD4001FYYG-01SL3", "HGST_HUSMR3280ASS200", "SEAGATE_ST600MP0036",
                      "TOSHIBA_AL15SEB120NY", "TOSHIBA_AL15SEB18EQY", "TOSHIBA_KPM5XMUG400G"]

    @classmethod
    def setUpClass(cls):
        """Load the SMART (smartmontools flavour) and LOOP plugins for this test class.

        The library is initialized when it has not been initialized yet and
        re-initialized with the requested plugins otherwise.
        """
        cls.ps = BlockDev.PluginSpec(name=BlockDev.Plugin.SMART, so_name="libbd_smartmontools.so.3")
        cls.ps2 = BlockDev.PluginSpec(name=BlockDev.Plugin.LOOP)
        wanted_plugins = [cls.ps, cls.ps2]

        if BlockDev.is_initialized():
            BlockDev.reinit(wanted_plugins, True, None)
        else:
            BlockDev.init(wanted_plugins, None)

    def _setup_lio(self):
        """Create a backing temporary file and set up a LIO device on top of it.

        Stores the backing file path in ``self.lio_dev_file`` and the value returned
        by create_lio_device() in ``self.lio_dev``.

        Raises RuntimeError (chained to the original error) when the LIO device
        cannot be created. The backing file is removed in that case: the tests
        register _clean_lio() only after this method returns, so nothing else would
        delete it.
        """
        self.lio_dev_file = create_sparse_tempfile("smart_test", 1024**3)
        try:
            self.lio_dev = create_lio_device(self.lio_dev_file)
        except RuntimeError as e:
            try:
                os.unlink(self.lio_dev_file)
            except OSError:
                # don't let a failed removal mask the actual setup error
                pass
            raise RuntimeError("Failed to setup LIO device for testing: %s" % e) from e

    def _clean_lio(self):
        """Tear down the LIO device (best effort) and remove its backing file.

        Errors from delete_lio_device() are deliberately ignored, but only regular
        exceptions: KeyboardInterrupt/SystemExit are no longer swallowed. The backing
        file is unlinked in any case.
        """
        try:
            delete_lio_device(self.lio_dev)
        except Exception:
            # best-effort teardown, the backing file still has to go away
            pass
        finally:
            os.unlink(self.lio_dev_file)

    def _setup_loop(self):
        """Create a backing temporary file and attach it to a loop device.

        Stores the backing file path in ``self.loop_dev_file`` and the loop device
        path (prefixed with ``/dev/``) in ``self.loop_dev``.
        """
        backing_file = create_sparse_tempfile("smart_test", 1024**3)
        self.loop_dev_file = backing_file

        attached, dev_name = BlockDev.loop_setup(backing_file)
        self.loop_dev = dev_name
        self.assertTrue(attached)
        self.assertTrue(dev_name)

        # loop_setup() reported just the device name, the tests need the full path
        self.loop_dev = '/dev/' + dev_name

    def _clean_loop(self):
        """Detach the loop device (best effort) and remove its backing file.

        Errors from BlockDev.loop_teardown() are deliberately ignored, but only
        regular exceptions: KeyboardInterrupt/SystemExit are no longer swallowed.
        The backing file is unlinked in any case.
        """
        try:
            BlockDev.loop_teardown(self.loop_dev)
        except Exception:
            # best-effort teardown, the backing file still has to go away
            pass
        finally:
            os.unlink(self.loop_dev_file)

    @tag_test(TestTags.NOSTORAGE)
    def test_plugin_version(self):
        """Check the shared object name reported for the loaded SMART plugin"""
        loaded_soname = BlockDev.get_plugin_soname(BlockDev.Plugin.SMART)
        self.assertEqual(loaded_soname, "libbd_smartmontools.so.3")

    @tag_test(TestTags.CORE)
    def test_ata_info(self):
        """Test SMART ATA info error reporting on LIO, loop and scsi_debug devices"""

        if shutil.which("smartctl") is None:
            self.skipTest("smartctl executable not found in $PATH, skipping.")

        def extra(option):
            # a fresh single-item list of extra arguments for every call
            return [BlockDev.ExtraArg.new(option, "")]

        def expect_failure(pattern, device, extra_args):
            with self.assertRaisesRegex(GLib.GError, pattern):
                BlockDev.smart_ata_get_info(device, extra_args)

        # error patterns shared by several of the devices below
        undetected = r".*Error getting ATA SMART info: /dev/.*: Unable to detect device type"
        no_ata_data = r"Error parsing smartctl JSON data: The member .ata_smart_data. is not defined in the object at the current position."
        no_identify = r"Error getting ATA SMART info: Device open failed or device did not return an IDENTIFY DEVICE structure."

        # non-existing device
        expect_failure(undetected, "/dev/nonexistent", None)
        expect_failure(r".*Error getting ATA SMART info: /dev/.*: Unknown device type 'xxx'",
                       "/dev/nonexistent", extra("--device=xxx"))
        expect_failure(r".*Error getting ATA SMART info: Smartctl open device: /dev/.* failed: No such device",
                       "/dev/nonexistent", extra("--device=ata"))

        # LIO device (SCSI)
        self._setup_lio()
        self.addCleanup(self._clean_lio)
        expect_failure(no_ata_data, self.lio_dev, None)
        expect_failure(no_ata_data, self.lio_dev, extra("--device=scsi"))
        expect_failure(no_identify, self.lio_dev, extra("--device=ata"))
        expect_failure(r"Error getting ATA SMART info: Read NVMe Identify Controller failed: NVME_IOCTL_ADMIN_CMD: Invalid argument",
                       self.lio_dev, extra("--device=nvme"))

        # loop device
        self._setup_loop()
        self.addCleanup(self._clean_loop)
        expect_failure(undetected, self.loop_dev, None)
        expect_failure(no_identify, self.loop_dev, extra("--device=scsi"))
        expect_failure(no_identify, self.loop_dev, extra("--device=ata"))

        # scsi_debug
        self.scsi_debug_dev = setup_scsi_debug()
        self.addCleanup(clean_scsi_debug, self.scsi_debug_dev)
        expect_failure(no_ata_data, self.scsi_debug_dev, None)
        expect_failure(no_ata_data, self.scsi_debug_dev, extra("--device=scsi"))
        expect_failure(no_identify, self.scsi_debug_dev, extra("--device=ata"))

    @tag_test(TestTags.CORE)
    def test_ata_real_dumps_fake_tool(self):
        """Test SMART ATA info on supplied JSON dumps (from real devices) via fake smartctl

        Every dump is checked in its own subtest so that a failure names the
        offending dump and the remaining dumps are still exercised.
        """

        with fake_utils("tests/fake_utils/smartctl"):
            for d in self.ATA_JSON_DUMPS:
                with self.subTest(dump=d):
                    # the dump name is passed in place of the device
                    data = BlockDev.smart_ata_get_info(d)
                    self.assertIsNotNone(data)
                    self.assertTrue(data.overall_status_passed)
                    self.assertGreater(data.power_cycle_count, 0)
                    self.assertGreater(data.power_on_time, 0)
                    self.assertEqual(data.self_test_percent_remaining, 0)
                    self.assertGreater(data.smart_capabilities, 0)
                    self.assertTrue(data.smart_enabled)
                    self.assertTrue(data.smart_supported)
                    self.assertGreater(data.temperature, 0)
                    self.assertGreater(len(data.attributes), 0)
                    for attr in data.attributes:
                        self.assertGreater(attr.id, 0)
                        self.assertGreater(len(attr.name), 0)
                        self.assertGreater(len(attr.pretty_value_string), 0)

    @tag_test(TestTags.CORE)
    def test_ata_real_dumps_by_blob(self):
        """Test SMART ATA info on supplied JSON dump blobs (from real devices)

        Every input is checked in its own subtest so that a failure names the
        offending source/dump and the remaining inputs are still exercised.
        """

        # feed it with garbage
        msg = r"Error getting ATA SMART info: (Empty response|JSON data must be UTF-8 encoded|.*unexpected character.*)"
        for d in ["/dev/zero", "/dev/random"]:
            with self.subTest(source=d):
                # keep the file open only for the read itself
                with open(d, "rb") as f:
                    content = f.read(1024)
                with self.assertRaisesRegex(GLib.GError, msg):
                    BlockDev.smart_ata_get_info_from_data(content)

        # feed it with proper JSON
        for d in self.ATA_JSON_DUMPS:
            with self.subTest(dump=d):
                with open(os.path.join("tests", "smart_dumps", "%s.json" % d), "rb") as f:
                    content = f.read()
                data = BlockDev.smart_ata_get_info_from_data(content)
                self.assertIsNotNone(data)
                self.assertTrue(data.overall_status_passed)
                self.assertGreater(data.power_cycle_count, 0)
                self.assertGreater(data.power_on_time, 0)
                self.assertEqual(data.self_test_percent_remaining, 0)
                self.assertGreater(data.smart_capabilities, 0)
                self.assertTrue(data.smart_enabled)
                self.assertTrue(data.smart_supported)
                self.assertGreater(data.temperature, 0)
                self.assertGreater(len(data.attributes), 0)
                for attr in data.attributes:
                    self.assertGreater(attr.id, 0)
                    self.assertGreater(len(attr.name), 0)
                    self.assertGreater(len(attr.pretty_value_string), 0)

    @tag_test(TestTags.CORE)
    def test_ata_error_dumps(self):
        """Test SMART ATA info on supplied JSON dumps covering the error cases"""

        # (dump name, expected error pattern); None marks the dump that has to succeed
        cases = (
            ("01_old_ver", r"Reported smartctl JSON format version too low: 0"),
            ("02_exit_err", r"Error getting ATA SMART info: Command line did not parse."),
            ("03_exit_err_32", None),
            ("04_malformed", r"Error getting ATA SMART info: .* Parse error: unexpected character"),
            ("05_empty", r"Empty response"),
        )

        with fake_utils("tests/fake_utils/smartctl"):
            for dump, pattern in cases:
                if pattern is None:
                    info = BlockDev.smart_ata_get_info(dump)
                    self.assertIsNotNone(info)
                    continue
                with self.assertRaisesRegex(GLib.GError, pattern):
                    BlockDev.smart_ata_get_info(dump)

    @tag_test(TestTags.CORE)
    def test_smart_enable_disable(self):
        """Test switching SMART functionality on/off over LIO, loop and scsi_debug devices"""
        # NOTE: don't rely on this test too much - smartctl is unpredictable on unsupported
        #       devices and frequently reports no error at all even though the device clearly
        #       lacks the SMART functionality

        if shutil.which("smartctl") is None:
            self.skipTest("smartctl executable not found in $PATH, skipping.")

        def extra(option):
            # a fresh single-item list of extra arguments for every call
            return [BlockDev.ExtraArg.new(option, "")]

        def expect_failure(pattern, *call_args):
            with self.assertRaisesRegex(GLib.GError, pattern):
                BlockDev.smart_set_enabled(*call_args)

        # disabling is always attempted before enabling
        off_then_on = (False, True)

        # error patterns shared by several of the devices below
        undetected = r"Error setting SMART functionality: /dev/.*: Unable to detect device type"
        no_identify = r"Error setting SMART functionality: Device open failed or device did not return an IDENTIFY DEVICE structure."

        # non-existing device
        for state in off_then_on:
            expect_failure(undetected, "/dev/nonexistent", state)
        no_such_device = r"Error setting SMART functionality: Smartctl open device: /dev/.* failed: No such device"
        for state in off_then_on:
            expect_failure(no_such_device, "/dev/nonexistent", state, extra("--device=ata"))

        # LIO device (SCSI)
        self._setup_lio()
        self.addCleanup(self._clean_lio)
        command_failed = r"Error setting SMART functionality: Some SMART or other ATA command to the disk failed, or there was a checksum error in a SMART data structure."
        expect_failure(command_failed, self.lio_dev, False)
        expect_failure(command_failed, self.lio_dev, False, extra("--device=scsi"))
        # turning SMART on is not reported as a failure by smartctl
        BlockDev.smart_set_enabled(self.lio_dev, True)
        BlockDev.smart_set_enabled(self.lio_dev, True, extra("--device=scsi"))
        for state in off_then_on:
            expect_failure(no_identify, self.lio_dev, state, extra("--device=ata"))

        # loop device
        self._setup_loop()
        self.addCleanup(self._clean_loop)
        # Unfortunately no loop-specific message (about not being an ATA device) is reported
        for state in off_then_on:
            expect_failure(undetected, self.loop_dev, state)
        for device_type in ("--device=scsi", "--device=ata"):
            for state in off_then_on:
                expect_failure(no_identify, self.loop_dev, state, extra(device_type))

        # scsi_debug
        self.scsi_debug_dev = setup_scsi_debug()
        self.addCleanup(clean_scsi_debug, self.scsi_debug_dev)
        for state in off_then_on:
            BlockDev.smart_set_enabled(self.scsi_debug_dev, state)
        for state in off_then_on:
            BlockDev.smart_set_enabled(self.scsi_debug_dev, state, extra("--device=scsi"))
        for state in off_then_on:
            expect_failure(no_identify, self.scsi_debug_dev, state, extra("--device=ata"))

    @tag_test(TestTags.CORE)
    def test_smart_selftest(self):
        """Test running SMART self-test operations over LIO, loop and scsi_debug devices"""

        if shutil.which("smartctl") is None:
            self.skipTest("smartctl executable not found in $PATH, skipping.")

        def extra(option):
            # a fresh single-item list of extra arguments for every call
            return [BlockDev.ExtraArg.new(option, "")]

        def expect_failure(pattern, *call_args):
            with self.assertRaisesRegex(GLib.GError, pattern):
                BlockDev.smart_device_self_test(*call_args)

        ops = BlockDev.SmartSelfTestOp
        every_op = (ops.OFFLINE, ops.SHORT, ops.LONG, ops.CONVEYANCE, ops.ABORT)
        # CONVEYANCE is handled separately for the LIO device
        lio_failing_ops = (ops.OFFLINE, ops.SHORT, ops.LONG, ops.ABORT)

        # error patterns shared by several of the devices below
        undetected = r"Error executing SMART self-test: /dev/.*: Unable to detect device type"
        no_identify = r"Error executing SMART self-test: Device open failed or device did not return an IDENTIFY DEVICE structure."

        # non-existing device
        for op in every_op:
            expect_failure(undetected, "/dev/nonexistent", op)
            expect_failure(r"Error executing SMART self-test: /dev/.*: Unknown device type 'xxx'",
                           "/dev/nonexistent", op, extra("--device=xxx"))
            expect_failure(r"Error executing SMART self-test: Smartctl open device: /dev/.* failed: No such device",
                           "/dev/nonexistent", op, extra("--device=scsi"))

        # LIO device (SCSI)
        self._setup_lio()
        self.addCleanup(self._clean_lio)
        command_failed = r"Error executing SMART self-test: Some SMART or other ATA command to the disk failed, or there was a checksum error in a SMART data structure."
        for op in lio_failing_ops:
            expect_failure(command_failed, self.lio_dev, op)
            expect_failure(command_failed, self.lio_dev, op, extra("--device=scsi"))
            expect_failure(no_identify, self.lio_dev, op, extra("--device=ata"))
        # the conveyance test is expected to pass without an error here
        BlockDev.smart_device_self_test(self.lio_dev, ops.CONVEYANCE)

        # loop device
        self._setup_loop()
        self.addCleanup(self._clean_loop)
        for op in every_op:
            expect_failure(undetected, self.loop_dev, op)
            expect_failure(no_identify, self.loop_dev, op, extra("--device=ata"))
            expect_failure(no_identify, self.loop_dev, op, extra("--device=scsi"))

        # scsi_debug
        self.scsi_debug_dev = setup_scsi_debug()
        self.addCleanup(clean_scsi_debug, self.scsi_debug_dev)
        for op in every_op:
            BlockDev.smart_device_self_test(self.scsi_debug_dev, op)
            BlockDev.smart_device_self_test(self.scsi_debug_dev, op, extra("--device=scsi"))
            expect_failure(no_identify, self.scsi_debug_dev, op, extra("--device=ata"))

    @tag_test(TestTags.CORE)
    def test_scsi_info(self):
        """Test SMART SCSI info error reporting on LIO, loop and scsi_debug devices"""

        if shutil.which("smartctl") is None:
            self.skipTest("smartctl executable not found in $PATH, skipping.")

        def expect_failures(device, autodetect_msg, scsi_msg, ata_msg):
            # the same three calls are made for every device: no extra arguments,
            # forced SCSI device type and forced ATA device type (in this order)
            attempts = (
                (autodetect_msg, None),
                (scsi_msg, [BlockDev.ExtraArg.new("--device=scsi", "")]),
                (ata_msg, [BlockDev.ExtraArg.new("--device=ata", "")]),
            )
            for pattern, extra_args in attempts:
                with self.assertRaisesRegex(GLib.GError, pattern):
                    BlockDev.smart_scsi_get_info(device, extra_args)

        # error patterns shared by several of the devices below
        undetected = r"Error getting SCSI SMART info: /dev/.*: Unable to detect device type"
        no_such_device = r"Error getting SCSI SMART info: Smartctl open device: /dev/.* failed: No such device"
        command_failed = r"Error getting SCSI SMART info: Some SMART or other ATA command to the disk failed, or there was a checksum error in a SMART data structure."
        no_identify = r"Error getting SCSI SMART info: Device open failed or device did not return an IDENTIFY DEVICE structure."

        # non-existing device
        expect_failures("/dev/nonexistent", undetected, no_such_device, no_such_device)

        # LIO device (SCSI)
        self._setup_lio()
        self.addCleanup(self._clean_lio)
        expect_failures(self.lio_dev, command_failed, command_failed, no_identify)

        # loop device
        self._setup_loop()
        self.addCleanup(self._clean_loop)
        expect_failures(self.loop_dev, undetected, no_identify, no_identify)

        # scsi_debug
        self.scsi_debug_dev = setup_scsi_debug()
        self.addCleanup(clean_scsi_debug, self.scsi_debug_dev)
        expect_failures(self.scsi_debug_dev, command_failed, command_failed, no_identify)

    @tag_test(TestTags.CORE)
    def test_scsi_real_dumps(self):
        """Test SMART SCSI info on supplied JSON dumps (from real devices)

        Every dump is checked in its own subtest so that a failure names the
        offending dump and the remaining dumps are still exercised.
        """

        with fake_utils("tests/fake_utils/smartctl"):
            for d in self.SCSI_JSON_DUMPS:
                with self.subTest(dump=d):
                    # the dump name is passed in place of the device
                    data = BlockDev.smart_scsi_get_info(d)
                    self.assertIsNotNone(data)
                    self.assertTrue(data.overall_status_passed)
                    self.assertTrue(data.smart_supported)
                    self.assertGreater(data.power_on_time, 0)
                    self.assertGreater(data.temperature, 0)
                    self.assertGreater(data.temperature_drive_trip, 0)
                    self.assertGreater(data.read_processed_bytes, 1000000)
                    self.assertGreater(data.write_processed_bytes, 1000000)
