# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.

__author__ = "Marco Trevisan"
__copyright__ = """
(c) 2021 Canonical Ltd.
(c) 2017 - 2022 Martin Pitt <martin@piware.de>
"""

import fcntl
import os
import shutil
import subprocess
import sys
import time
import unittest

import dbus
import dbus.mainloop.glib
from gi.repository import GLib

import dbusmock

# Install the GLib main loop integration as dbus-python's default, so that
# connections created afterwards dispatch their signals when the GLib main
# context is iterated.
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

# Path of the "monitor-sensor" executable found on PATH, or None when it is
# not installed; used to skip the monitor-sensor based test classes.
have_monitor_sensor = shutil.which("monitor-sensor")


class TestIIOSensorsProxyBase(dbusmock.DBusTestCase):
    """Test mocking iio-sensors-proxy"""

    # D-Bus interface whose properties the helpers below read, write and
    # watch; concrete test classes override it.
    dbus_interface = ""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.start_system_bus()
        cls.dbus_con = cls.get_dbus(True)

    def setUp(self):
        super().setUp()
        # Every test gets its own mock: p_mock is the server process (its
        # stdout is a pipe), p_obj the D-Bus proxy for the mocked object.
        (self.p_mock, self.p_obj) = self.spawn_server_template("iio-sensors-proxy", {}, stdout=subprocess.PIPE)

    def tearDown(self):
        try:
            if self.p_mock:
                self.p_mock.stdout.close()
                self.p_mock.terminate()
                self.p_mock.wait()
        finally:
            # Always let the parent class clean up, even if stopping the mock
            # process raised.
            super().tearDown()

    def get_property(self, name):
        """Return property `name` of `dbus_interface`, read via the D-Bus Properties interface."""
        return self.p_obj.Get(self.dbus_interface, name, dbus_interface=dbus.PROPERTIES_IFACE)

    def get_internal_property(self, name):
        """Return what the mock's GetInternalProperty() method reports for `name`."""
        return self.p_obj.GetInternalProperty(name)

    def set_internal_property(self, name, value):
        """Call the mock's SetInternalProperty() for `name` on `dbus_interface`."""
        return self.p_obj.SetInternalProperty(self.dbus_interface, name, value)

    def wait_for_properties_changed(self, max_wait=2000):
        """Wait for a PropertiesChanged signal concerning `dbus_interface`.

        Iterates the GLib main context until a PropertiesChanged signal for
        `self.dbus_interface` carrying at least one changed property has been
        received, or until `max_wait` milliseconds have passed.

        Returns the list of changed property names; the list is empty when
        the wait timed out.
        """
        changed_properties = []
        timeout_id = 0

        def on_properties_changed(interface, properties, _invalidated):
            nonlocal changed_properties

            # Notifications for other interfaces of the object are ignored.
            if interface == self.dbus_interface:
                # Copy the names so the result is a plain list in every case,
                # matching the empty list returned on timeout.
                changed_properties = list(properties)

        def on_timeout():
            nonlocal timeout_id

            # The callback returns None, so GLib drops the source by itself;
            # zeroing the id both ends the wait loop and prevents a second
            # removal below.
            timeout_id = 0

        loop = GLib.MainLoop()
        match = self.p_obj.connect_to_signal("PropertiesChanged", on_properties_changed, dbus.PROPERTIES_IFACE)
        timeout_id = GLib.timeout_add(max_wait, on_timeout)

        try:
            while not changed_properties and timeout_id != 0:
                loop.get_context().iteration(True)
        finally:
            # Release the timeout source and the signal match even when the
            # wait is interrupted, so they cannot leak into later tests.
            if timeout_id:
                GLib.source_remove(timeout_id)

            match.remove()

        return changed_properties

    def wait_for_property_changed(self, property_name, expected_value):
        """Assert that `property_name` is announced as changed and now holds `expected_value`.

        The value is compared against the mock's internal property.
        """
        self.assertIn(property_name, self.wait_for_properties_changed())
        self.assertEqual(self.get_internal_property(property_name), expected_value)


