#   Licensed under the Apache License, Version 2.0 (the "License"); you may
#   not use this file except in compliance with the License. You may obtain
#   a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.
#

import argparse
from unittest import mock
from unittest.mock import call

from osc_lib.cli import format_columns
from osc_lib import exceptions
from osc_lib import utils

from openstackclient.network.v2 import port
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
from openstackclient.tests.unit import utils as tests_utils


LIST_FIELDS_TO_RETRIEVE = ('id', 'name', 'mac_address', 'fixed_ips', 'status')
LIST_FIELDS_TO_RETRIEVE_LONG = ('security_group_ids', 'device_owner', 'tags')


class TestPort(network_fakes.TestNetworkV2):
    """Common base class for the port command unit tests in this module."""

    def setUp(self):
        super().setUp()

        client_manager = self.app.client_manager
        # Get a shortcut to the network client
        self.network = client_manager.network
        # Get a shortcut to the ProjectManager Mock
        self.projects_mock = client_manager.identity.projects

    @staticmethod
    def _get_common_cols_data(fake_port):
        """Build the expected ``(columns, data)`` tuples for ``fake_port``.

        Each column name is listed next to the value expected for it, so
        the two returned tuples always have matching positions.

        :param fake_port: fake port object whose attributes are read
        :returns: tuple ``(columns, data)``; both elements are tuples
        """
        expected = (
            ('admin_state_up',
             port.AdminStateColumn(fake_port.is_admin_state_up)),
            ('allowed_address_pairs',
             format_columns.ListDictColumn(fake_port.allowed_address_pairs)),
            ('binding_host_id', fake_port.binding_host_id),
            ('binding_profile',
             format_columns.DictColumn(fake_port.binding_profile)),
            ('binding_vif_details',
             format_columns.DictColumn(fake_port.binding_vif_details)),
            ('binding_vif_type', fake_port.binding_vif_type),
            ('binding_vnic_type', fake_port.binding_vnic_type),
            ('created_at', fake_port.created_at),
            ('data_plane_status', fake_port.data_plane_status),
            ('description', fake_port.description),
            ('device_id', fake_port.device_id),
            ('device_owner', fake_port.device_owner),
            ('device_profile', fake_port.device_profile),
            ('dns_assignment',
             format_columns.ListDictColumn(fake_port.dns_assignment)),
            ('dns_domain', fake_port.dns_domain),
            ('dns_name', fake_port.dns_name),
            ('extra_dhcp_opts',
             format_columns.ListDictColumn(fake_port.extra_dhcp_opts)),
            ('fixed_ips',
             format_columns.ListDictColumn(fake_port.fixed_ips)),
            ('id', fake_port.id),
            ('ip_allocation', fake_port.ip_allocation),
            ('mac_address', fake_port.mac_address),
            ('name', fake_port.name),
            ('network_id', fake_port.network_id),
            ('numa_affinity_policy', fake_port.numa_affinity_policy),
            ('port_security_enabled', fake_port.is_port_security_enabled),
            ('project_id', fake_port.project_id),
            ('propagate_uplink_status', fake_port.propagate_uplink_status),
            ('resource_request', fake_port.resource_request),
            ('revision_number', fake_port.revision_number),
            ('qos_network_policy_id', fake_port.qos_network_policy_id),
            ('qos_policy_id', fake_port.qos_policy_id),
            ('security_group_ids',
             format_columns.ListColumn(fake_port.security_group_ids)),
            ('status', fake_port.status),
            ('tags', format_columns.ListColumn(fake_port.tags)),
            ('trunk_details', fake_port.trunk_details),
            ('updated_at', fake_port.updated_at),
        )

        # Unzip the (column, value) pairs into two parallel tuples.
        columns, data = zip(*expected)
        return columns, data


