# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from copy import deepcopy
from unittest import mock

from openstack import exceptions
from openstack.network.v2.firewall_group import FirewallGroup
from openstack.network.v2.firewall_policy import FirewallPolicy
from openstack.network.v2.firewall_rule import FirewallRule
from openstack.tests.unit import base


class FirewallTestCase(base.TestCase):
    def _make_mock_url(self, *args, **params):
        params_list = ['='.join([k, v]) for k, v in params.items()]
        return self.get_mock_url(
            'network',
            'public',
            append=['v2.0', 'fwaas'] + list(args),
            qs_elements=params_list or None,
        )


class TestFirewallRule(FirewallTestCase):
    firewall_rule_name = 'deny_ssh'
    firewall_rule_id = 'd525a9b2-ab28-493d-b988-b824c8c033b1'
    _mock_firewall_rule_attrs = {
        'action': 'deny',
        'description': 'Deny SSH access',
        'destination_ip_address': None,
        'destination_port': 22,
        'enabled': True,
        'id': firewall_rule_id,
        'ip_version': 4,
        'name': firewall_rule_name,
        'project_id': 'ef44f1efcb9548d9a441cdc252a979a6',
        'protocol': 'tcp',
        'shared': False,
        'source_ip_address': None,
        'source_port': None,
    }
    mock_firewall_rule = None

    def setUp(self, cloud_config_fixture='clouds.yaml'):
        super().setUp()
        self.mock_firewall_rule = FirewallRule(
            connection=self.cloud, **self._mock_firewall_rule_attrs
        ).to_dict()

    def test_create_firewall_rule(self):
        # attributes that are passed to the tested function
        passed_attrs = self._mock_firewall_rule_attrs.copy()
        del passed_attrs['id']

        self.register_uris(
            [
                # no validate due to added location key
                dict(
                    method='POST',
                    uri=self._make_mock_url('firewall_rules'),
                    json={'firewall_rule': self.mock_firewall_rule.copy()},
                )
            ]
        )
        r = self.cloud.create_firewall_rule(**passed_attrs)
        self.assertDictEqual(self.mock_firewall_rule, r.to_dict())
        self.assert_calls()

    def test_create_firewall_rule_bad_protocol(self):
        bad_rule = self._mock_firewall_rule_attrs.copy()
        del bad_rule['id']  # id not allowed
        bad_rule['ip_version'] = 5
        self.register_uris(
            [
                # no validate due to added location key
                dict(
                    method='POST',
                    uri=self._make_mock_url('firewall_rules'),
                    status_code=400,
                    json={},
                )
            ]
        )
        self.assertRaises(
            exceptions.BadRequestException,
            self.cloud.create_firewall_rule,
            **bad_rule,
        )
        self.assert_calls()

    def test_delete_firewall_rule(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules', name=self.firewall_rule_name
                    ),
                    json={'firewall_rules': [self.mock_firewall_rule]},
                ),
                dict(
                    method='DELETE',
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_id
                    ),
                    json={},
                    status_code=204,
                ),
            ]
        )
        self.assertTrue(
            self.cloud.delete_firewall_rule(self.firewall_rule_name)
        )
        self.assert_calls()

    def test_delete_firewall_rule_filters(self):
        filters = {'project_id': self.mock_firewall_rule['project_id']}
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_name, **filters
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules',
                        name=self.firewall_rule_name,
                        **filters,
                    ),
                    json={'firewall_rules': [self.mock_firewall_rule]},
                ),
                dict(
                    method='DELETE',
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_id
                    ),
                    json={},
                    status_code=204,
                ),
            ]
        )
        self.assertTrue(
            self.cloud.delete_firewall_rule(self.firewall_rule_name, filters)
        )
        self.assert_calls()

    def test_delete_firewall_rule_not_found(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_rules'),
                    json={'firewall_rules': []},
                ),
            ]
        )

        with (
            mock.patch.object(self.cloud.network, 'delete_firewall_rule'),
            mock.patch.object(self.cloud.log, 'debug'),
        ):
            self.assertFalse(
                self.cloud.delete_firewall_rule(self.firewall_rule_name)
            )

            self.cloud.network.delete_firewall_rule.assert_not_called()
            self.cloud.log.debug.assert_called_once()

    def test_delete_firewall_multiple_matches(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules', name=self.firewall_rule_name
                    ),
                    json={
                        'firewall_rules': [
                            self.mock_firewall_rule,
                            self.mock_firewall_rule,
                        ]
                    },
                ),
            ]
        )
        self.assertRaises(
            exceptions.DuplicateResource,
            self.cloud.delete_firewall_rule,
            self.firewall_rule_name,
        )
        self.assert_calls()

    def test_get_firewall_rule(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules', name=self.firewall_rule_name
                    ),
                    json={'firewall_rules': [self.mock_firewall_rule]},
                ),
            ]
        )
        r = self.cloud.get_firewall_rule(self.firewall_rule_name)
        self.assertDictEqual(self.mock_firewall_rule, r)
        self.assert_calls()

    def test_get_firewall_rule_not_found(self):
        name = 'not_found'
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_rules', name),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_rules', name=name),
                    json={'firewall_rules': []},
                ),
            ]
        )
        self.assertIsNone(self.cloud.get_firewall_rule(name))
        self.assert_calls()

    def test_list_firewall_rules(self):
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_rules'),
                    json={'firewall_rules': [self.mock_firewall_rule]},
                )
            ]
        )
        self.assertDictEqual(
            self.mock_firewall_rule, self.cloud.list_firewall_rules()[0]
        )
        self.assert_calls()

    def test_update_firewall_rule(self):
        params = {'description': 'UpdatedDescription'}
        updated = self.mock_firewall_rule.copy()
        updated.update(params)
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules', name=self.firewall_rule_name
                    ),
                    json={'firewall_rules': [self.mock_firewall_rule]},
                ),
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_id
                    ),
                    json={'firewall_rule': updated},
                    validate=dict(json={'firewall_rule': params}),
                ),
            ]
        )
        self.assertDictEqual(
            updated,
            self.cloud.update_firewall_rule(self.firewall_rule_name, **params),
        )
        self.assert_calls()

    def test_update_firewall_rule_filters(self):
        params = {'description': 'Updated!'}
        filters = {'project_id': self.mock_firewall_rule['project_id']}
        updated = self.mock_firewall_rule.copy()
        updated.update(params)
        updated_dict = self._mock_firewall_rule_attrs.copy()
        updated_dict.update(params)
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_name, **filters
                    ),
                    json={'firewall_rule': self._mock_firewall_rule_attrs},
                ),
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_rules', self.firewall_rule_id
                    ),
                    json={'firewall_rule': updated_dict},
                    validate={
                        'json': {'firewall_rule': params},
                    },
                ),
            ]
        )
        updated_rule = self.cloud.update_firewall_rule(
            self.firewall_rule_name, filters, **params
        )
        self.assertDictEqual(updated, updated_rule)
        self.assert_calls()