class TestIIOSensorsProxy(TestIIOSensorsProxyBase):
    """main SensorsProxy interface tests"""

    dbus_interface = "net.hadess.SensorProxy"

    def _assert_no_sensor_error(self, property_name, value, sensor):
        """Check that setting `property_name` is rejected because `sensor` is absent.

        The call must raise a D-Bus mock error whose message contains the line
        "No <sensor> sensor available".
        """
        with self.assertRaises(dbus.exceptions.DBusException) as ctx:
            self.set_internal_property(property_name, value)
        self.assertEqual(ctx.exception.get_dbus_name(), "org.freedesktop.DBus.Mock.Error")
        self.assertIn(f"No {sensor} sensor available", ctx.exception.get_dbus_message().split("\n"))

    #
    # Accelerometer
    #

    def test_accelerometer_none(self):
        self.assertFalse(self.get_property("HasAccelerometer"))

    def test_accelerometer_claimed(self):
        self.p_obj.ClaimAccelerometer()
        self.assertTrue(self.get_internal_property("AccelerometerOwners"))
        # Claiming alone must not make the sensor appear; same check as for
        # the other sensors.
        self.assertFalse(self.get_property("HasAccelerometer"))

    def test_accelerometer_claimed_released(self):
        self.p_obj.ClaimAccelerometer()
        self.assertTrue(self.get_internal_property("AccelerometerOwners"))
        self.p_obj.ReleaseAccelerometer()
        self.assertFalse(self.get_internal_property("AccelerometerOwners"))

    def test_accelerometer_available(self):
        self.assertFalse(self.get_property("HasAccelerometer"))
        self.set_internal_property("HasAccelerometer", True)
        self.assertTrue(self.get_property("HasAccelerometer"))

    def test_accelerometer_property_with_no_sensor(self):
        self._assert_no_sensor_error("AccelerometerOrientation", "normal", "accelerometer")

    def test_accelerometer_claimed_properties_changes(self):
        self.set_internal_property("HasAccelerometer", True)
        self.p_obj.ClaimAccelerometer()
        self.set_internal_property("AccelerometerOrientation", "normal")
        self.wait_for_property_changed("AccelerometerOrientation", "normal")

    def test_accelerometer_unclaimed_properties_changes(self):
        self.set_internal_property("HasAccelerometer", True)
        self.assertTrue(self.get_property("HasAccelerometer"))
        self.set_internal_property("AccelerometerOrientation", "normal")
        # Without a claim no change notification is expected, yet the new
        # value must be readable.
        self.assertFalse(self.wait_for_properties_changed(max_wait=500))
        self.assertEqual(self.get_property("AccelerometerOrientation"), "normal")

    #
    # Ambient light
    #

    def test_ambient_light_none(self):
        self.assertFalse(self.get_property("HasAmbientLight"))

    def test_ambient_light_claimed(self):
        self.p_obj.ClaimLight()
        self.assertTrue(self.get_internal_property("AmbientLightOwners"))
        self.assertFalse(self.get_property("HasAmbientLight"))

    def test_ambient_light_claimed_released(self):
        self.p_obj.ClaimLight()
        self.assertTrue(self.get_internal_property("AmbientLightOwners"))
        self.p_obj.ReleaseLight()
        self.assertFalse(self.get_internal_property("AmbientLightOwners"))

    def test_ambient_light_available(self):
        self.assertFalse(self.get_property("HasAmbientLight"))
        self.set_internal_property("HasAmbientLight", True)
        self.assertTrue(self.get_property("HasAmbientLight"))

    def test_ambient_light_property_with_no_sensor(self):
        self._assert_no_sensor_error("LightLevelUnit", "vendor", "ambient_light")
        self._assert_no_sensor_error("LightLevel", 0.5, "ambient_light")

    def test_ambient_light_claimed_properties_changes(self):
        self.set_internal_property("HasAmbientLight", True)
        self.p_obj.ClaimLight()
        self.set_internal_property("LightLevelUnit", "vendor")
        self.wait_for_property_changed("LightLevelUnit", "vendor")
        self.set_internal_property("LightLevel", 111100.0)
        self.wait_for_property_changed("LightLevel", 111100.0)

    def test_ambient_light_unclaimed_properties_changes(self):
        self.set_internal_property("HasAmbientLight", True)
        self.assertTrue(self.get_property("HasAmbientLight"))
        self.set_internal_property("LightLevelUnit", "vendor")
        self.assertFalse(self.wait_for_properties_changed(max_wait=500))
        self.assertEqual(self.get_property("LightLevelUnit"), "vendor")

    #
    # Proximity
    #

    def test_proximity_none(self):
        self.assertFalse(self.get_property("HasProximity"))

    def test_proximity_claimed(self):
        self.p_obj.ClaimProximity()
        self.assertTrue(self.get_internal_property("ProximityOwners"))
        self.assertFalse(self.get_property("HasProximity"))

    def test_proximity_claimed_released(self):
        self.p_obj.ClaimProximity()
        self.assertTrue(self.get_internal_property("ProximityOwners"))
        self.assertFalse(self.get_property("HasProximity"))
        self.p_obj.ReleaseProximity()
        self.assertFalse(self.get_internal_property("ProximityOwners"))

    def test_proximity_available(self):
        self.assertFalse(self.get_property("HasProximity"))
        self.set_internal_property("HasProximity", True)
        self.assertTrue(self.get_property("HasProximity"))

    def test_proximity_property_with_no_sensor(self):
        self._assert_no_sensor_error("ProximityNear", True, "proximity")

    def test_proximity_claimed_properties_changes(self):
        self.set_internal_property("HasProximity", True)
        self.p_obj.ClaimProximity()
        self.set_internal_property("ProximityNear", True)
        self.wait_for_property_changed("ProximityNear", True)

    def test_proximity_unclaimed_properties_changes(self):
        self.set_internal_property("HasProximity", True)
        self.assertTrue(self.get_property("HasProximity"))
        self.set_internal_property("ProximityNear", True)
        self.assertFalse(self.wait_for_properties_changed(max_wait=500))
        self.assertTrue(self.get_property("ProximityNear"))