class TestCreatePort(TestPort):
    """Unit tests for the ``port create`` command (``port.CreatePort``).

    Most tests parse a command line with ``check_parser``, run
    ``take_action`` against the mocked network client and then check the
    keyword arguments that were handed to ``create_port``.
    """

    # Fake port returned by the mocked ``create_port`` call, together with
    # the columns and data expected to be displayed for it.
    _port = network_fakes.create_one_port()
    columns, data = TestPort._get_common_cols_data(_port)

    def setUp(self):
        super().setUp()

        self.network.create_port = mock.Mock(return_value=self._port)
        self.network.set_tags = mock.Mock(return_value=None)
        fake_net = network_fakes.create_one_network({
            'id': self._port.network_id,
        })
        self.network.find_network = mock.Mock(return_value=fake_net)
        self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet()
        self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)
        # Empty result by default; _test_create_with_tag() overrides it.
        self.network.find_extension = mock.Mock(return_value=[])
        # Get the command object to test
        self.cmd = port.CreatePort(self.app, self.namespace)

    def _assert_port_displayed(self, columns, data):
        """Check the output of ``take_action`` against the fake port.

        Columns are compared as sets and data with ``assertCountEqual``,
        so the ordering of either is not checked.

        :param columns: column names returned by ``take_action``
        :param data: column values returned by ``take_action``
        """
        self.assertEqual(set(self.columns), set(columns))
        self.assertCountEqual(self.data, data)

    def test_create_default_options(self):
        """Only ``--network`` and a name: minimal create_port arguments."""
        arglist = [
            '--network', self._port.network_id,
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id,),
            ('enable', True),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'name': 'test-port',
        })
        self.network.set_tags.assert_not_called()

        self._assert_port_displayed(columns, data)

    def test_create_full_options(self):
        """Many options at once are all translated into create_port args."""
        arglist = [
            '--mac-address', 'aa:aa:aa:aa:aa:aa',
            '--fixed-ip', 'subnet=%s,ip-address=10.0.0.2'
            % self.fake_subnet.id,
            '--description', self._port.description,
            '--device', 'deviceid',
            '--device-owner', 'fakeowner',
            '--disable',
            '--vnic-type', 'macvtap',
            '--binding-profile', 'foo=bar',
            '--binding-profile', 'foo2=bar2',
            '--network', self._port.network_id,
            '--dns-domain', 'example.org',
            '--dns-name', '8.8.8.8',
            'test-port',
        ]
        verifylist = [
            ('mac_address', 'aa:aa:aa:aa:aa:aa'),
            (
                'fixed_ip',
                [{'subnet': self.fake_subnet.id, 'ip-address': '10.0.0.2'}]
            ),
            ('description', self._port.description),
            ('device', 'deviceid'),
            ('device_owner', 'fakeowner'),
            ('disable', True),
            ('vnic_type', 'macvtap'),
            ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}),
            ('network', self._port.network_id),
            ('dns_domain', 'example.org'),
            ('dns_name', '8.8.8.8'),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'mac_address': 'aa:aa:aa:aa:aa:aa',
            'fixed_ips': [{'subnet_id': self.fake_subnet.id,
                           'ip_address': '10.0.0.2'}],
            'description': self._port.description,
            'device_id': 'deviceid',
            'device_owner': 'fakeowner',
            'admin_state_up': False,
            'binding:vnic_type': 'macvtap',
            'binding:profile': {'foo': 'bar', 'foo2': 'bar2'},
            'network_id': self._port.network_id,
            'dns_domain': 'example.org',
            'dns_name': '8.8.8.8',
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_invalid_json_binding_profile(self):
        """Truncated JSON for ``--binding-profile`` is rejected on parsing."""
        arglist = [
            '--network', self._port.network_id,
            '--binding-profile', '{"parent_name":"fake_parent"',
            'test-port',
        ]
        self.assertRaises(argparse.ArgumentTypeError,
                          self.check_parser,
                          self.cmd,
                          arglist,
                          None)

    def test_create_invalid_key_value_binding_profile(self):
        """A bare key without a value is rejected for ``--binding-profile``."""
        arglist = [
            '--network', self._port.network_id,
            '--binding-profile', 'key',
            'test-port',
        ]
        self.assertRaises(argparse.ArgumentTypeError,
                          self.check_parser,
                          self.cmd,
                          arglist,
                          None)

    def test_create_json_binding_profile(self):
        """Repeated JSON ``--binding-profile`` values merge into one dict."""
        arglist = [
            '--network', self._port.network_id,
            '--binding-profile', '{"parent_name":"fake_parent"}',
            '--binding-profile', '{"tag":42}',
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id),
            ('enable', True),
            ('binding_profile', {'parent_name': 'fake_parent', 'tag': 42}),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'binding:profile': {'parent_name': 'fake_parent', 'tag': 42},
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_with_security_group(self):
        """A single ``--security-group`` becomes ``security_group_ids``."""
        secgroup = network_fakes.FakeSecurityGroup.create_one_security_group()
        self.network.find_security_group = mock.Mock(return_value=secgroup)
        arglist = [
            '--network', self._port.network_id,
            '--security-group', secgroup.id,
            'test-port',
        ]

        verifylist = [
            ('network', self._port.network_id,),
            ('enable', True),
            ('security_group', [secgroup.id]),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'security_group_ids': [secgroup.id],
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_port_with_dns_name(self):
        """``--dns-name`` is passed through as ``dns_name``."""
        arglist = [
            '--network', self._port.network_id,
            '--dns-name', '8.8.8.8',
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id,),
            ('enable', True),
            ('dns_name', '8.8.8.8'),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'dns_name': '8.8.8.8',
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_with_security_groups(self):
        """Repeated ``--security-group`` options are collected in order."""
        sg_1 = network_fakes.FakeSecurityGroup.create_one_security_group()
        sg_2 = network_fakes.FakeSecurityGroup.create_one_security_group()
        self.network.find_security_group = mock.Mock(side_effect=[sg_1, sg_2])
        arglist = [
            '--network', self._port.network_id,
            '--security-group', sg_1.id,
            '--security-group', sg_2.id,
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id,),
            ('enable', True),
            ('security_group', [sg_1.id, sg_2.id]),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'security_group_ids': [sg_1.id, sg_2.id],
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_with_no_security_groups(self):
        """``--no-security-group`` sends an empty ``security_group_ids``."""
        arglist = [
            '--network', self._port.network_id,
            '--no-security-group',
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id),
            ('enable', True),
            ('no_security_group', True),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'security_group_ids': [],
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_with_no_fixed_ips(self):
        """``--no-fixed-ip`` sends an empty ``fixed_ips`` list."""
        arglist = [
            '--network', self._port.network_id,
            '--no-fixed-ip',
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id),
            ('enable', True),
            ('no_fixed_ip', True),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'fixed_ips': [],
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_port_with_allowed_address_pair_ipaddr(self):
        """``--allowed-address`` with only an IP address per pair."""
        pairs = [{'ip_address': '192.168.1.123'},
                 {'ip_address': '192.168.1.45'}]
        arglist = [
            '--network', self._port.network_id,
            '--allowed-address', 'ip-address=192.168.1.123',
            '--allowed-address', 'ip-address=192.168.1.45',
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id),
            ('enable', True),
            ('allowed_address_pairs', [{'ip-address': '192.168.1.123'},
                                       {'ip-address': '192.168.1.45'}]),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'allowed_address_pairs': pairs,
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_port_with_allowed_address_pair(self):
        """``--allowed-address`` with both IP and MAC address per pair."""
        pairs = [{'ip_address': '192.168.1.123',
                  'mac_address': 'aa:aa:aa:aa:aa:aa'},
                 {'ip_address': '192.168.1.45',
                  'mac_address': 'aa:aa:aa:aa:aa:b1'}]
        arglist = [
            '--network', self._port.network_id,
            '--allowed-address',
            'ip-address=192.168.1.123,mac-address=aa:aa:aa:aa:aa:aa',
            '--allowed-address',
            'ip-address=192.168.1.45,mac-address=aa:aa:aa:aa:aa:b1',
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id),
            ('enable', True),
            ('allowed_address_pairs', [{'ip-address': '192.168.1.123',
                                        'mac-address': 'aa:aa:aa:aa:aa:aa'},
                                       {'ip-address': '192.168.1.45',
                                        'mac-address': 'aa:aa:aa:aa:aa:b1'}]),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'allowed_address_pairs': pairs,
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_port_with_qos(self):
        """``--qos-policy`` is passed to create_port as ``qos_policy_id``."""
        qos_policy = network_fakes.FakeNetworkQosPolicy.create_one_qos_policy()
        self.network.find_qos_policy = mock.Mock(return_value=qos_policy)
        arglist = [
            '--network', self._port.network_id,
            '--qos-policy', qos_policy.id,
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id,),
            ('enable', True),
            ('qos_policy', qos_policy.id),
            ('name', 'test-port'),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'qos_policy_id': qos_policy.id,
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_port_security_enabled(self):
        """``--enable-port-security`` sends ``port_security_enabled=True``."""
        arglist = [
            '--network', self._port.network_id,
            '--enable-port-security',
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id,),
            ('enable', True),
            ('enable_port_security', True),
            ('name', 'test-port'),
        ]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'port_security_enabled': True,
            'name': 'test-port',
        })

    def test_create_port_security_disabled(self):
        """``--disable-port-security`` sends ``port_security_enabled=False``."""
        arglist = [
            '--network', self._port.network_id,
            '--disable-port-security',
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id,),
            ('enable', True),
            ('disable_port_security', True),
            ('name', 'test-port'),
        ]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'port_security_enabled': False,
            'name': 'test-port',
        })

    def _test_create_with_tag(self, add_tags=True, add_tags_in_post=True):
        """Shared body of the ``--tag`` / ``--no-tag`` tests.

        :param add_tags: when True pass ``--tag red --tag blue``, otherwise
            pass ``--no-tag``
        :param add_tags_in_post: value returned by the mocked
            ``find_extension``; when True the tags are expected in the
            ``create_port`` call, otherwise a separate ``set_tags`` call
            is expected (only if tags were given)
        """
        arglist = [
            '--network', self._port.network_id,
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id,),
            ('enable', True),
            ('name', 'test-port'),
        ]
        if add_tags:
            arglist += ['--tag', 'red', '--tag', 'blue']
            verifylist.append(('tags', ['red', 'blue']))
        else:
            arglist += ['--no-tag']
            verifylist.append(('no_tag', True))

        self.network.find_extension = mock.Mock(return_value=add_tags_in_post)

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
        columns, data = self.cmd.take_action(parsed_args)

        args = {
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'name': 'test-port',
        }
        if add_tags_in_post:
            args['tags'] = sorted(['red', 'blue']) if add_tags else []
            self.network.create_port.assert_called_once()
            # assert_called_once_with() cannot be used here: duplicates in
            # 'tags' are removed with list(set(parsed_args.tags)), which
            # does not guarantee the order of the tags list passed to
            # create_port(). Compare against a sorted copy instead; the
            # copy keeps the kwargs recorded by the mock untouched.
            create_port_call_kwargs = dict(
                self.network.create_port.call_args[1])
            create_port_call_kwargs['tags'] = sorted(
                create_port_call_kwargs['tags'])
            self.assertDictEqual(args, create_port_call_kwargs)
        else:
            self.network.create_port.assert_called_once_with(**args)
            if add_tags:
                self.network.set_tags.assert_called_once_with(
                    self._port,
                    tests_utils.CompareBySet(['red', 'blue']))
            else:
                self.network.set_tags.assert_not_called()

        self._assert_port_displayed(columns, data)

    def test_create_with_tags(self):
        """``--tag`` options with tags sent in the create call."""
        self._test_create_with_tag(add_tags=True, add_tags_in_post=True)

    def test_create_with_no_tag(self):
        """``--no-tag`` with tags sent in the create call."""
        self._test_create_with_tag(add_tags=False, add_tags_in_post=True)

    def test_create_with_tags_using_put(self):
        """``--tag`` options with tags applied via ``set_tags``."""
        self._test_create_with_tag(add_tags=True, add_tags_in_post=False)

    def test_create_with_no_tag_using_put(self):
        """``--no-tag`` when tags are not sent in the create call."""
        self._test_create_with_tag(add_tags=False, add_tags_in_post=False)

    def _test_create_with_uplink_status_propagation(self, enable=True):
        """Shared body of the uplink-status-propagation tests.

        :param enable: True to pass ``--enable-uplink-status-propagation``,
            False to pass ``--disable-uplink-status-propagation``; the same
            value is expected as ``propagate_uplink_status``
        """
        arglist = [
            '--network', self._port.network_id,
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id,),
            ('name', 'test-port'),
        ]
        if enable:
            arglist += ['--enable-uplink-status-propagation']
            verifylist.append(('enable_uplink_status_propagation', True))
        else:
            arglist += ['--disable-uplink-status-propagation']
            verifylist.append(('disable_uplink_status_propagation', True))
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'propagate_uplink_status': enable,
            'name': 'test-port',
        })

        self._assert_port_displayed(columns, data)

    def test_create_with_uplink_status_propagation_enabled(self):
        """``--enable-uplink-status-propagation`` is honoured."""
        self._test_create_with_uplink_status_propagation(enable=True)

    def test_create_with_uplink_status_propagation_disabled(self):
        """``--disable-uplink-status-propagation`` is honoured."""
        self._test_create_with_uplink_status_propagation(enable=False)

    def test_create_port_with_extra_dhcp_option(self):
        """``--extra-dhcp-option`` values are mapped to ``extra_dhcp_opts``."""
        extra_dhcp_options = [{'opt_name': 'classless-static-route',
                               'opt_value': '169.254.169.254/32,22.2.0.2,'
                                            '0.0.0.0/0,22.2.0.1',
                               'ip_version': '4'},
                              {'opt_name': 'dns-server',
                               'opt_value': '240C::6666',
                               'ip_version': '6'}]
        arglist = [
            '--network', self._port.network_id,
            '--extra-dhcp-option', 'name=classless-static-route,'
                                   'value=169.254.169.254/32,22.2.0.2,'
                                   '0.0.0.0/0,22.2.0.1,'
                                   'ip-version=4',
            '--extra-dhcp-option', 'name=dns-server,value=240C::6666,'
                                   'ip-version=6',
            'test-port',
        ]

        verifylist = [
            ('network', self._port.network_id,),
            ('extra_dhcp_options', [{'name': 'classless-static-route',
                                     'value': '169.254.169.254/32,22.2.0.2,'
                                              '0.0.0.0/0,22.2.0.1',
                                     'ip-version': '4'},
                                    {'name': 'dns-server',
                                     'value': '240C::6666',
                                     'ip-version': '6'}]),
            ('name', 'test-port'),
        ]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**{
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'extra_dhcp_opts': extra_dhcp_options,
            'name': 'test-port',
        })

    def _test_create_with_numa_affinity_policy(self, policy=None):
        """Shared body of the ``--numa-policy-*`` tests.

        :param policy: suffix of the ``--numa-policy-<policy>`` option to
            pass; when falsy no option is passed and no
            ``numa_affinity_policy`` is expected in the create call
        """
        arglist = [
            '--network', self._port.network_id,
            'test-port',
        ]
        verifylist = [
            ('network', self._port.network_id,),
            ('name', 'test-port'),
        ]
        create_args = {
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'name': 'test-port',
        }
        if policy:
            arglist += ['--numa-policy-%s' % policy]
            verifylist.append(('numa_policy_%s' % policy, True))
            create_args['numa_affinity_policy'] = policy

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.network.create_port.assert_called_once_with(**create_args)

        self._assert_port_displayed(columns, data)

    def test_create_with_numa_affinity_policy_required(self):
        """``--numa-policy-required`` sets the policy to ``required``."""
        self._test_create_with_numa_affinity_policy(policy='required')

    def test_create_with_numa_affinity_policy_preferred(self):
        """``--numa-policy-preferred`` sets the policy to ``preferred``."""
        self._test_create_with_numa_affinity_policy(policy='preferred')

    def test_create_with_numa_affinity_policy_legacy(self):
        """``--numa-policy-legacy`` sets the policy to ``legacy``."""
        self._test_create_with_numa_affinity_policy(policy='legacy')

    def test_create_with_numa_affinity_policy_null(self):
        """Without a NUMA option no policy is sent to create_port."""
        self._test_create_with_numa_affinity_policy()

    def test_create_with_device_profile(self):
        """``--device-profile`` is passed through as ``device_profile``."""
        # Verify against the value given on the command line rather than
        # an attribute of the fake port, so the check does not depend on
        # the fake's defaults.
        device_profile = 'cyborg_device_profile_1'
        arglist = [
            '--network', self._port.network_id,
            '--device-profile', device_profile,
            'test-port',
        ]

        verifylist = [
            ('network', self._port.network_id,),
            ('device_profile', device_profile,),
            ('name', 'test-port'),
        ]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        create_args = {
            'admin_state_up': True,
            'network_id': self._port.network_id,
            'name': 'test-port',
            'device_profile': device_profile,
        }
        self.network.create_port.assert_called_once_with(**create_args)
        self._assert_port_displayed(columns, data)