class TestFirewallPolicy(FirewallTestCase):
    firewall_policy_id = '78d05d20-d406-41ec-819d-06b65c2684e4'
    firewall_policy_name = 'block_popular_services'
    _mock_firewall_policy_attrs = {
        'audited': True,
        'description': 'block ports of well-known services',
        'firewall_rules': ['deny_ssh'],
        'id': firewall_policy_id,
        'name': firewall_policy_name,
        'project_id': 'b64238cb-a25d-41af-9ee1-42deb4587d20',
        'shared': False,
    }
    mock_firewall_policy = None

    def setUp(self, cloud_config_fixture='clouds.yaml'):
        super().setUp()
        self.mock_firewall_policy = FirewallPolicy(
            connection=self.cloud, **self._mock_firewall_policy_attrs
        ).to_dict()

    def test_create_firewall_policy(self):
        # attributes that are passed to the tested method
        passed_attrs = deepcopy(self._mock_firewall_policy_attrs)
        del passed_attrs['id']

        # policy that is returned by the POST request
        created_attrs = deepcopy(self._mock_firewall_policy_attrs)
        created_attrs['firewall_rules'][0] = TestFirewallRule.firewall_rule_id
        created_policy = FirewallPolicy(connection=self.cloud, **created_attrs)

        # attributes used to validate the request inside register_uris()
        validate_attrs = deepcopy(created_attrs)
        del validate_attrs['id']

        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_rules', TestFirewallRule.firewall_rule_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules',
                        name=TestFirewallRule.firewall_rule_name,
                    ),
                    json={
                        'firewall_rules': [
                            TestFirewallRule._mock_firewall_rule_attrs
                        ]
                    },
                ),
                dict(
                    method='POST',
                    uri=self._make_mock_url('firewall_policies'),
                    json={'firewall_policy': created_attrs},
                    validate=dict(json={'firewall_policy': validate_attrs}),
                ),
            ]
        )
        res = self.cloud.create_firewall_policy(**passed_attrs)
        self.assertDictEqual(created_policy, res.to_dict())
        self.assert_calls()

    def test_create_firewall_policy_rule_not_found(self):
        posted_policy = deepcopy(self._mock_firewall_policy_attrs)
        del posted_policy['id']
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_rules', posted_policy['firewall_rules'][0]
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules',
                        name=posted_policy['firewall_rules'][0],
                    ),
                    json={'firewall_rules': []},
                ),
            ]
        )

        with mock.patch.object(self.cloud.network, 'create_firewall_policy'):
            self.assertRaises(
                exceptions.NotFoundException,
                self.cloud.create_firewall_policy,
                **posted_policy,
            )
            self.cloud.network.create_firewall_policy.assert_not_called()
            self.assert_calls()

    def test_delete_firewall_policy(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', name=self.firewall_policy_name
                    ),
                    json={'firewall_policies': [self.mock_firewall_policy]},
                ),
                dict(
                    method='DELETE',
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_id
                    ),
                    json={},
                    status_code=204,
                ),
            ]
        )

        with mock.patch.object(self.cloud.log, 'debug'):
            self.assertTrue(
                self.cloud.delete_firewall_policy(self.firewall_policy_name)
            )
            self.assert_calls()
            self.cloud.log.debug.assert_not_called()

    def test_delete_firewall_policy_filters(self):
        filters = {'project_id': self.mock_firewall_policy['project_id']}
        self.register_uris(
            [
                dict(
                    method='DELETE',
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_id
                    ),
                    json={},
                    status_code=204,
                )
            ]
        )

        with (
            mock.patch.object(
                self.cloud.network,
                'find_firewall_policy',
                return_value=self.mock_firewall_policy,
            ),
            mock.patch.object(self.cloud.log, 'debug'),
        ):
            self.assertTrue(
                self.cloud.delete_firewall_policy(
                    self.firewall_policy_name, filters
                )
            )
            self.assert_calls()
            self.cloud.network.find_firewall_policy.assert_called_once_with(
                self.firewall_policy_name, ignore_missing=False, **filters
            )
            self.cloud.log.debug.assert_not_called()

    def test_delete_firewall_policy_not_found(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', name=self.firewall_policy_name
                    ),
                    json={'firewall_policies': []},
                ),
            ]
        )

        with mock.patch.object(self.cloud.log, 'debug'):
            self.assertFalse(
                self.cloud.delete_firewall_policy(self.firewall_policy_name)
            )
            self.assert_calls()
            self.cloud.log.debug.assert_called_once()

    def test_get_firewall_policy(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', name=self.firewall_policy_name
                    ),
                    json={'firewall_policies': [self.mock_firewall_policy]},
                ),
            ]
        )
        self.assertDictEqual(
            self.mock_firewall_policy,
            self.cloud.get_firewall_policy(self.firewall_policy_name),
        )
        self.assert_calls()

    def test_get_firewall_policy_not_found(self):
        name = 'not_found'
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_policies', name),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_policies', name=name),
                    json={'firewall_policies': []},
                ),
            ]
        )
        self.assertIsNone(self.cloud.get_firewall_policy(name))
        self.assert_calls()

    def test_list_firewall_policies(self):
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_policies'),
                    json={
                        'firewall_policies': [
                            self.mock_firewall_policy.copy(),
                            self.mock_firewall_policy.copy(),
                        ]
                    },
                )
            ]
        )
        policy = FirewallPolicy(
            connection=self.cloud, **self.mock_firewall_policy
        )
        self.assertListEqual(
            self.cloud.list_firewall_policies(), [policy, policy]
        )
        self.assert_calls()

    def test_list_firewall_policies_filters(self):
        filters = {'project_id': self.mock_firewall_policy['project_id']}
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_policies', **filters),
                    json={'firewall_policies': [self.mock_firewall_policy]},
                )
            ]
        )
        self.assertListEqual(
            self.cloud.list_firewall_policies(filters),
            [
                FirewallPolicy(
                    connection=self.cloud, **self.mock_firewall_policy
                )
            ],
        )
        self.assert_calls()

    def test_update_firewall_policy(self):
        lookup_rule = FirewallRule(
            connection=self.cloud, **TestFirewallRule._mock_firewall_rule_attrs
        ).to_dict()
        params = {
            'firewall_rules': [lookup_rule['id']],
            'description': 'updated!',
        }
        retrieved_policy = deepcopy(self.mock_firewall_policy)
        del retrieved_policy['firewall_rules'][0]
        updated_policy = deepcopy(self.mock_firewall_policy)
        updated_policy.update(params)
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', name=self.firewall_policy_name
                    ),
                    json={'firewall_policies': [retrieved_policy]},
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules', lookup_rule['id']
                    ),
                    json={'firewall_rule': lookup_rule},
                ),
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_id
                    ),
                    json={'firewall_policy': updated_policy},
                    validate=dict(json={'firewall_policy': params}),
                ),
            ]
        )
        self.assertDictEqual(
            updated_policy,
            self.cloud.update_firewall_policy(
                self.firewall_policy_name, **params
            ),
        )
        self.assert_calls()

    def test_update_firewall_policy_no_rules(self):
        params = {'description': 'updated!'}
        updated_policy = deepcopy(self.mock_firewall_policy)
        updated_policy.update(params)
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', name=self.firewall_policy_name
                    ),
                    json={
                        'firewall_policies': [
                            deepcopy(self.mock_firewall_policy)
                        ]
                    },
                ),
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_id
                    ),
                    json={'firewall_policy': updated_policy},
                    validate=dict(json={'firewall_policy': params}),
                ),
            ]
        )
        self.assertDictEqual(
            updated_policy,
            self.cloud.update_firewall_policy(
                self.firewall_policy_name, **params
            ),
        )
        self.assert_calls()

    def test_update_firewall_policy_filters(self):
        filters = {'project_id': self.mock_firewall_policy['project_id']}
        params = {'description': 'updated!'}
        updated_policy = deepcopy(self.mock_firewall_policy)
        updated_policy.update(params)

        self.register_uris(
            [
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_id
                    ),
                    json={'firewall_policy': updated_policy},
                    validate=dict(json={'firewall_policy': params}),
                ),
            ]
        )

        with mock.patch.object(
            self.cloud.network,
            'find_firewall_policy',
            return_value=deepcopy(self.mock_firewall_policy),
        ):
            self.assertDictEqual(
                updated_policy,
                self.cloud.update_firewall_policy(
                    self.firewall_policy_name, filters, **params
                ),
            )
            self.assert_calls()
            self.cloud.network.find_firewall_policy.assert_called_once_with(
                self.firewall_policy_name, ignore_missing=False, **filters
            )

    def test_insert_rule_into_policy(self):
        rule0 = FirewallRule(
            connection=self.cloud, **TestFirewallRule._mock_firewall_rule_attrs
        )

        _rule1_attrs = deepcopy(TestFirewallRule._mock_firewall_rule_attrs)
        _rule1_attrs.update(
            id='8068fc06-0e72-43f2-a76f-a51a33b46e08', name='after_rule'
        )
        rule1 = FirewallRule(**_rule1_attrs)

        _rule2_attrs = deepcopy(TestFirewallRule._mock_firewall_rule_attrs)
        _rule2_attrs.update(
            id='c716382d-183b-475d-b500-dcc762f45ce3', name='before_rule'
        )
        rule2 = FirewallRule(**_rule2_attrs)
        retrieved_policy = deepcopy(self.mock_firewall_policy)
        retrieved_policy['firewall_rules'] = [rule1['id'], rule2['id']]
        updated_policy = deepcopy(self.mock_firewall_policy)
        updated_policy['firewall_rules'] = [
            rule0['id'],
            rule1['id'],
            rule2['id'],
        ]
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',  # get policy
                    uri=self._make_mock_url(
                        'firewall_policies', name=self.firewall_policy_name
                    ),
                    json={'firewall_policies': [retrieved_policy]},
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_rules', rule0['name']),
                    status_code=404,
                ),
                dict(
                    method='GET',  # get rule to add
                    uri=self._make_mock_url(
                        'firewall_rules', name=rule0['name']
                    ),
                    json={'firewall_rules': [rule0]},
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_rules', rule1['name']),
                    status_code=404,
                ),
                dict(
                    method='GET',  # get after rule
                    uri=self._make_mock_url(
                        'firewall_rules', name=rule1['name']
                    ),
                    json={'firewall_rules': [rule1]},
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_rules', rule2['name']),
                    status_code=404,
                ),
                dict(
                    method='GET',  # get before rule
                    uri=self._make_mock_url(
                        'firewall_rules', name=rule2['name']
                    ),
                    json={'firewall_rules': [rule2]},
                ),
                dict(
                    method='PUT',  # add rule
                    uri=self._make_mock_url(
                        'firewall_policies',
                        self.firewall_policy_id,
                        'insert_rule',
                    ),
                    json=updated_policy,
                    validate=dict(
                        json={
                            'firewall_rule_id': rule0['id'],
                            'insert_after': rule1['id'],
                            'insert_before': rule2['id'],
                        }
                    ),
                ),
            ]
        )
        r = self.cloud.insert_rule_into_policy(
            name_or_id=self.firewall_policy_name,
            rule_name_or_id=rule0['name'],
            insert_after=rule1['name'],
            insert_before=rule2['name'],
        )
        self.assertDictEqual(updated_policy, r.to_dict())
        self.assert_calls()

    def test_insert_rule_into_policy_compact(self):
        """
        Tests without insert_after and insert_before
        """
        rule = FirewallRule(**TestFirewallRule._mock_firewall_rule_attrs)
        retrieved_policy = deepcopy(self.mock_firewall_policy)
        retrieved_policy['firewall_rules'] = []
        updated_policy = deepcopy(retrieved_policy)
        updated_policy['firewall_rules'].append(rule['id'])
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', name=self.firewall_policy_name
                    ),
                    json={'firewall_policies': [retrieved_policy]},
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_rules', rule['name']),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules', name=rule['name']
                    ),
                    json={'firewall_rules': [rule]},
                ),
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_policies',
                        retrieved_policy['id'],
                        'insert_rule',
                    ),
                    json=updated_policy,
                    validate=dict(
                        json={
                            'firewall_rule_id': rule['id'],
                            'insert_after': None,
                            'insert_before': None,
                        }
                    ),
                ),
            ]
        )
        r = self.cloud.insert_rule_into_policy(
            self.firewall_policy_name, rule['name']
        )
        self.assertDictEqual(updated_policy, r.to_dict())
        self.assert_calls()

    def test_insert_rule_into_policy_not_found(self):
        policy_name = 'bogus_policy'
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_policies', policy_name),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', name=policy_name
                    ),
                    json={'firewall_policies': []},
                ),
            ]
        )

        with mock.patch.object(self.cloud.network, 'find_firewall_rule'):
            self.assertRaises(
                exceptions.NotFoundException,
                self.cloud.insert_rule_into_policy,
                policy_name,
                'bogus_rule',
            )
            self.assert_calls()
            self.cloud.network.find_firewall_rule.assert_not_called()

    def test_insert_rule_into_policy_rule_not_found(self):
        rule_name = 'unknown_rule'
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_id
                    ),
                    json={'firewall_policy': self.mock_firewall_policy},
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_rules', rule_name),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_rules', name=rule_name),
                    json={'firewall_rules': []},
                ),
            ]
        )
        self.assertRaises(
            exceptions.NotFoundException,
            self.cloud.insert_rule_into_policy,
            self.firewall_policy_id,
            rule_name,
        )
        self.assert_calls()

    def test_insert_rule_into_policy_already_associated(self):
        rule = FirewallRule(
            **TestFirewallRule._mock_firewall_rule_attrs
        ).to_dict()
        policy = deepcopy(self.mock_firewall_policy)
        policy['firewall_rules'] = [rule['id']]
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_id
                    ),
                    json={'firewall_policy': policy},
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_rules', rule['id']),
                    json={'firewall_rule': rule},
                ),
            ]
        )

        with mock.patch.object(self.cloud.log, 'debug'):
            r = self.cloud.insert_rule_into_policy(policy['id'], rule['id'])
            self.assertDictEqual(policy, r.to_dict())
            self.assert_calls()
            self.cloud.log.debug.assert_called()

    def test_remove_rule_from_policy(self):
        policy_name = self.firewall_policy_name
        rule = FirewallRule(**TestFirewallRule._mock_firewall_rule_attrs)

        retrieved_policy = deepcopy(self.mock_firewall_policy)
        retrieved_policy['firewall_rules'][0] = rule['id']

        updated_policy = deepcopy(self.mock_firewall_policy)
        del updated_policy['firewall_rules'][0]
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_policies', policy_name),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', name=policy_name
                    ),
                    json={'firewall_policies': [retrieved_policy]},
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_rules', rule['name']),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules', name=rule['name']
                    ),
                    json={'firewall_rules': [rule]},
                ),
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_policies',
                        self.firewall_policy_id,
                        'remove_rule',
                    ),
                    json=updated_policy,
                    validate=dict(json={'firewall_rule_id': rule['id']}),
                ),
            ]
        )
        r = self.cloud.remove_rule_from_policy(policy_name, rule['name'])
        self.assertDictEqual(updated_policy, r.to_dict())
        self.assert_calls()

    def test_remove_rule_from_policy_not_found(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', name=self.firewall_policy_name
                    ),
                    json={'firewall_policies': []},
                ),
            ]
        )

        with mock.patch.object(self.cloud.network, 'find_firewall_rule'):
            self.assertRaises(
                exceptions.NotFoundException,
                self.cloud.remove_rule_from_policy,
                self.firewall_policy_name,
                TestFirewallRule.firewall_rule_name,
            )
            self.assert_calls()
            self.cloud.network.find_firewall_rule.assert_not_called()

    def test_remove_rule_from_policy_rule_not_found(self):
        retrieved_policy = deepcopy(self.mock_firewall_policy)
        rule = FirewallRule(**TestFirewallRule._mock_firewall_rule_attrs)
        retrieved_policy['firewall_rules'][0] = rule['id']
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies', self.firewall_policy_id
                    ),
                    json={'firewall_policy': retrieved_policy},
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_rules', rule['name']),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_rules', name=rule['name']
                    ),
                    json={'firewall_rules': []},
                ),
            ]
        )
        r = self.cloud.remove_rule_from_policy(
            self.firewall_policy_id, rule['name']
        )
        self.assertDictEqual(retrieved_policy, r.to_dict())
        self.assert_calls()

    def test_remove_rule_from_policy_not_associated(self):
        rule = FirewallRule(
            **TestFirewallRule._mock_firewall_rule_attrs
        ).to_dict()
        policy = deepcopy(self.mock_firewall_policy)
        del policy['firewall_rules'][0]

        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_policies', policy['id']),
                    json={'firewall_policy': policy},
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_rules', rule['id']),
                    json={'firewall_rule': rule},
                ),
            ]
        )

        with (
            mock.patch.object(self.cloud.network, 'remove_rule_from_policy'),
            mock.patch.object(self.cloud.log, 'debug'),
        ):
            r = self.cloud.remove_rule_from_policy(policy['id'], rule['id'])
            self.assertDictEqual(policy, r.to_dict())
            self.assert_calls()
            self.cloud.log.debug.assert_called_once()
            self.cloud.network.remove_rule_from_policy.assert_not_called()


