#   Copyright 2015 iWeb Technologies Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License"); you may
#   not use this file except in compliance with the License. You may obtain
#   a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.
#

import copy
from unittest import mock
from unittest.mock import call

from osc_lib.cli import format_columns
from osc_lib import exceptions
from osc_lib import utils

from openstackclient.tests.unit.volume.v1 import fakes as volume_fakes
from openstackclient.volume.v1 import qos_specs


class TestQos(volume_fakes.TestVolumev1):

    def setUp(self):
        super(TestQos, self).setUp()

        self.qos_mock = self.app.client_manager.volume.qos_specs
        self.qos_mock.reset_mock()

        self.types_mock = self.app.client_manager.volume.volume_types
        self.types_mock.reset_mock()


class TestQosAssociate(TestQos):

    volume_type = volume_fakes.FakeVolumeType.create_one_volume_type()
    qos_spec = volume_fakes.FakeQos.create_one_qos()

    def setUp(self):
        super(TestQosAssociate, self).setUp()

        self.qos_mock.get.return_value = self.qos_spec
        self.types_mock.get.return_value = self.volume_type
        # Get the command object to test
        self.cmd = qos_specs.AssociateQos(self.app, None)

    def test_qos_associate(self):
        arglist = [
            self.qos_spec.id,
            self.volume_type.id
        ]
        verifylist = [
            ('qos_spec', self.qos_spec.id),
            ('volume_type', self.volume_type.id)
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.qos_mock.associate.assert_called_with(
            self.qos_spec.id,
            self.volume_type.id
        )
        self.assertIsNone(result)


class TestQosCreate(TestQos):

    columns = (
        'consumer',
        'id',
        'name',
        'properties'
    )

    def setUp(self):
        super(TestQosCreate, self).setUp()
        self.new_qos_spec = volume_fakes.FakeQos.create_one_qos()
        self.datalist = (
            self.new_qos_spec.consumer,
            self.new_qos_spec.id,
            self.new_qos_spec.name,
            format_columns.DictColumn(self.new_qos_spec.specs)
        )
        self.qos_mock.create.return_value = self.new_qos_spec
        # Get the command object to test
        self.cmd = qos_specs.CreateQos(self.app, None)

    def test_qos_create_without_properties(self):
        arglist = [
            self.new_qos_spec.name,
        ]
        verifylist = [
            ('name', self.new_qos_spec.name),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.qos_mock.create.assert_called_with(
            self.new_qos_spec.name,
            {'consumer': 'both'}
        )

        self.assertEqual(self.columns, columns)
        self.assertCountEqual(self.datalist, data)

    def test_qos_create_with_consumer(self):
        arglist = [
            '--consumer', self.new_qos_spec.consumer,
            self.new_qos_spec.name,
        ]
        verifylist = [
            ('consumer', self.new_qos_spec.consumer),
            ('name', self.new_qos_spec.name),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.qos_mock.create.assert_called_with(
            self.new_qos_spec.name,
            {'consumer': self.new_qos_spec.consumer}
        )
        self.assertEqual(self.columns, columns)
        self.assertCountEqual(self.datalist, data)

    def test_qos_create_with_properties(self):
        arglist = [
            '--consumer', self.new_qos_spec.consumer,
            '--property', 'foo=bar',
            '--property', 'iops=9001',
            self.new_qos_spec.name,
        ]
        verifylist = [
            ('consumer', self.new_qos_spec.consumer),
            ('property', self.new_qos_spec.specs),
            ('name', self.new_qos_spec.name),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.new_qos_spec.specs.update(
            {'consumer': self.new_qos_spec.consumer})
        self.qos_mock.create.assert_called_with(
            self.new_qos_spec.name,
            self.new_qos_spec.specs
        )

        self.assertEqual(self.columns, columns)
        self.assertCountEqual(self.datalist, data)


class TestQosDelete(TestQos):

    qos_specs = volume_fakes.FakeQos.create_qoses(count=2)

    def setUp(self):
        super(TestQosDelete, self).setUp()

        self.qos_mock.get = (
            volume_fakes.FakeQos.get_qoses(self.qos_specs))
        # Get the command object to test
        self.cmd = qos_specs.DeleteQos(self.app, None)

    def test_qos_delete_with_id(self):
        arglist = [
            self.qos_specs[0].id
        ]
        verifylist = [
            ('qos_specs', [self.qos_specs[0].id])
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.qos_mock.delete.assert_called_with(self.qos_specs[0].id, False)
        self.assertIsNone(result)

    def test_qos_delete_with_name(self):
        arglist = [
            self.qos_specs[0].name
        ]
        verifylist = [
            ('qos_specs', [self.qos_specs[0].name])
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.qos_mock.delete.assert_called_with(self.qos_specs[0].id, False)
        self.assertIsNone(result)

    def test_qos_delete_with_force(self):
        arglist = [
            '--force',
            self.qos_specs[0].id
        ]
        verifylist = [
            ('force', True),
            ('qos_specs', [self.qos_specs[0].id])
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.qos_mock.delete.assert_called_with(self.qos_specs[0].id, True)
        self.assertIsNone(result)

    def test_delete_multiple_qoses(self):
        arglist = []
        for q in self.qos_specs:
            arglist.append(q.id)
        verifylist = [
            ('qos_specs', arglist),
        ]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
        result = self.cmd.take_action(parsed_args)

        calls = []
        for q in self.qos_specs:
            calls.append(call(q.id, False))
        self.qos_mock.delete.assert_has_calls(calls)
        self.assertIsNone(result)

    def test_delete_multiple_qoses_with_exception(self):
        arglist = [
            self.qos_specs[0].id,
            'unexist_qos',
        ]
        verifylist = [
            ('qos_specs', arglist),
        ]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        find_mock_result = [self.qos_specs[0], exceptions.CommandError]
        with mock.patch.object(utils, 'find_resource',
                               side_effect=find_mock_result) as find_mock:
            try:
                self.cmd.take_action(parsed_args)
                self.fail('CommandError should be raised.')
            except exceptions.CommandError as e:
                self.assertEqual(
                    '1 of 2 QoS specifications failed to delete.', str(e))

            find_mock.assert_any_call(self.qos_mock, self.qos_specs[0].id)
            find_mock.assert_any_call(self.qos_mock, 'unexist_qos')

            self.assertEqual(2, find_mock.call_count)
            self.qos_mock.delete.assert_called_once_with(
                self.qos_specs[0].id, False
            )


class TestQosDisassociate(TestQos):

    volume_type = volume_fakes.FakeVolumeType.create_one_volume_type()
    qos_spec = volume_fakes.FakeQos.create_one_qos()

    def setUp(self):
        super(TestQosDisassociate, self).setUp()

        self.qos_mock.get.return_value = self.qos_spec
        self.types_mock.get.return_value = self.volume_type
        # Get the command object to test
        self.cmd = qos_specs.DisassociateQos(self.app, None)

    def test_qos_disassociate_with_volume_type(self):
        arglist = [
            '--volume-type', self.volume_type.id,
            self.qos_spec.id,
        ]
        verifylist = [
            ('volume_type', self.volume_type.id),
            ('qos_spec', self.qos_spec.id),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.qos_mock.disassociate.assert_called_with(
            self.qos_spec.id,
            self.volume_type.id
        )
        self.assertIsNone(result)

    def test_qos_disassociate_with_all_volume_types(self):
        arglist = [
            '--all',
            self.qos_spec.id,
        ]
        verifylist = [
            ('qos_spec', self.qos_spec.id)
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.qos_mock.disassociate_all.assert_called_with(self.qos_spec.id)
        self.assertIsNone(result)


class TestQosList(TestQos):

    qos_specs = volume_fakes.FakeQos.create_qoses(count=2)
    qos_association = volume_fakes.FakeQos.create_one_qos_association()

    columns = (
        'ID',
        'Name',
        'Consumer',
        'Associations',
        'Properties',
    )
    data = []
    for q in qos_specs:
        data.append((
            q.id,
            q.name,
            q.consumer,
            format_columns.ListColumn([qos_association.name]),
            format_columns.DictColumn(q.specs),
        ))

    def setUp(self):
        super(TestQosList, self).setUp()

        self.qos_mock.list.return_value = self.qos_specs
        self.qos_mock.get_associations.return_value = [self.qos_association]

        # Get the command object to test
        self.cmd = qos_specs.ListQos(self.app, None)

    def test_qos_list(self):
        arglist = []
        verifylist = []

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)
        self.qos_mock.list.assert_called_with()

        self.assertEqual(self.columns, columns)
        self.assertCountEqual(self.data, list(data))

    def test_qos_list_no_association(self):
        self.qos_mock.reset_mock()
        self.qos_mock.get_associations.side_effect = [
            [self.qos_association],
            exceptions.NotFound("NotFound"),
        ]

        arglist = []
        verifylist = []

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)
        self.qos_mock.list.assert_called_with()

        self.assertEqual(self.columns, columns)

        ex_data = copy.deepcopy(self.data)
        ex_data[1] = (
            self.qos_specs[1].id,
            self.qos_specs[1].name,
            self.qos_specs[1].consumer,
            format_columns.ListColumn(None),
            format_columns.DictColumn(self.qos_specs[1].specs),
        )
        self.assertCountEqual(ex_data, list(data))


class TestQosSet(TestQos):

    qos_spec = volume_fakes.FakeQos.create_one_qos()

    def setUp(self):
        super(TestQosSet, self).setUp()

        self.qos_mock.get.return_value = self.qos_spec
        # Get the command object to test
        self.cmd = qos_specs.SetQos(self.app, None)

    def test_qos_set_with_properties_with_id(self):
        arglist = [
            '--property', 'foo=bar',
            '--property', 'iops=9001',
            self.qos_spec.id,
        ]
        verifylist = [
            ('property', self.qos_spec.specs),
            ('qos_spec', self.qos_spec.id),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.qos_mock.set_keys.assert_called_with(
            self.qos_spec.id,
            self.qos_spec.specs
        )
        self.assertIsNone(result)


class TestQosShow(TestQos):

    qos_spec = volume_fakes.FakeQos.create_one_qos()
    qos_association = volume_fakes.FakeQos.create_one_qos_association()

    def setUp(self):
        super(TestQosShow, self).setUp()
        self.qos_mock.get.return_value = self.qos_spec
        self.qos_mock.get_associations.return_value = [self.qos_association]
        # Get the command object to test
        self.cmd = qos_specs.ShowQos(self.app, None)

    def test_qos_show(self):
        arglist = [
            self.qos_spec.id
        ]
        verifylist = [
            ('qos_spec', self.qos_spec.id)
        ]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)
        self.qos_mock.get.assert_called_with(
            self.qos_spec.id
        )

        collist = (
            'associations',
            'consumer',
            'id',
            'name',
            'properties'
        )
        self.assertEqual(collist, columns)
        datalist = (
            format_columns.ListColumn([self.qos_association.name]),
            self.qos_spec.consumer,
            self.qos_spec.id,
            self.qos_spec.name,
            format_columns.DictColumn(self.qos_spec.specs),
        )
        self.assertCountEqual(datalist, tuple(data))


class TestQosUnset(TestQos):

    qos_spec = volume_fakes.FakeQos.create_one_qos()

    def setUp(self):
        super(TestQosUnset, self).setUp()

        self.qos_mock.get.return_value = self.qos_spec
        # Get the command object to test
        self.cmd = qos_specs.UnsetQos(self.app, None)

    def test_qos_unset_with_properties(self):
        arglist = [
            '--property', 'iops',
            '--property', 'foo',
            self.qos_spec.id,
        ]
        verifylist = [
            ('property', ['iops', 'foo']),
            ('qos_spec', self.qos_spec.id),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.qos_mock.unset_keys.assert_called_with(
            self.qos_spec.id,
            ['iops', 'foo']
        )
        self.assertIsNone(result)

    def test_qos_unset_nothing(self):
        arglist = [
            self.qos_spec.id,
        ]

        verifylist = [
            ('qos_spec', self.qos_spec.id),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)
        self.assertIsNone(result)