class TestDeletePort(TestPort):
    """Unit tests for the ``port delete`` command (``port.DeletePort``)."""

    # Ports to delete.
    _ports = network_fakes.create_ports(count=2)

    def setUp(self):
        super().setUp()

        self.network.delete_port = mock.Mock(return_value=None)
        self.network.find_port = network_fakes.get_ports(
            ports=self._ports)
        # Get the command object to test
        self.cmd = port.DeletePort(self.app, self.namespace)

    def test_port_delete(self):
        """Deleting one port looks it up by name and deletes it once."""
        arglist = [
            self._ports[0].name,
        ]
        verifylist = [
            ('port', [self._ports[0].name]),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)
        self.network.find_port.assert_called_once_with(
            self._ports[0].name, ignore_missing=False)
        self.network.delete_port.assert_called_once_with(self._ports[0])
        self.assertIsNone(result)

    def test_multi_ports_delete(self):
        """Several port names result in one delete_port call per port."""
        arglist = [p.name for p in self._ports]
        verifylist = [
            ('port', arglist),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        calls = [call(p) for p in self._ports]
        self.network.delete_port.assert_has_calls(calls)
        self.assertIsNone(result)

    def test_multi_ports_delete_with_exception(self):
        """A failed lookup is reported after the other port was deleted."""
        arglist = [
            self._ports[0].name,
            'unexist_port',
        ]
        verifylist = [
            ('port',
             [self._ports[0].name, 'unexist_port']),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        # First lookup succeeds, the second one raises CommandError.
        find_mock_result = [self._ports[0], exceptions.CommandError]
        self.network.find_port = (
            mock.Mock(side_effect=find_mock_result)
        )

        with self.assertRaises(exceptions.CommandError) as ctx:
            self.cmd.take_action(parsed_args)
        self.assertEqual('1 of 2 ports failed to delete.',
                         str(ctx.exception))

        self.network.find_port.assert_any_call(
            self._ports[0].name, ignore_missing=False)
        self.network.find_port.assert_any_call(
            'unexist_port', ignore_missing=False)
        self.network.delete_port.assert_called_once_with(
            self._ports[0]
        )


class TestListPort(TestPort):
    """Unit tests for the ``port list`` command (``port.ListPort``).

    Each test parses a set of CLI options, runs the command against a mocked
    network client and verifies both the keyword arguments passed to
    ``network.ports`` and the columns/rows the command returns.
    """

    _ports = network_fakes.create_ports(count=3)

    columns = (
        'ID',
        'Name',
        'MAC Address',
        'Fixed IP Addresses',
        'Status',
    )

    columns_long = (
        'ID',
        'Name',
        'MAC Address',
        'Fixed IP Addresses',
        'Status',
        'Security Groups',
        'Device Owner',
        'Tags',
    )

    # Expected rows for the default listing: one tuple per fake port.
    data = [
        (
            prt.id,
            prt.name,
            prt.mac_address,
            format_columns.ListDictColumn(prt.fixed_ips),
            prt.status,
        )
        for prt in _ports
    ]

    # Expected rows for ``--long``: the default columns plus three extras.
    data_long = [
        (
            prt.id,
            prt.name,
            prt.mac_address,
            format_columns.ListDictColumn(prt.fixed_ips),
            prt.status,
            format_columns.ListColumn(prt.security_group_ids),
            prt.device_owner,
            format_columns.ListColumn(prt.tags),
        )
        for prt in _ports
    ]

    def setUp(self):
        """Create the command and mock the lookups it may perform."""
        super(TestListPort, self).setUp()

        # Get the command object to test
        self.cmd = port.ListPort(self.app, self.namespace)
        self.network.ports = mock.Mock(return_value=self._ports)
        fake_router = network_fakes.FakeRouter.create_one_router({
            'id': 'fake-router-id',
        })
        fake_network = network_fakes.create_one_network({
            'id': 'fake-network-id',
        })
        self.network.find_router = mock.Mock(return_value=fake_router)
        self.network.find_network = mock.Mock(return_value=fake_network)
        self.app.client_manager.compute = mock.Mock()

    def _assert_port_list(self, arglist, verifylist, filters=None,
                          long=False):
        """Run ``port list`` and verify the query and the output.

        :param arglist: command-line arguments to parse
        :param verifylist: ``(attribute, value)`` pairs checked on the
            parsed arguments by ``check_parser``
        :param filters: keyword arguments, besides ``fields``, that are
            expected to be passed to ``network.ports``
        :param long: when true, expect the ``--long`` fields, columns and
            rows instead of the default ones
        """
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        if long:
            fields = LIST_FIELDS_TO_RETRIEVE + LIST_FIELDS_TO_RETRIEVE_LONG
            expected_columns = self.columns_long
            expected_data = self.data_long
        else:
            fields = LIST_FIELDS_TO_RETRIEVE
            expected_columns = self.columns
            expected_data = self.data

        # ``fields`` is always sent; the caller only names the extra filters.
        expected_query = dict(filters or {})
        expected_query['fields'] = fields

        self.network.ports.assert_called_once_with(**expected_query)
        self.assertEqual(expected_columns, columns)
        self.assertCountEqual(expected_data, list(data))

    def _mock_find_subnet(self, subnet_id):
        """Make ``network.find_subnet`` return a fake subnet with this ID."""
        self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet(
            {'id': subnet_id})
        self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)

    def test_port_list_no_options(self):
        """Without options only the ``fields`` selection is sent."""
        self._assert_port_list([], [])

    def test_port_list_router_opt(self):
        """``--router`` is resolved and sent as a ``device_id`` filter."""
        self._assert_port_list(
            ['--router', 'fake-router-name'],
            [('router', 'fake-router-name')],
            {'device_id': 'fake-router-id'},
        )

    @mock.patch.object(utils, 'find_resource')
    def test_port_list_with_server_option(self, mock_find):
        """``--server`` is resolved and sent as a ``device_id`` filter."""
        fake_server = compute_fakes.FakeServer.create_one_server()
        mock_find.return_value = fake_server

        self._assert_port_list(
            ['--server', 'fake-server-name'],
            [('server', 'fake-server-name')],
            {'device_id': fake_server.id},
        )
        mock_find.assert_called_once_with(mock.ANY, 'fake-server-name')

    def test_port_list_device_id_opt(self):
        """``--device-id`` is passed through as ``device_id``."""
        device_id = self._ports[0].device_id
        self._assert_port_list(
            ['--device-id', device_id],
            [('device_id', device_id)],
            {'device_id': device_id},
        )

    def test_port_list_device_owner_opt(self):
        """``--device-owner`` is passed through as ``device_owner``."""
        device_owner = self._ports[0].device_owner
        self._assert_port_list(
            ['--device-owner', device_owner],
            [('device_owner', device_owner)],
            {'device_owner': device_owner},
        )

    def test_port_list_all_opt(self):
        """Owner, router, network and MAC filters can be combined."""
        device_owner = self._ports[0].device_owner
        mac_address = self._ports[0].mac_address
        self._assert_port_list(
            [
                '--device-owner', device_owner,
                '--router', 'fake-router-name',
                '--network', 'fake-network-name',
                '--mac-address', mac_address,
            ],
            [
                ('device_owner', device_owner),
                ('router', 'fake-router-name'),
                ('network', 'fake-network-name'),
                ('mac_address', mac_address),
            ],
            {
                'device_owner': device_owner,
                'device_id': 'fake-router-id',
                'network_id': 'fake-network-id',
                'mac_address': mac_address,
            },
        )

    def test_port_list_mac_address_opt(self):
        """``--mac-address`` is passed through as ``mac_address``."""
        mac_address = self._ports[0].mac_address
        self._assert_port_list(
            ['--mac-address', mac_address],
            [('mac_address', mac_address)],
            {'mac_address': mac_address},
        )

    def test_port_list_fixed_ip_opt_ip_address(self):
        """``--fixed-ip ip-address=`` becomes an ``ip_address=`` filter."""
        ip_address = self._ports[0].fixed_ips[0]['ip_address']
        self._assert_port_list(
            ['--fixed-ip', "ip-address=%s" % ip_address],
            [('fixed_ip', [{'ip-address': ip_address}])],
            {'fixed_ips': ['ip_address=%s' % ip_address]},
        )

    def test_port_list_fixed_ip_opt_ip_address_substr(self):
        """``ip-substring=`` becomes an ``ip_address_substr=`` filter."""
        ip_address_ss = self._ports[0].fixed_ips[0]['ip_address'][:-1]
        self._assert_port_list(
            ['--fixed-ip', "ip-substring=%s" % ip_address_ss],
            [('fixed_ip', [{'ip-substring': ip_address_ss}])],
            {'fixed_ips': ['ip_address_substr=%s' % ip_address_ss]},
        )

    def test_port_list_fixed_ip_opt_subnet_id(self):
        """``--fixed-ip subnet=`` becomes a ``subnet_id=`` filter."""
        subnet_id = self._ports[0].fixed_ips[0]['subnet_id']
        self._mock_find_subnet(subnet_id)
        self._assert_port_list(
            ['--fixed-ip', "subnet=%s" % subnet_id],
            [('fixed_ip', [{'subnet': subnet_id}])],
            {'fixed_ips': ['subnet_id=%s' % subnet_id]},
        )

    def test_port_list_fixed_ip_opts(self):
        """One ``--fixed-ip`` with both keys yields two filter entries."""
        subnet_id = self._ports[0].fixed_ips[0]['subnet_id']
        ip_address = self._ports[0].fixed_ips[0]['ip_address']
        self._mock_find_subnet(subnet_id)
        self._assert_port_list(
            [
                '--fixed-ip',
                "subnet=%s,ip-address=%s" % (subnet_id, ip_address),
            ],
            [('fixed_ip', [{'subnet': subnet_id,
                            'ip-address': ip_address}])],
            {'fixed_ips': ['subnet_id=%s' % subnet_id,
                           'ip_address=%s' % ip_address]},
        )

    def test_port_list_fixed_ips(self):
        """Repeated ``--fixed-ip`` options are merged into one filter."""
        subnet_id = self._ports[0].fixed_ips[0]['subnet_id']
        ip_address = self._ports[0].fixed_ips[0]['ip_address']
        # The fake subnet only needs its ID; ``fields`` is a list-query
        # argument and does not belong among the subnet's attributes.
        self._mock_find_subnet(subnet_id)
        self._assert_port_list(
            [
                '--fixed-ip', "subnet=%s" % subnet_id,
                '--fixed-ip', "ip-address=%s" % ip_address,
            ],
            [('fixed_ip', [{'subnet': subnet_id},
                           {'ip-address': ip_address}])],
            {'fixed_ips': ['subnet_id=%s' % subnet_id,
                           'ip_address=%s' % ip_address]},
        )

    def test_list_port_with_long(self):
        """``--long`` requests extra fields and returns extra columns."""
        self._assert_port_list(
            ['--long'],
            [('long', True)],
            long=True,
        )

    def test_port_list_host(self):
        """``--host`` is sent as a ``binding:host_id`` filter."""
        self._assert_port_list(
            ['--host', 'foobar'],
            [('host', 'foobar')],
            {'binding:host_id': 'foobar'},
        )

    def test_port_list_project(self):
        """``--project`` is resolved and sent as ``project_id``."""
        project = identity_fakes.FakeProject.create_one_project()
        self.projects_mock.get.return_value = project
        self._assert_port_list(
            ['--project', project.id],
            [('project', project.id)],
            {'project_id': project.id},
        )

    def test_port_list_project_domain(self):
        """``--project-domain`` does not add a filter of its own."""
        project = identity_fakes.FakeProject.create_one_project()
        self.projects_mock.get.return_value = project
        self._assert_port_list(
            [
                '--project', project.id,
                '--project-domain', project.domain_id,
            ],
            [
                ('project', project.id),
                ('project_domain', project.domain_id),
            ],
            {'project_id': project.id},
        )

    def test_port_list_name(self):
        """``--name`` is passed through as ``name``."""
        test_name = "fakename"
        self._assert_port_list(
            ['--name', test_name],
            [('name', test_name)],
            {'name': test_name},
        )

    def test_list_with_tag_options(self):
        """Tag options are parsed to lists and sent comma-joined."""
        self._assert_port_list(
            [
                '--tags', 'red,blue',
                '--any-tags', 'red,green',
                '--not-tags', 'orange,yellow',
                '--not-any-tags', 'black,white',
            ],
            [
                ('tags', ['red', 'blue']),
                ('any_tags', ['red', 'green']),
                ('not_tags', ['orange', 'yellow']),
                ('not_any_tags', ['black', 'white']),
            ],
            {
                'tags': 'red,blue',
                'any_tags': 'red,green',
                'not_tags': 'orange,yellow',
                'not_any_tags': 'black,white',
            },
        )

    def test_port_list_security_group(self):
        """Repeated ``--security-group`` options are sent as a list."""
        self._assert_port_list(
            [
                '--security-group', 'sg-id1',
                '--security-group', 'sg-id2',
            ],
            [('security_groups', ['sg-id1', 'sg-id2'])],
            {'security_groups': ['sg-id1', 'sg-id2']},
        )


class TestSetPort(TestPort):
    """Unit tests for the ``port set`` command (``port.SetPort``).

    Most tests parse a set of CLI options, run the command against a mocked
    network client and verify the attributes passed to ``update_port``.
    """

    _port = network_fakes.create_one_port({'tags': ['green', 'red']})

    def setUp(self):
        """Create the command and mock the network calls it may make."""
        super(TestSetPort, self).setUp()
        self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet()
        self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)
        self.network.find_port = mock.Mock(return_value=self._port)
        self.network.update_port = mock.Mock(return_value=None)
        self.network.set_tags = mock.Mock(return_value=None)

        # Get the command object to test
        self.cmd = port.SetPort(self.app, self.namespace)

    def _set_and_verify(self, arglist, verifylist, target, attrs):
        """Run ``port set`` and verify the resulting ``update_port`` call.

        :param arglist: command-line arguments to parse
        :param verifylist: ``(attribute, value)`` pairs checked on the
            parsed arguments by ``check_parser``
        :param target: the port object expected as the first argument of
            ``update_port``
        :param attrs: keyword arguments expected in the ``update_port`` call
        """
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.network.update_port.assert_called_once_with(target, **attrs)
        self.assertIsNone(result)

    def _use_port(self, attrs):
        """Create a fake port with ``attrs`` and have ``find_port`` return it.

        :returns: the newly created fake port
        """
        testport = network_fakes.create_one_port(attrs)
        self.network.find_port = mock.Mock(return_value=testport)
        return testport

    def test_set_port_defaults(self):
        """With no options neither the port nor its tags are updated."""
        arglist = [
            self._port.name,
        ]
        verifylist = [
            ('port', self._port.name),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)
        self.assertFalse(self.network.update_port.called)
        self.assertFalse(self.network.set_tags.called)
        self.assertIsNone(result)

    def test_set_port_fixed_ip(self):
        """``--fixed-ip`` is appended to the port's existing fixed IPs."""
        _testport = self._use_port(
            {'fixed_ips': [{'ip_address': '0.0.0.1'}]})
        arglist = [
            '--fixed-ip', 'ip-address=10.0.0.12',
            _testport.name,
        ]
        verifylist = [
            ('fixed_ip', [{'ip-address': '10.0.0.12'}]),
            ('port', _testport.name),
        ]
        attrs = {
            'fixed_ips': [
                {'ip_address': '0.0.0.1'},
                {'ip_address': '10.0.0.12'},
            ],
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_set_port_fixed_ip_clear(self):
        """``--no-fixed-ip`` with ``--fixed-ip`` replaces the fixed IPs."""
        _testport = self._use_port(
            {'fixed_ips': [{'ip_address': '0.0.0.1'}]})
        arglist = [
            '--fixed-ip', 'ip-address=10.0.0.12',
            '--no-fixed-ip',
            _testport.name,
        ]
        verifylist = [
            ('fixed_ip', [{'ip-address': '10.0.0.12'}]),
            ('no_fixed_ip', True),
            ('port', _testport.name),
        ]
        attrs = {
            'fixed_ips': [
                {'ip_address': '10.0.0.12'},
            ],
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_set_port_dns_name(self):
        """``--dns-name`` is sent as ``dns_name``."""
        arglist = [
            '--dns-name', '8.8.8.8',
            self._port.name,
        ]
        verifylist = [
            ('dns_name', '8.8.8.8'),
            ('port', self._port.name),
        ]
        attrs = {
            'dns_name': '8.8.8.8',
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_overwrite_binding_profile(self):
        """``--no-binding-profile`` with a new value replaces the profile."""
        _testport = self._use_port(
            {'binding_profile': {'lok_i': 'visi_on'}})
        arglist = [
            '--binding-profile', 'lok_i=than_os',
            '--no-binding-profile',
            _testport.name,
        ]
        verifylist = [
            ('binding_profile', {'lok_i': 'than_os'}),
            ('no_binding_profile', True),
            ('port', _testport.name),
        ]
        attrs = {
            'binding:profile':
                {'lok_i': 'than_os'},
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_overwrite_mac_address(self):
        """``--mac-address`` is sent as the new ``mac_address``."""
        _testport = self._use_port(
            {'mac_address': '11:22:33:44:55:66'})
        arglist = [
            '--mac-address', '66:55:44:33:22:11',
            _testport.name,
        ]
        verifylist = [
            ('mac_address', '66:55:44:33:22:11'),
            ('port', _testport.name),
        ]
        attrs = {
            'mac_address': '66:55:44:33:22:11',
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_set_port_this(self):
        """Disable the port and clear its fixed IPs and binding profile."""
        arglist = [
            '--disable',
            '--no-fixed-ip',
            '--no-binding-profile',
            self._port.name,
        ]
        verifylist = [
            ('disable', True),
            ('no_binding_profile', True),
            ('no_fixed_ip', True),
            ('port', self._port.name),
        ]
        attrs = {
            'admin_state_up': False,
            'binding:profile': {},
            'fixed_ips': [],
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_that(self):
        """Set description, state, vnic type, profile, host and name."""
        arglist = [
            '--description', 'newDescription',
            '--enable',
            '--vnic-type', 'macvtap',
            '--binding-profile', 'foo=bar',
            '--host', 'binding-host-id-xxxx',
            '--name', 'newName',
            self._port.name,
        ]
        verifylist = [
            ('description', 'newDescription'),
            ('enable', True),
            ('vnic_type', 'macvtap'),
            ('binding_profile', {'foo': 'bar'}),
            ('host', 'binding-host-id-xxxx'),
            ('name', 'newName'),
            ('port', self._port.name),
        ]
        attrs = {
            'admin_state_up': True,
            'binding:vnic_type': 'macvtap',
            'binding:profile': {'foo': 'bar'},
            'binding:host_id': 'binding-host-id-xxxx',
            'description': 'newDescription',
            'name': 'newName',
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_invalid_json_binding_profile(self):
        """A malformed JSON binding profile is rejected while parsing."""
        arglist = [
            '--binding-profile', '{"parent_name"}',
            'test-port',
        ]
        self.assertRaises(argparse.ArgumentTypeError,
                          self.check_parser,
                          self.cmd,
                          arglist,
                          None)

    def test_set_port_invalid_key_value_binding_profile(self):
        """A binding profile without ``=`` is rejected while parsing."""
        arglist = [
            '--binding-profile', 'key',
            'test-port',
        ]
        self.assertRaises(argparse.ArgumentTypeError,
                          self.check_parser,
                          self.cmd,
                          arglist,
                          None)

    def test_set_port_mixed_binding_profile(self):
        """Key=value and JSON binding profiles are merged into one dict."""
        arglist = [
            '--binding-profile', 'foo=bar',
            '--binding-profile', '{"foo2": "bar2"}',
            self._port.name,
        ]
        verifylist = [
            ('binding_profile', {'foo': 'bar', 'foo2': 'bar2'}),
            ('port', self._port.name),
        ]
        attrs = {
            'binding:profile': {'foo': 'bar', 'foo2': 'bar2'},
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_security_group(self):
        """``--security-group`` is resolved and sent by ID."""
        sg = network_fakes.FakeSecurityGroup.create_one_security_group()
        self.network.find_security_group = mock.Mock(return_value=sg)
        arglist = [
            '--security-group', sg.id,
            self._port.name,
        ]
        verifylist = [
            ('security_group', [sg.id]),
            ('port', self._port.name),
        ]
        attrs = {
            'security_group_ids': [sg.id],
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_security_group_append(self):
        """New security groups are appended to the port's existing ones."""
        sg_1 = network_fakes.FakeSecurityGroup.create_one_security_group()
        sg_2 = network_fakes.FakeSecurityGroup.create_one_security_group()
        sg_3 = network_fakes.FakeSecurityGroup.create_one_security_group()
        self.network.find_security_group = mock.Mock(side_effect=[sg_2, sg_3])
        _testport = self._use_port(
            {'security_group_ids': [sg_1.id]})
        arglist = [
            '--security-group', sg_2.id,
            '--security-group', sg_3.id,
            _testport.name,
        ]
        verifylist = [
            ('security_group', [sg_2.id, sg_3.id]),
            ('port', _testport.name),
        ]
        attrs = {
            'security_group_ids': [sg_1.id, sg_2.id, sg_3.id],
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_set_port_security_group_clear(self):
        """``--no-security-group`` sends an empty security group list."""
        arglist = [
            '--no-security-group',
            self._port.name,
        ]
        verifylist = [
            ('no_security_group', True),
            ('port', self._port.name),
        ]
        attrs = {
            'security_group_ids': [],
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_security_group_replace(self):
        """``--no-security-group`` with a new group replaces the list."""
        sg1 = network_fakes.FakeSecurityGroup.create_one_security_group()
        sg2 = network_fakes.FakeSecurityGroup.create_one_security_group()
        _testport = self._use_port(
            {'security_group_ids': [sg1.id]})
        self.network.find_security_group = mock.Mock(return_value=sg2)
        arglist = [
            '--security-group', sg2.id,
            '--no-security-group',
            _testport.name,
        ]
        verifylist = [
            ('security_group', [sg2.id]),
            ('no_security_group', True),
            ('port', _testport.name),
        ]
        attrs = {
            'security_group_ids': [sg2.id],
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_set_port_allowed_address_pair(self):
        """``--allowed-address`` is sent as an allowed address pair."""
        arglist = [
            '--allowed-address', 'ip-address=192.168.1.123',
            self._port.name,
        ]
        verifylist = [
            ('allowed_address_pairs', [{'ip-address': '192.168.1.123'}]),
            ('port', self._port.name),
        ]
        attrs = {
            'allowed_address_pairs': [{'ip_address': '192.168.1.123'}],
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_append_allowed_address_pair(self):
        """A new allowed address is appended to the existing pairs."""
        _testport = self._use_port(
            {'allowed_address_pairs': [{'ip_address': '192.168.1.123'}]})
        arglist = [
            '--allowed-address', 'ip-address=192.168.1.45',
            _testport.name,
        ]
        verifylist = [
            ('allowed_address_pairs', [{'ip-address': '192.168.1.45'}]),
            ('port', _testport.name),
        ]
        attrs = {
            'allowed_address_pairs': [{'ip_address': '192.168.1.123'},
                                      {'ip_address': '192.168.1.45'}],
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_set_port_overwrite_allowed_address_pair(self):
        """``--no-allowed-address`` with a new pair replaces the list."""
        _testport = self._use_port(
            {'allowed_address_pairs': [{'ip_address': '192.168.1.123'}]})
        arglist = [
            '--allowed-address', 'ip-address=192.168.1.45',
            '--no-allowed-address',
            _testport.name,
        ]
        verifylist = [
            ('allowed_address_pairs', [{'ip-address': '192.168.1.45'}]),
            ('no_allowed_address_pair', True),
            ('port', _testport.name),
        ]
        attrs = {
            'allowed_address_pairs': [{'ip_address': '192.168.1.45'}],
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_set_port_no_allowed_address_pairs(self):
        """``--no-allowed-address`` alone sends an empty pair list."""
        arglist = [
            '--no-allowed-address',
            self._port.name,
        ]
        verifylist = [
            ('no_allowed_address_pair', True),
            ('port', self._port.name),
        ]
        attrs = {
            'allowed_address_pairs': [],
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_extra_dhcp_option(self):
        """``--extra-dhcp-option`` keys are sent as ``opt_name``/``opt_value``.
        """
        arglist = [
            '--extra-dhcp-option', 'name=foo,value=bar',
            self._port.name,
        ]
        verifylist = [
            ('extra_dhcp_options', [{'name': 'foo',
                                     'value': 'bar'}]),
            ('port', self._port.name),
        ]
        attrs = {
            'extra_dhcp_opts': [{'opt_name': 'foo',
                                 'opt_value': 'bar'}],
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_security_enabled(self):
        """``--enable-port-security`` sends ``port_security_enabled=True``."""
        arglist = [
            '--enable-port-security',
            self._port.id,
        ]
        verifylist = [
            ('enable_port_security', True),
            ('port', self._port.id,)
        ]
        attrs = {
            'port_security_enabled': True,
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_security_disabled(self):
        """``--disable-port-security`` sends ``port_security_enabled=False``.
        """
        arglist = [
            '--disable-port-security',
            self._port.id,
        ]
        verifylist = [
            ('disable_port_security', True),
            ('port', self._port.id,)
        ]
        attrs = {
            'port_security_enabled': False,
        }
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_set_port_with_qos(self):
        """``--qos-policy`` is resolved and sent as ``qos_policy_id``."""
        qos_policy = network_fakes.FakeNetworkQosPolicy.create_one_qos_policy()
        self.network.find_qos_policy = mock.Mock(return_value=qos_policy)
        _testport = self._use_port(
            {'qos_policy_id': None})
        arglist = [
            '--qos-policy', qos_policy.id,
            _testport.name,
        ]
        verifylist = [
            ('qos_policy', qos_policy.id),
            ('port', _testport.name),
        ]
        attrs = {
            'qos_policy_id': qos_policy.id,
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_set_port_data_plane_status(self):
        """``--data-plane-status`` is sent as ``data_plane_status``."""
        _testport = self._use_port(
            {'data_plane_status': None})
        arglist = [
            '--data-plane-status', 'ACTIVE',
            _testport.name,
        ]
        verifylist = [
            ('data_plane_status', 'ACTIVE'),
            ('port', _testport.name),
        ]
        attrs = {
            'data_plane_status': 'ACTIVE',
        }
        self._set_and_verify(arglist, verifylist, _testport, attrs)

    def test_set_port_invalid_data_plane_status_value(self):
        """An unknown data plane status is rejected by the parser."""
        arglist = [
            '--data-plane-status', 'Spider-Man',
            'test-port',
        ]
        self.assertRaises(tests_utils.ParserException,
                          self.check_parser,
                          self.cmd,
                          arglist,
                          None)

    def _test_set_tags(self, with_tags=True):
        """Verify tag handling of ``port set``.

        :param with_tags: when true, pass ``--tag red --tag blue`` and expect
            them to be combined with the port's existing tags; otherwise
            pass ``--no-tag`` and expect an empty tag set
        """
        if with_tags:
            arglist = ['--tag', 'red', '--tag', 'blue']
            verifylist = [('tags', ['red', 'blue'])]
            # The fake port already carries 'green' and 'red'.
            expected_args = ['red', 'blue', 'green']
        else:
            arglist = ['--no-tag']
            verifylist = [('no_tag', True)]
            expected_args = []
        arglist.append(self._port.name)
        verifylist.append(
            ('port', self._port.name))

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
        result = self.cmd.take_action(parsed_args)

        # Tags go through set_tags, never through update_port.
        self.assertFalse(self.network.update_port.called)
        self.network.set_tags.assert_called_once_with(
            self._port,
            tests_utils.CompareBySet(expected_args))
        self.assertIsNone(result)

    def test_set_with_tags(self):
        """``--tag`` adds to the port's existing tags."""
        self._test_set_tags(with_tags=True)

    def test_set_with_no_tag(self):
        """``--no-tag`` clears the port's tags."""
        self._test_set_tags(with_tags=False)

    # NOTE: the NUMA tests below are named "create" but exercise SetPort
    # (see setUp); the names are kept so existing test IDs stay stable.
    def _test_create_with_numa_affinity_policy(self, policy):
        """Verify ``--numa-policy-<policy>`` sets ``numa_affinity_policy``.

        :param policy: policy name used both as the option suffix and as
            the expected attribute value
        """
        arglist = [
            '--numa-policy-%s' % policy,
            self._port.id,
        ]
        verifylist = [
            ('numa_policy_%s' % policy, True),
            ('port', self._port.id,)
        ]
        attrs = {'numa_affinity_policy': policy}
        self._set_and_verify(arglist, verifylist, self._port, attrs)

    def test_create_with_numa_affinity_policy_required(self):
        """``--numa-policy-required`` sets the 'required' policy."""
        self._test_create_with_numa_affinity_policy('required')

    def test_create_with_numa_affinity_policy_preferred(self):
        """``--numa-policy-preferred`` sets the 'preferred' policy."""
        self._test_create_with_numa_affinity_policy('preferred')

    def test_create_with_numa_affinity_policy_legacy(self):
        """``--numa-policy-legacy`` sets the 'legacy' policy."""
        self._test_create_with_numa_affinity_policy('legacy')


class TestShowPort(TestPort):
    """Unit tests for the ``ShowPort`` command."""

    # The port to show.
    _port = network_fakes.create_one_port()
    # Expected column names and values, derived once from the fake port.
    columns, data = TestPort._get_common_cols_data(_port)

    def setUp(self):
        super().setUp()

        # Every port lookup resolves to the fake port above.
        self.network.find_port = mock.Mock(return_value=self._port)

        # Get the command object to test
        self.cmd = port.ShowPort(self.app, self.namespace)

    def test_show_no_options(self):
        # Parsing an empty argument list is expected to fail.
        with self.assertRaises(tests_utils.ParserException):
            self.check_parser(self.cmd, [], [])

    def test_show_all_options(self):
        port_name = self._port.name
        parsed_args = self.check_parser(
            self.cmd, [port_name], [('port', port_name)])

        shown_columns, shown_data = self.cmd.take_action(parsed_args)

        self.network.find_port.assert_called_once_with(
            port_name, ignore_missing=False)

        # Both comparisons ignore ordering.
        self.assertEqual(set(self.columns), set(shown_columns))
        self.assertCountEqual(self.data, shown_data)


class TestUnsetPort(TestPort):

    def setUp(self):
        super().setUp()

        # Both fixed IPs of the fixture port live on this one subnet.
        subnet_id = '042eb10a-3a18-4658-ab-cf47c8d03152'
        port_attrs = {
            'fixed_ips': [
                {'subnet_id': subnet_id, 'ip_address': '0.0.0.1'},
                {'subnet_id': subnet_id, 'ip_address': '1.0.0.0'},
            ],
            'binding:profile': {'batman': 'Joker', 'Superman': 'LexLuthor'},
            'tags': ['green', 'red'],
        }
        self._testport = network_fakes.create_one_port(port_attrs)
        self.fake_subnet = network_fakes.FakeSubnet.create_one_subnet(
            {'id': subnet_id})

        # Stub out every network client call the tests in this class use.
        self.network.find_subnet = mock.Mock(return_value=self.fake_subnet)
        self.network.find_port = mock.Mock(return_value=self._testport)
        self.network.update_port = mock.Mock(return_value=None)
        self.network.set_tags = mock.Mock(return_value=None)

        # Get the command object to test
        self.cmd = port.UnsetPort(self.app, self.namespace)

    def test_unset_port_parameters(self):
        subnet_id = '042eb10a-3a18-4658-ab-cf47c8d03152'
        arglist = [
            '--fixed-ip', 'subnet=%s,ip-address=1.0.0.0' % subnet_id,
            '--binding-profile', 'Superman',
            '--qos-policy',
            '--host',
            self._testport.name,
        ]
        verifylist = [
            ('fixed_ip', [{'subnet': subnet_id, 'ip-address': '1.0.0.0'}]),
            ('binding_profile', ['Superman']),
            ('qos_policy', True),
            ('host', True),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        # The fixture port from setUp has two fixed IPs and two
        # binding-profile keys; only the entries that were not unset
        # should be sent back, while QoS policy and host are cleared.
        expected_attrs = {
            'fixed_ips': [
                {'subnet_id': subnet_id, 'ip_address': '0.0.0.1'},
            ],
            'binding:profile': {'batman': 'Joker'},
            'qos_policy_id': None,
            'binding:host_id': None,
        }
        self.network.update_port.assert_called_once_with(
            self._testport, **expected_attrs)
        self.assertIsNone(result)

    def test_unset_port_fixed_ip_not_existent(self):
        # 1.0.0.1 is not one of the fixed IPs on the fixture port from
        # setUp (0.0.0.1 and 1.0.0.0), so the command is expected to fail.
        missing_ip = {'ip-address': '1.0.0.1'}
        arglist = [
            '--fixed-ip', 'ip-address=1.0.0.1',
            '--binding-profile', 'Superman',
            self._testport.name,
        ]
        verifylist = [
            ('fixed_ip', [missing_ip]),
            ('binding_profile', ['Superman']),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        with self.assertRaises(exceptions.CommandError):
            self.cmd.take_action(parsed_args)

    def test_unset_port_binding_profile_not_existent(self):
        # 'Neo' is not a key of the fixture port's binding profile from
        # setUp ('batman' and 'Superman'), so the command is expected to
        # fail.
        existing_ip = {'ip-address': '1.0.0.0'}
        arglist = [
            '--fixed-ip', 'ip-address=1.0.0.0',
            '--binding-profile', 'Neo',
            self._testport.name,
        ]
        verifylist = [
            ('fixed_ip', [existing_ip]),
            ('binding_profile', ['Neo']),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        with self.assertRaises(exceptions.CommandError):
            self.cmd.take_action(parsed_args)

    def test_unset_security_group(self):
        make_sg = network_fakes.FakeSecurityGroup.create_one_security_group
        kept_sg = make_sg()
        removed_sg = make_sg()
        # The port starts with both groups; only one of them is unset.
        target_port = network_fakes.create_one_port(
            {'security_group_ids': [kept_sg.id, removed_sg.id]})
        self.network.find_port = mock.Mock(return_value=target_port)
        self.network.find_security_group = mock.Mock(
            return_value=removed_sg)

        arglist = ['--security-group', removed_sg.id, target_port.name]
        verifylist = [('security_group_ids', [removed_sg.id])]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.network.update_port.assert_called_once_with(
            target_port, security_group_ids=[kept_sg.id])
        self.assertIsNone(result)

    def test_unset_port_security_group_not_existent(self):
        # Unsetting a security group that the port does not carry must
        # raise CommandError.
        _fake_sg1 = network_fakes.FakeSecurityGroup.create_one_security_group()
        _fake_sg2 = network_fakes.FakeSecurityGroup.create_one_security_group()
        _fake_port = network_fakes.create_one_port(
            {'security_group_ids': [_fake_sg1.id]})
        # Make the command operate on the port built above. Without this
        # the lookup falls back to the port installed by setUp, and the
        # "port has sg1 but not sg2" scenario is never exercised.
        self.network.find_port = mock.Mock(return_value=_fake_port)
        self.network.find_security_group = mock.Mock(return_value=_fake_sg2)
        arglist = [
            '--security-group', _fake_sg2.id,
            _fake_port.name,
        ]
        verifylist = [
            ('security_group_ids', [_fake_sg2.id]),
        ]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
        self.assertRaises(exceptions.CommandError,
                          self.cmd.take_action,
                          parsed_args)

    def test_unset_port_allowed_address_pair(self):
        # The port starts with a single allowed address pair; unsetting it
        # should leave an empty list.
        target_port = network_fakes.create_one_port(
            {'allowed_address_pairs': [{'ip_address': '192.168.1.123'}]})
        self.network.find_port = mock.Mock(return_value=target_port)

        arglist = [
            '--allowed-address', 'ip-address=192.168.1.123',
            target_port.name,
        ]
        verifylist = [
            ('allowed_address_pairs', [{'ip-address': '192.168.1.123'}]),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.network.update_port.assert_called_once_with(
            target_port, allowed_address_pairs=[])
        self.assertIsNone(result)

    def test_unset_port_allowed_address_pair_not_existent(self):
        # The port only has 192.168.1.123; asking to unset 192.168.1.45
        # is expected to fail.
        target_port = network_fakes.create_one_port(
            {'allowed_address_pairs': [{'ip_address': '192.168.1.123'}]})
        self.network.find_port = mock.Mock(return_value=target_port)

        arglist = [
            '--allowed-address', 'ip-address=192.168.1.45',
            target_port.name,
        ]
        verifylist = [
            ('allowed_address_pairs', [{'ip-address': '192.168.1.45'}]),
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        with self.assertRaises(exceptions.CommandError):
            self.cmd.take_action(parsed_args)

    def test_unset_port_data_plane_status(self):
        # Start from a port whose data plane status is set, then clear it.
        target_port = network_fakes.create_one_port(
            {'data_plane_status': 'ACTIVE'})
        self.network.find_port = mock.Mock(return_value=target_port)

        port_name = target_port.name
        arglist = ['--data-plane-status', port_name]
        verifylist = [('data_plane_status', True), ('port', port_name)]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.network.update_port.assert_called_once_with(
            target_port, data_plane_status=None)
        self.assertIsNone(result)

    def _test_unset_tags(self, with_tags=True):
        """Run the unset command with tag options and verify the result.

        :param with_tags: when true, pass ``--tag red --tag blue``;
            otherwise pass ``--all-tag`` on its own.
        """
        port_name = self._testport.name
        if not with_tags:
            arglist = ['--all-tag', port_name]
            verifylist = [('all_tag', True), ('port', port_name)]
            remaining_tags = []
        else:
            arglist = ['--tag', 'red', '--tag', 'blue', port_name]
            verifylist = [('tags', ['red', 'blue']), ('port', port_name)]
            # The fixture port from setUp is tagged 'green' and 'red';
            # after unsetting 'red' and 'blue' only 'green' should remain.
            remaining_tags = ['green']

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)
        result = self.cmd.take_action(parsed_args)

        # Tag-only changes must go through set_tags, never update_port.
        self.network.update_port.assert_not_called()
        self.network.set_tags.assert_called_once_with(
            self._testport, tests_utils.CompareBySet(remaining_tags))
        self.assertIsNone(result)

    def test_unset_with_tags(self):
        # Exercise the '--tag' branch of the shared helper.
        self._test_unset_tags(True)

    def test_unset_with_all_tag(self):
        # Exercise the '--all-tag' branch of the shared helper.
        self._test_unset_tags(False)

    def test_unset_numa_affinity_policy(self):
        # Start from a port with a NUMA affinity policy, then clear it.
        target_port = network_fakes.create_one_port(
            {'numa_affinity_policy': 'required'})
        self.network.find_port = mock.Mock(return_value=target_port)

        port_name = target_port.name
        arglist = ['--numa-policy', port_name]
        verifylist = [('numa_policy', True), ('port', port_name)]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.network.update_port.assert_called_once_with(
            target_port, numa_affinity_policy=None)
        self.assertIsNone(result)