class TestIIOSensorsProxyCompass(TestIIOSensorsProxyBase):
    """main SensorsProxy compass interface tests"""

    dbus_interface = "net.hadess.SensorProxy.Compass"

    def test_compass_none(self):
        # A freshly spawned mock must not advertise a compass.
        has_compass = self.get_property("HasCompass")
        self.assertFalse(has_compass)

    def test_compass_claimed(self):
        # Claiming registers an owner but does not make the sensor appear.
        self.p_obj.ClaimCompass()
        owners = self.get_internal_property("CompassOwners")
        self.assertTrue(owners)
        has_compass = self.get_property("HasCompass")
        self.assertFalse(has_compass)

    def test_compass_claimed_released(self):
        # Claim first ...
        self.p_obj.ClaimCompass()
        owners_after_claim = self.get_internal_property("CompassOwners")
        self.assertTrue(owners_after_claim)
        has_compass = self.get_property("HasCompass")
        self.assertFalse(has_compass)

        # ... then release and expect the owner list to be empty again.
        self.p_obj.ReleaseCompass()
        owners_after_release = self.get_internal_property("CompassOwners")
        self.assertFalse(owners_after_release)

    def test_compass_available(self):
        # HasCompass goes from false to true once the mock is told so.
        before = self.get_property("HasCompass")
        self.assertFalse(before)
        self.set_internal_property("HasCompass", True)
        after = self.get_property("HasCompass")
        self.assertTrue(after)

    def test_compass_property_with_no_sensor(self):
        # Setting a heading while no compass exists must be rejected.
        with self.assertRaises(dbus.exceptions.DBusException) as raised:
            self.set_internal_property("CompassHeading", 180)

        error = raised.exception
        self.assertEqual(error.get_dbus_name(), "org.freedesktop.DBus.Mock.Error")
        message_lines = error.get_dbus_message().split("\n")
        self.assertIn("No compass sensor available", message_lines)

    def test_compass_claimed_properties_changes(self):
        # With a claim in place, a heading update is announced.
        heading = 55
        self.set_internal_property("HasCompass", True)
        self.p_obj.ClaimCompass()
        self.set_internal_property("CompassHeading", heading)
        self.wait_for_property_changed("CompassHeading", heading)

    def test_compass_unclaimed_properties_changes(self):
        # Without a claim, the heading changes but no notification arrives
        # within half a second.
        heading = 85
        self.set_internal_property("HasCompass", True)
        has_compass = self.get_property("HasCompass")
        self.assertTrue(has_compass)
        self.set_internal_property("CompassHeading", heading)
        announced = self.wait_for_properties_changed(max_wait=500)
        self.assertFalse(announced)
        self.assertEqual(self.get_property("CompassHeading"), heading)