class TestFirewallGroup(FirewallTestCase):
    firewall_group_id = '700eed7a-b979-4b80-a06d-14f000d0f645'
    firewall_group_name = 'max_security_group'
    mock_port = {
        'name': 'mock_port',
        'id': '7d90977c-45ec-467e-a16d-dcaed772a161',
    }
    _mock_egress_policy_attrs = {
        'id': '34335e5b-44af-4ffd-9dcf-518133f897c7',
        'name': 'safe_outgoing_data',
    }
    _mock_ingress_policy_attrs = {
        'id': 'cd28fb50-85d0-4f36-89af-50fac08ac174',
        'name': 'bad_incoming_data',
    }
    _mock_firewall_group_attrs = {
        'admin_state_up': True,
        'description': 'Providing max security!',
        'egress_firewall_policy': _mock_egress_policy_attrs['name'],
        'ingress_firewall_policy': _mock_ingress_policy_attrs['name'],
        'id': firewall_group_id,
        'name': firewall_group_name,
        'ports': [mock_port['name']],
        'project_id': 'da347b09-0b4f-4994-a3ef-05d13eaecb2c',
        'shared': False,
    }
    _mock_returned_firewall_group_attrs = {
        'admin_state_up': True,
        'description': 'Providing max security!',
        'egress_firewall_policy': _mock_egress_policy_attrs['name'],
        'egress_firewall_policy_id': _mock_egress_policy_attrs['id'],
        'ingress_firewall_policy': _mock_ingress_policy_attrs['name'],
        'ingress_firewall_policy_id': _mock_ingress_policy_attrs['id'],
        'id': firewall_group_id,
        'name': firewall_group_name,
        'ports': [mock_port['id']],
        'project_id': 'da347b09-0b4f-4994-a3ef-05d13eaecb2c',
        'shared': False,
    }
    mock_egress_policy = None
    mock_ingress_policy = None
    mock_firewall_rule = None
    mock_returned_firewall_rule = None

    def setUp(self, cloud_config_fixture='clouds.yaml'):
        super().setUp()
        self.mock_egress_policy = FirewallPolicy(
            connection=self.cloud, **self._mock_egress_policy_attrs
        ).to_dict()
        self.mock_ingress_policy = FirewallPolicy(
            connection=self.cloud, **self._mock_ingress_policy_attrs
        ).to_dict()
        self.mock_firewall_group = FirewallGroup(
            connection=self.cloud, **self._mock_firewall_group_attrs
        ).to_dict()
        self.mock_returned_firewall_group = FirewallGroup(
            connection=self.cloud, **self._mock_returned_firewall_group_attrs
        ).to_dict()

    def test_create_firewall_group(self):
        create_group_attrs = self._mock_firewall_group_attrs.copy()
        del create_group_attrs['id']
        posted_group_attrs = self._mock_returned_firewall_group_attrs.copy()
        del posted_group_attrs['egress_firewall_policy']
        del posted_group_attrs['ingress_firewall_policy']
        del posted_group_attrs['id']
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.mock_egress_policy['name']
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies',
                        name=self.mock_egress_policy['name'],
                    ),
                    json={'firewall_policies': [self.mock_egress_policy]},
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.mock_ingress_policy['name']
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies',
                        name=self.mock_ingress_policy['name'],
                    ),
                    json={'firewall_policies': [self.mock_ingress_policy]},
                ),
                dict(
                    method='GET',
                    uri=self.get_mock_url(
                        'network',
                        'public',
                        append=['v2.0', 'ports', self.mock_port['name']],
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self.get_mock_url(
                        'network',
                        'public',
                        append=['v2.0', 'ports'],
                        qs_elements=['name={}'.format(self.mock_port['name'])],
                    ),
                    json={'ports': [self.mock_port]},
                ),
                dict(
                    method='POST',
                    uri=self._make_mock_url('firewall_groups'),
                    json={
                        'firewall_group': deepcopy(
                            self.mock_returned_firewall_group
                        )
                    },
                    validate=dict(json={'firewall_group': posted_group_attrs}),
                ),
            ]
        )
        r = self.cloud.create_firewall_group(**create_group_attrs)
        self.assertDictEqual(self.mock_returned_firewall_group, r.to_dict())
        self.assert_calls()

    def test_create_firewall_group_compact(self):
        """
        Tests firewall group creation without policies or ports
        """
        firewall_group = deepcopy(self._mock_firewall_group_attrs)
        del firewall_group['ports']
        del firewall_group['egress_firewall_policy']
        del firewall_group['ingress_firewall_policy']
        created_firewall = deepcopy(firewall_group)
        created_firewall.update(
            egress_firewall_policy_id=None,
            ingress_firewall_policy_id=None,
            ports=[],
        )
        del firewall_group['id']
        self.register_uris(
            [
                dict(
                    method='POST',
                    uri=self._make_mock_url('firewall_groups'),
                    json={'firewall_group': created_firewall},
                    validate=dict(json={'firewall_group': firewall_group}),
                )
            ]
        )
        r = self.cloud.create_firewall_group(**firewall_group)
        self.assertDictEqual(
            FirewallGroup(connection=self.cloud, **created_firewall).to_dict(),
            r.to_dict(),
        )
        self.assert_calls()

    def test_delete_firewall_group(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_groups', name=self.firewall_group_name
                    ),
                    json={
                        'firewall_groups': [
                            deepcopy(self.mock_returned_firewall_group)
                        ]
                    },
                ),
                dict(
                    method='DELETE',
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_id
                    ),
                    status_code=204,
                ),
            ]
        )
        self.assertTrue(
            self.cloud.delete_firewall_group(self.firewall_group_name)
        )
        self.assert_calls()

    def test_delete_firewall_group_filters(self):
        filters = {'project_id': self.mock_firewall_group['project_id']}
        self.register_uris(
            [
                dict(
                    method='DELETE',
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_id
                    ),
                    status_code=204,
                )
            ]
        )

        with mock.patch.object(
            self.cloud.network,
            'find_firewall_group',
            return_value=deepcopy(self.mock_firewall_group),
        ):
            self.assertTrue(
                self.cloud.delete_firewall_group(
                    self.firewall_group_name, filters
                )
            )
            self.assert_calls()
            self.cloud.network.find_firewall_group.assert_called_once_with(
                self.firewall_group_name, ignore_missing=False, **filters
            )

    def test_delete_firewall_group_not_found(self):
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_groups', name=self.firewall_group_name
                    ),
                    json={'firewall_groups': []},
                ),
            ]
        )

        with mock.patch.object(self.cloud.log, 'debug'):
            self.assertFalse(
                self.cloud.delete_firewall_group(self.firewall_group_name)
            )
            self.assert_calls()
            self.cloud.log.debug.assert_called_once()

    def test_get_firewall_group(self):
        returned_group = deepcopy(self.mock_returned_firewall_group)
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_groups', name=self.firewall_group_name
                    ),
                    json={'firewall_groups': [returned_group]},
                ),
            ]
        )
        self.assertDictEqual(
            returned_group,
            self.cloud.get_firewall_group(self.firewall_group_name),
        )
        self.assert_calls()

    def test_get_firewall_group_not_found(self):
        name = 'not_found'
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url('firewall_groups', name),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_groups', name=name),
                    json={'firewall_groups': []},
                ),
            ]
        )
        self.assertIsNone(self.cloud.get_firewall_group(name))
        self.assert_calls()

    def test_get_firewall_group_by_id(self):
        returned_group = deepcopy(self.mock_returned_firewall_group)
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_id
                    ),
                    json={'firewall_group': returned_group},
                )
            ]
        )
        r = self.cloud.get_firewall_group(self.firewall_group_id)
        self.assertDictEqual(returned_group, r.to_dict())
        self.assert_calls()

    def test_list_firewall_groups(self):
        returned_attrs = deepcopy(self.mock_returned_firewall_group)
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url('firewall_groups'),
                    json={'firewall_groups': [returned_attrs, returned_attrs]},
                )
            ]
        )
        group = FirewallGroup(connection=self.cloud, **returned_attrs)
        self.assertListEqual([group, group], self.cloud.list_firewall_groups())
        self.assert_calls()

    def test_update_firewall_group(self):
        params = {
            'description': 'updated!',
            'egress_firewall_policy': self.mock_egress_policy['name'],
            'ingress_firewall_policy': self.mock_ingress_policy['name'],
            'ports': [self.mock_port['name']],
        }
        updated_group = deepcopy(self.mock_returned_firewall_group)
        updated_group['description'] = params['description']

        returned_group = deepcopy(self.mock_returned_firewall_group)
        # unset attributes that will be updated!
        returned_group.update(
            ingress_firewall_policy_id=None,
            egress_firewall_policy_id=None,
            ports=[],
        )
        self.register_uris(
            [
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_name
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_groups', name=self.firewall_group_name
                    ),
                    json={'firewall_groups': [returned_group]},
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.mock_egress_policy['name']
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies',
                        name=self.mock_egress_policy['name'],
                    ),
                    json={
                        'firewall_policies': [
                            deepcopy(self.mock_egress_policy)
                        ]
                    },
                ),
                dict(
                    method='GET',  # short-circuit
                    uri=self._make_mock_url(
                        'firewall_policies', self.mock_ingress_policy['name']
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_policies',
                        name=self.mock_ingress_policy['name'],
                    ),
                    json={
                        'firewall_policies': [
                            deepcopy(self.mock_ingress_policy)
                        ]
                    },
                ),
                dict(
                    method='GET',
                    uri=self.get_mock_url(
                        'network',
                        'public',
                        append=['v2.0', 'ports', self.mock_port['name']],
                    ),
                    status_code=404,
                ),
                dict(
                    method='GET',
                    uri=self.get_mock_url(
                        'network',
                        'public',
                        append=['v2.0', 'ports'],
                        qs_elements=['name={}'.format(self.mock_port['name'])],
                    ),
                    json={'ports': [self.mock_port]},
                ),
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_id
                    ),
                    json={'firewall_group': updated_group},
                    validate=dict(
                        json={
                            'firewall_group': {
                                'description': params['description'],
                                'egress_firewall_policy_id': self.mock_egress_policy[  # noqa: E501
                                    'id'
                                ],
                                'ingress_firewall_policy_id': self.mock_ingress_policy[  # noqa: E501
                                    'id'
                                ],
                                'ports': [self.mock_port['id']],
                            }
                        }
                    ),
                ),
            ]
        )
        self.assertDictEqual(
            updated_group,
            self.cloud.update_firewall_group(
                self.firewall_group_name, **params
            ),
        )
        self.assert_calls()

    def test_update_firewall_group_compact(self):
        params = {'description': 'updated again!'}
        updated_group = deepcopy(self.mock_returned_firewall_group)
        updated_group.update(params)

        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_id
                    ),
                    json={
                        'firewall_group': deepcopy(
                            self.mock_returned_firewall_group
                        )
                    },
                ),
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_id
                    ),
                    json={'firewall_group': updated_group},
                    validate=dict(json={'firewall_group': params}),
                ),
            ]
        )
        self.assertDictEqual(
            updated_group,
            self.cloud.update_firewall_group(self.firewall_group_id, **params),
        )
        self.assert_calls()

    def test_update_firewall_group_filters(self):
        filters = {'project_id': self.mock_firewall_group['project_id']}
        params = {'description': 'updated again!'}
        updated_group = deepcopy(self.mock_returned_firewall_group)
        self.register_uris(
            [
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_id
                    ),
                    json={'firewall_group': updated_group},
                    validate=dict(json={'firewall_group': params}),
                )
            ]
        )

        with mock.patch.object(
            self.cloud.network,
            'find_firewall_group',
            return_value=deepcopy(self.mock_firewall_group),
        ):
            r = self.cloud.update_firewall_group(
                self.firewall_group_name, filters, **params
            )
            self.assertDictEqual(updated_group, r.to_dict())
            self.assert_calls()
            self.cloud.network.find_firewall_group.assert_called_once_with(
                self.firewall_group_name, ignore_missing=False, **filters
            )

    def test_update_firewall_group_unset_policies(self):
        transformed_params = {
            'ingress_firewall_policy_id': None,
            'egress_firewall_policy_id': None,
        }
        updated_group = deepcopy(self.mock_returned_firewall_group)
        updated_group.update(**transformed_params)
        returned_group = deepcopy(self.mock_returned_firewall_group)
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_id
                    ),
                    json={'firewall_group': returned_group},
                ),
                dict(
                    method='PUT',
                    uri=self._make_mock_url(
                        'firewall_groups', self.firewall_group_id
                    ),
                    json={'firewall_group': updated_group},
                    validate=dict(json={'firewall_group': transformed_params}),
                ),
            ]
        )
        self.assertDictEqual(
            updated_group,
            self.cloud.update_firewall_group(
                self.firewall_group_id,
                ingress_firewall_policy=None,
                egress_firewall_policy=None,
            ),
        )
        self.assert_calls()
