#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
import random

from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc

from telemetry_tempest_plugin.aodh.api import base

CONF = config.CONF


class TelemetryAlarmingAPITest(base.BaseAlarmingTest):

    @classmethod
    def resource_setup(cls):
        super(TelemetryAlarmingAPITest, cls).resource_setup()

        if CONF.alarming_plugin.create_alarms:
            cls.rule = {
                "event_type": "compute.instance.*",
                "query": [
                    {"field": "traits.name",
                     "type": "string",
                     "op": "eq",
                     "value": "test"}]
            }
            for i in range(2):
                cls.create_alarm(event_rule=cls.rule)

    @decorators.idempotent_id('1c918e06-210b-41eb-bd45-14676dd77cd7')
    def test_alarm_list(self):
        # List alarms
        alarm_list = self.alarming_client.list_alarms()

        # Verify created alarm in the list
        fetched_ids = [a['alarm_id'] for a in alarm_list]
        missing_alarms = [a for a in self.alarm_ids if a not in fetched_ids]
        self.assertEqual(0, len(missing_alarms),
                         "Failed to find the following created alarm(s)"
                         " in a fetched list: %s" %
                         ', '.join(str(a) for a in missing_alarms))

    @decorators.idempotent_id('1297b095-39c1-4e74-8a1f-4ae998cedd68')
    def test_create_update_get_delete_alarm(self):
        # Create an alarm
        alarm_name = data_utils.rand_name('telemetry_alarm')
        body = self.alarming_client.create_alarm(
            name=alarm_name, type='event',
            event_rule=self.rule)
        self.assertEqual(alarm_name, body['name'])
        alarm_id = body['alarm_id']
        self.assertLessEqual(self.rule.items(), body['event_rule'].items())

        # Update alarm with same rule and name
        body = self.alarming_client.update_alarm(
            alarm_id,
            event_rule=self.rule,
            name=alarm_name,
            type='event')
        self.assertEqual(alarm_name, body['name'])
        self.assertLessEqual(self.rule.items(), body['event_rule'].items())

        # Update alarm with new rule and new name
        new_rule = {
            "event_type": "compute.instance.create",
            "query": [
                {"field": "traits.name",
                 "type": "string",
                 "op": "eq",
                 "value": "test"}]
        }
        alarm_name_updated = data_utils.rand_name('telemetry-alarm-update')
        body = self.alarming_client.update_alarm(
            alarm_id,
            event_rule=new_rule,
            name=alarm_name_updated,
            type='event')
        self.assertEqual(alarm_name_updated, body['name'])
        self.assertLessEqual(new_rule.items(), body['event_rule'].items())

        # Update severity
        body = self.alarming_client.update_alarm(
            alarm_id,
            event_rule=new_rule,
            name=alarm_name_updated,
            type='event',
            severity='low')

        # Get and verify details of an alarm after update
        body = self.alarming_client.show_alarm(alarm_id)
        self.assertEqual(alarm_name_updated, body['name'])
        self.assertLessEqual(new_rule.items(), body['event_rule'].items())
        self.assertEqual('low', body['severity'])

        # Get history for the alarm and verify the same
        body = self.alarming_client.show_alarm_history(alarm_id)
        self.assertEqual("rule change", body[0]['type'])
        self.assertIn(alarm_name_updated, body[0]['detail'])
        self.assertEqual("creation", body[1]['type'])
        self.assertIn(alarm_name, body[1]['detail'])

        # Query by state and verify
        query = ['state', 'eq', 'insufficient data']
        body = self.alarming_client.list_alarms(query)
        self.assertNotEqual(0, len(body))
        self.assertEqual(set(['insufficient data']),
                         set(alarm['state'] for alarm in body))

        # Query by type and verify
        query = ['type', 'eq', 'event']
        body = self.alarming_client.list_alarms(query)
        self.assertNotEqual(0, len(body))
        self.assertEqual(set(['event']), set(alarm['type'] for alarm in body))

        # Delete alarm and verify if deleted
        self.alarming_client.delete_alarm(alarm_id)
        self.assertRaises(lib_exc.NotFound,
                          self.alarming_client.show_alarm, alarm_id)

    @decorators.idempotent_id('b666f7d5-ce8c-4b2d-887e-15993433c2e9')
    def test_create_query_delete_disabled_alarm(self):
        # Create an alarm
        alarm_name = data_utils.rand_name('telemetry_alarm')
        body = self.alarming_client.create_alarm(
            name=alarm_name, type='event', enabled=False,
            event_rule=self.rule)
        self.assertEqual(alarm_name, body['name'])
        alarm_id = body['alarm_id']
        self.assertLessEqual(self.rule.items(), body['event_rule'].items())
        self.assertFalse(body['enabled'])

        # Query by enabled false and verify
        query = ['enabled', 'eq', 'false']
        body = self.alarming_client.list_alarms(query)
        self.assertNotEqual(0, len(body))
        self.assertEqual(set([False]),
                         set(alarm['enabled'] for alarm in body))

        # Delete alarm and verify if deleted
        self.alarming_client.delete_alarm(alarm_id)
        self.assertRaises(lib_exc.NotFound,
                          self.alarming_client.show_alarm, alarm_id)

    @decorators.idempotent_id('bf44b72f-0384-4b34-ab78-bdfd3bf1b16c')
    def test_create_delete_alarm_defaults(self):
        # Create an alarm
        alarm_name = data_utils.rand_name('telemetry_alarm')
        body = self.alarming_client.create_alarm(
            name=alarm_name, type='event',
            event_rule=self.rule)
        self.assertEqual(alarm_name, body['name'])
        alarm_id = body['alarm_id']
        self.assertLessEqual(self.rule.items(), body['event_rule'].items())

        # Verify default
        expected_defaults = {
            'enabled': True,
            'ok_actions': [],
            'alarm_actions': [],
            'insufficient_data_actions': [],
            'repeat_actions': False,
        }
        for key in expected_defaults:
            self.assertEqual(expected_defaults[key], body[key])

        # Delete alarm and verify if deleted
        self.alarming_client.delete_alarm(alarm_id)
        self.assertRaises(lib_exc.NotFound,
                          self.alarming_client.show_alarm, alarm_id)

    @decorators.idempotent_id('0a883365-7158-4dbb-a946-3b07dc171c93')
    def test_create_excercise_delete_alarm_state(self):
        # Create an alarm
        alarm_name = data_utils.rand_name('telemetry_alarm')
        body = self.alarming_client.create_alarm(
            name=alarm_name, type='event',
            event_rule=self.rule)
        self.assertEqual(alarm_name, body['name'])
        alarm_id = body['alarm_id']
        self.assertLessEqual(self.rule.items(), body['event_rule'].items())

        # Verify initial state
        body = self.alarming_client.show_alarm(alarm_id)
        self.assertEqual("insufficient data", body['state'])
        self.assertEqual("Not evaluated yet", body['state_reason'])

        # Update state and verify
        self.alarming_client.alarm_set_state(alarm_id, state='ok')
        body = self.alarming_client.show_alarm(alarm_id)
        self.assertEqual('ok', body['state'])
        self.assertEqual('Manually set via API', body['state_reason'])

        # Update state and verify reason read only
        body['state'] = 'alarm'
        body['state_reason'] = 'Oops!'
        self.alarming_client.update_alarm(**body)
        body = self.alarming_client.show_alarm(alarm_id)
        self.assertEqual('alarm', body['state'])
        self.assertEqual('Manually set via API', body['state_reason'])

        # Delete alarm and verify if deleted
        self.alarming_client.delete_alarm(alarm_id)
        self.assertRaises(lib_exc.NotFound,
                          self.alarming_client.show_alarm, alarm_id)

    @decorators.idempotent_id('ced16dd6-cbd8-4a2b-a9c3-aed6e4a7102c')
    def test_create_query_delete_alarm_same_name(self):
        # Create two alarms with same name
        alarm_name = data_utils.rand_name('telemetry_alarm')
        body1 = self.alarming_client.create_alarm(
            name=alarm_name, type='event',
            event_rule=self.rule)
        body2 = self.alarming_client.create_alarm(
            name=alarm_name, type='event',
            event_rule=self.rule)
        alarm1_id = body1['alarm_id']
        alarm2_id = body2['alarm_id']
        self.assertEqual(alarm_name, body1['name'])
        self.assertEqual(alarm_name, body2['name'])
        self.assertNotEqual(alarm1_id, alarm2_id)

        # Query by name and verify
        query = ['name', 'eq', alarm_name]
        body = self.alarming_client.list_alarms(query)
        self.assertEqual(2, len(body))
        self.assertEqual(set([alarm_name]),
                         set(alarm['name'] for alarm in body))

        # Delete alarms and verify if deleted
        self.alarming_client.delete_alarm(alarm1_id)
        self.assertRaises(lib_exc.NotFound,
                          self.alarming_client.show_alarm, alarm1_id)
        self.alarming_client.delete_alarm(alarm2_id)
        self.assertRaises(lib_exc.NotFound,
                          self.alarming_client.show_alarm, alarm2_id)

    @decorators.idempotent_id('aca49486-70bb-4016-87e0-f6131374f742')
    def test_set_get_alarm_state(self):
        alarm_states = ['ok', 'alarm', 'insufficient data']
        alarm = self.create_alarm(event_rule=self.rule)

        # Set alarm state and verify
        new_state =\
            [elem for elem in alarm_states if elem != alarm['state']][0]
        state = self.alarming_client.alarm_set_state(alarm['alarm_id'],
                                                     new_state)

        self.assertEqual(new_state, state.data)
        # Get alarm state and verify
        state = self.alarming_client.show_alarm_state(alarm['alarm_id'])
        self.assertEqual(new_state, state.data)

    @decorators.idempotent_id('0cc2f5d1-6f48-4274-bfa8-f62f82eab6ed')
    def test_get_capabilities(self):

        response = self.alarming_client.show_capabilities()
        self.assertIsNotNone(response)
        self.assertNotEqual({}, response)
        self.assertIn('api', response)
        self.assertIn('alarm_storage', response)

    @decorators.idempotent_id('d42d0103-0497-4109-9746-dacaa17e831c')
    def test_get_versions(self):

        response = self.alarming_client.show_version()
        media_types = [
            {
                'base': 'application/json',
                'type': 'application/vnd.openstack.telemetry-v2+json'
            }, {
                'base': 'application/xml',
                'type': 'application/vnd.openstack.telemetry-v2+xml'
            }
        ]
        self.assertIsNotNone(response)
        self.assertNotEqual({}, response)
        self.assertEqual('v2', response['versions']['values'][0]['id'])
        self.assertIn('links', response['versions']['values'][0])
        self.assertEqual(media_types, response['versions']['values'][0][
            'media-types'])
        self.assertIn('status', response['versions']['values'][0])
        self.assertIn('updated', response['versions']['values'][0])

    @decorators.idempotent_id('c1dcefdf-3b96-40d0-8f39-04fc0702ab6b')
    def test_create_n_delete_alarm_rule_loadbalancer(self):
        # create dual actions
        alarm_name = data_utils.rand_name('telemetry_alarm')
        rule = {
            "pool_id": "2177ccd8-b09c-417a-89a0-e8d2419be612",
            "stack_id": "1b974012-ebcb-4888-8ae2-47714d4d2c4d",
            "autoscaling_group_id": "681c9266-61d2-4c9a-ad18-526807f6adc0"
        }
        body = self.alarming_client.create_alarm(
            name=alarm_name,
            type='loadbalancer_member_health',
            loadbalancer_member_health_rule=rule
        )
        alarm_id = body['alarm_id']
        alarms = self.alarming_client.list_alarms(['name', 'eq', alarm_name])
        self.assertEqual(1, len(alarms))

        # Check the actions are empty
        self.assertEqual([], alarms[0]['alarm_actions'])

        # Delete alarm and verify if deleted
        self.alarming_client.delete_alarm(alarm_id)
        self.assertRaises(lib_exc.NotFound,
                          self.alarming_client.show_alarm, alarm_id)

    @decorators.idempotent_id('e1d65c3c-a64d-4968-949c-96f2b2d8b363')
    def test_create_list_sort_limit_delete_alarm(self):
        # create test alarms
        alarms = {}
        for i in range(3):
            alarm_name = data_utils.rand_name('sorted_alarms')
            alarms[alarm_name] = []
            for j in range(random.randint(2, 4)):
                body = self.alarming_client.create_alarm(
                    name=alarm_name, type='event',
                    event_rule=self.rule)
                alarms[alarm_name].append(body['alarm_id'])
        ordered_alarms = []
        for key in sorted(alarms):
            ordered_alarms.extend([(key, a) for a in sorted(alarms[key])])

        # Sort by name and verify
        sort = ['name:asc']
        body = self.alarming_client.list_alarms(sort=sort)
        self.assertEqual([alarm[0] for alarm in ordered_alarms],
                         [alarm['name'] for alarm in body if alarm[
                             'name'].startswith('tempest-sorted_alarms')])
        sort = ['name']
        body = self.alarming_client.list_alarms(sort=sort)
        self.assertEqual([alarm[0] for alarm in ordered_alarms],
                         [alarm['name'] for alarm in body if alarm[
                             'name'].startswith('tempest-sorted_alarms')])

        # multiple sorts
        sort = ['name:asc', 'alarm_id:asc']
        body = self.alarming_client.list_alarms(sort=sort)
        name_ids = [(a['name'], a['alarm_id']) for a in body if a[
            'name'].startswith('tempest-sorted_alarms')]
        self.assertEqual(ordered_alarms, name_ids)

        # limit and sort
        sort = ['name:asc', 'alarm_id:asc']
        limit = 2
        body = self.alarming_client.list_alarms(limit=limit)
        self.assertEqual(2, len(body))
        body = self.alarming_client.list_alarms(sort=sort, limit=limit)
        self.assertEqual(2, len(body))
        self.assertEqual([ordered_alarms[0][0], ordered_alarms[1][0]],
                         [body[0]['name'], body[1]['name']])
        body = self.alarming_client.list_alarms(
            sort=sort, marker=ordered_alarms[1][1])
        name_ids = [(a['name'], a['alarm_id']) for a in body if a[
            'name'].startswith('tempest-sorted_alarms')]
        self.assertEqual(ordered_alarms[2:], name_ids)

        # Delete alarms and verify if deleted
        for name, alarm_id in ordered_alarms:
            self.alarming_client.delete_alarm(alarm_id)
            self.assertRaises(lib_exc.NotFound,
                              self.alarming_client.show_alarm, alarm_id)