@unittest.skipUnless(have_monitor_sensor, "monitor-sensor utility not available")
class TestIIOSensorsProxyMonitorSensorBase(TestIIOSensorsProxyBase):
    """Base SensorsProxy interface tests using monitor-sensor"""

    # The running monitor-sensor process, or None while none is active.
    p_monitor_sensor = None

    # Seconds to sleep between two non-blocking reads of the monitor output,
    # so that waiting for output does not spin a CPU core.
    _POLL_INTERVAL = 0.01

    def start_monitor_sensor(self):
        """Launch monitor-sensor and wait until it reports that the proxy appeared.

        stderr is merged into stdout, and stdout is switched to non-blocking
        mode so that the output assertions can poll it with a timeout.
        """
        self.assertIsNone(self.p_monitor_sensor)
        # pylint: disable=consider-using-with
        self.p_monitor_sensor = subprocess.Popen("monitor-sensor", stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        flags = fcntl.fcntl(self.p_monitor_sensor.stdout, fcntl.F_GETFL)
        fcntl.fcntl(self.p_monitor_sensor.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertOutputContains(
            [
                "    Waiting for iio-sensor-proxy to appear",
                "+++ iio-sensor-proxy appeared",
            ]
        )

    def stop_monitor_sensor(self):
        """Check that no unread output is pending, then terminate monitor-sensor.

        The process is terminated and forgotten even when the output check
        fails, so a failing test does not leave a stray child behind and the
        monitor cannot be stopped twice.
        """
        self.assertIsNotNone(self.p_monitor_sensor)
        try:
            self.assertEmptyOutput()
        finally:
            self.p_monitor_sensor.stdout.close()
            self.p_monitor_sensor.terminate()
            self.p_monitor_sensor.wait()
            self.p_monitor_sensor = None

    def tearDown(self):
        try:
            if self.p_monitor_sensor:
                self.stop_monitor_sensor()
        finally:
            # The mock server must be shut down even if stopping the monitor
            # raised (e.g. because of unexpected trailing output).
            super().tearDown()

    def assertOutputContains(self, expected_lines, max_wait=2000):
        """Assert that the next lines printed by monitor-sensor equal `expected_lines`.

        `max_wait` is the overall budget in milliseconds for all the lines
        together; it is measured with a monotonic clock.
        """
        self.assertIsNotNone(self.p_monitor_sensor)
        start_time = time.monotonic()
        for line in expected_lines:
            while True:
                # Non-blocking read: yields an empty result while no output
                # is available yet.
                output = self.p_monitor_sensor.stdout.readline()
                if output:
                    break
                elapsed_ms = (time.monotonic() - start_time) * 1000
                self.assertLessEqual(elapsed_ms, max_wait, msg="Timeout exceeded")
                time.sleep(self._POLL_INTERVAL)
            self.assertEqual(output.decode("utf-8"), f"{line}\n")

    def assertOutputEquals(self, expected_lines, max_wait=2000):
        """Assert that monitor-sensor prints exactly `expected_lines` and nothing more."""
        self.assertOutputContains(expected_lines, max_wait)
        self.assertEmptyOutput()

    def assertEmptyOutput(self, max_wait=100):
        """Assert that monitor-sensor prints nothing for `max_wait` milliseconds."""
        start_time = time.monotonic()
        while (time.monotonic() - start_time) * 1000 < max_wait:
            self.assertFalse(self.p_monitor_sensor.stdout.readline(), msg="Unexpected output")
            time.sleep(self._POLL_INTERVAL)


class TestIIOSensorsProxyMonitorSensor(TestIIOSensorsProxyMonitorSensorBase):
    """main SensorsProxy interface tests using monitor-sensor"""

    dbus_interface = "net.hadess.SensorProxy"

    def test_accelerometer_added(self):
        # Only the accelerometer exists when monitor-sensor starts.
        self.set_internal_property("HasAccelerometer", True)
        self.start_monitor_sensor()

        initial_report = [
            "=== Has accelerometer (orientation: undefined)",
            "=== No ambient light sensor",
            "=== No proximity sensor",
        ]
        self.assertOutputEquals(initial_report)

    def test_accelerometer_changes(self):
        # Reuse the "added" scenario as setup, then rotate the device.
        self.test_accelerometer_added()

        orientations = ("normal", "left-up", "bottom-up")
        for orientation in orientations:
            self.set_internal_property("AccelerometerOrientation", orientation)

        reported = [f"    Accelerometer orientation changed: {orientation}" for orientation in orientations]
        self.assertOutputEquals(reported)

    def test_ambient_light_added(self):
        # Only the ambient light sensor exists when monitor-sensor starts.
        self.set_internal_property("HasAmbientLight", True)
        self.start_monitor_sensor()

        initial_report = [
            "=== No accelerometer",
            "=== Has ambient light sensor (value: 0.000000, unit: lux)",
            "=== No proximity sensor",
        ]
        self.assertOutputEquals(initial_report)

    def test_ambient_light_changes(self):
        # Reuse the "added" scenario as setup, then feed unit and level updates.
        self.test_ambient_light_added()

        updates = (
            ("LightLevelUnit", "vendor"),
            ("LightLevel", 0.3),
            ("LightLevel", 0.5),
            ("LightLevelUnit", "lux"),
            ("LightLevel", 111100.0),
        )
        for property_name, value in updates:
            self.set_internal_property(property_name, value)

        # One line is expected per level update; the unit updates produce
        # no line of their own.
        reported = [
            "    Light changed: 0.300000 (vendor)",
            "    Light changed: 0.500000 (vendor)",
            "    Light changed: 111100.000000 (lux)",
        ]
        self.assertOutputEquals(reported)

    def test_proximity_sensor_added(self):
        # Only the proximity sensor exists when monitor-sensor starts.
        self.set_internal_property("HasProximity", True)
        self.start_monitor_sensor()

        initial_report = [
            "=== No accelerometer",
            "=== No ambient light sensor",
            "=== Has proximity sensor (near: 0)",
        ]
        self.assertOutputEquals(initial_report)

    def test_proximity_sensor_changes(self):
        # Reuse the "added" scenario as setup, then toggle the near state.
        self.test_proximity_sensor_added()

        for near in (True, False):
            self.set_internal_property("ProximityNear", near)

        reported = [
            "    Proximity value changed: 1",
            "    Proximity value changed: 0",
        ]
        self.assertOutputEquals(reported)


if __name__ == "__main__":
    # Report on stdout, so that nothing is written to stderr.
    stdout_runner = unittest.TextTestRunner(stream=sys.stdout)
    unittest.main(testRunner=stdout_runner)
