# Copyright 2012 OpenStack Foundation.
# All Rights Reserved
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
#

import contextlib
from io import StringIO
import itertools
import sys
from unittest import mock
import urllib.parse as urlparse

from oslo_serialization import jsonutils
from oslo_utils import encodeutils
from oslotest import base
import requests
import yaml

from neutronclient.common import constants
from neutronclient.common import exceptions
from neutronclient.common import utils
from neutronclient.neutron import v2_0 as neutronV2_0
from neutronclient.neutron.v2_0 import network
from neutronclient import shell
from neutronclient.v2_0 import client

# API version that end_url() embeds in expected URLs as "/v<version>".
API_VERSION = "2.0"
# Token given to the client under test and expected in 'X-Auth-Token'.
TOKEN = 'testtoken'
# Fake endpoint URL; the prefix of every URL built by end_url().
ENDURL = 'localurl'
# Value placed in fake 'x-openstack-request-id' response headers.
REQUEST_ID = 'test_request_id'


@contextlib.contextmanager
def capture_std_streams():
    """Temporarily replace ``sys.stdout`` and ``sys.stderr`` with buffers.

    Yields a ``(stdout_buffer, stderr_buffer)`` pair of ``StringIO`` objects
    that receive everything written while the ``with`` body runs. The
    streams that were installed on entry are put back on exit, even when
    the body raises.
    """
    saved_streams = (sys.stdout, sys.stderr)
    captured_out = StringIO()
    captured_err = StringIO()
    try:
        sys.stdout = captured_out
        sys.stderr = captured_err
        yield captured_out, captured_err
    finally:
        sys.stdout, sys.stderr = saved_streams


class FakeStdout(object):
    """In-memory stand-in for ``sys.stdout`` that records every write."""

    def __init__(self):
        # Written chunks, kept in the order they arrived.
        self.content = []

    def write(self, text):
        """Record ``text`` instead of printing it."""
        self.content.append(text)

    def make_string(self):
        """Return every recorded chunk, decoded and concatenated in order.

        Each chunk goes through ``encodeutils.safe_decode`` with ``'utf-8'``
        before being joined.
        """
        return ''.join(encodeutils.safe_decode(chunk, 'utf-8')
                       for chunk in self.content)


class MyRequest(requests.Request):
    """Bare ``requests.Request`` stand-in that only carries ``method``.

    The parent initializer is deliberately not invoked here, so ``method``
    is the only attribute this class sets.
    """

    def __init__(self, method=None):
        self.method = method


class MyResp(requests.Response):
    """Fake ``requests.Response`` used as the first item of mocked replies.

    The parent initializer is not invoked; only the attributes assigned
    below are set by this class.
    """

    def __init__(self, status_code, headers=None, reason=None,
                 request=None, url=None):
        self.status_code = status_code
        # A falsy ``headers`` (None or empty) becomes a fresh empty dict.
        self.headers = headers or {}
        self.reason = reason
        # Fall back to a MyRequest whose method is None.
        self.request = request or MyRequest()
        self.url = url


class MyApp(object):
    """Minimal application object that only exposes a ``stdout`` attribute.

    In this module it is passed as the first argument when constructing
    command objects such as ``TestListCommand``.
    """

    def __init__(self, _stdout):
        self.stdout = _stdout


def end_url(path, query=None):
    """Build the URL the client is expected to request.

    :param path: resource path appended after
        ``ENDURL + "/v" + API_VERSION``.
    :param query: optional query string, without the leading ``?``.
    :returns: the URL, with ``"?" + query`` appended only when ``query``
        is truthy (``None`` and ``""`` add nothing).
    """
    url = ENDURL + "/v" + API_VERSION + path
    # Conditional expression rather than the fragile "x and a or b" idiom.
    return url + "?" + query if query else url


class MyUrlComparator(object):
    """Matcher that compares URLs regardless of query-parameter order.

    An instance wraps the expected URL and is meant to stand in for it in
    mock call assertions. Two URLs match when their scheme, network
    location and path are equal and their query strings contain the same
    ``(name, value)`` pairs the same number of times, in any order.

    :param lhs: expected URL string.
    :param client: stored on the instance; not used by the comparison.
    """

    def __init__(self, lhs, client):
        self.lhs = lhs
        self.client = client

    def __eq__(self, rhs):
        expected = urlparse.urlparse(self.lhs)
        actual = urlparse.urlparse(rhs)

        expected_location = (expected.scheme, expected.netloc, expected.path)
        actual_location = (actual.scheme, actual.netloc, actual.path)
        if expected_location != actual_location:
            return False

        # Compare the sorted pair lists, i.e. as multisets. Comparing
        # len() plus set() equality would wrongly accept "a=1&a=1&b=2"
        # against "a=1&b=2&b=2".
        return (sorted(urlparse.parse_qsl(expected.query)) ==
                sorted(urlparse.parse_qsl(actual.query)))

    def __str__(self):
        return self.lhs

    def __repr__(self):
        return str(self)


class MyComparator(object):
    """Matcher for request bodies that ignores list/tuple element order.

    An instance wraps the expected value and is meant to stand in for it
    in mock call assertions.

    :param lhs: expected (already deserialized) value.
    :param client: when truthy, the value compared against is first passed
        through ``client.deserialize(value, 200)`` and ``repr()`` returns
        ``client.serialize(lhs)``; when falsy, values are compared as-is.
    """

    def __init__(self, lhs, client):
        self.lhs = lhs
        self.client = client

    def _com_dict(self, lhs, rhs):
        """Return True when both dicts have equal keys and matching values."""
        if len(lhs) != len(rhs):
            return False
        return all(key in rhs and self._com(value, rhs[key])
                   for key, value in lhs.items())

    def _com_list(self, lhs, rhs):
        """Return True when both sequences hold the same items, any order.

        Every matched item of ``rhs`` is consumed so that duplicates are
        counted: ``[1, 1, 2]`` does not match ``[1, 2, 2]``. Items are
        matched with ``==``, taking the first equal element each time.
        """
        if len(lhs) != len(rhs):
            return False
        unmatched = list(rhs)
        for lhs_value in lhs:
            try:
                unmatched.remove(lhs_value)
            except ValueError:
                return False
        return True

    def _com(self, lhs, rhs):
        """Compare two values, dispatching on the type of ``lhs``."""
        if lhs is None:
            return rhs is None
        if isinstance(lhs, dict):
            return isinstance(rhs, dict) and self._com_dict(lhs, rhs)
        # A list only matches a list and a tuple only matches a tuple.
        for sequence_type in (list, tuple):
            if isinstance(lhs, sequence_type):
                return (isinstance(rhs, sequence_type) and
                        self._com_list(lhs, rhs))
        return lhs == rhs

    def __eq__(self, rhs):
        if self.client:
            rhs = self.client.deserialize(rhs, 200)
        return self._com(self.lhs, rhs)

    def __repr__(self):
        if self.client:
            return self.client.serialize(self.lhs)
        return str(self.lhs)


class ContainsKeyValue(object):
    """Matcher that equals any dict holding the expected key/value pairs.

    Only the pairs given to ``__init__()`` are checked; additional keys in
    the dict being compared are ignored. Anything that is not a dict never
    matches.
    """

    def __init__(self, expected):
        self._expected = expected

    def __eq__(self, other):
        if not isinstance(other, dict):
            return False
        mismatch = any(key not in other or other[key] != value
                       for key, value in self._expected.items())
        return not mismatch

    def __repr__(self):
        return '<%s (expected: %s)>' % (type(self).__name__,
                                        self._expected)


class IsA(object):
    """Matcher that equals any instance of the expected type."""

    def __init__(self, expected_type):
        self._expected_type = expected_type

    def __eq__(self, other):
        matches = isinstance(other, self._expected_type)
        return matches

    def __repr__(self):
        return '<%s (expected: %s)>' % (type(self).__name__,
                                        self._expected_type)


class CLITestV20Base(base.BaseTestCase):

    test_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
    id_field = 'id'

    non_admin_status_resources = []

    def _find_resourceid(self, client, resource, name_or_id,
                         cmd_resource=None, parent_id=None):
        """Return ``name_or_id`` unchanged, without performing any lookup.

        setUp() patches this in place of the neutronclient helpers
        ``find_resourceid_by_name_or_id`` and ``find_resourceid_by_id``.
        All other arguments are accepted and ignored.
        """
        return name_or_id

    def setUp(self, plurals=None):
        """Prepare the test environment.

        Replaces ``sys.stdout`` with a ``FakeStdout``, replaces the
        neutronclient resource-id lookup helpers with
        ``self._find_resourceid`` and creates ``self.client``.

        :param plurals: optional mapping merged into
            ``client.Client.EXTED_PLURALS`` after ``constants.PLURALS``.
        """
        super(CLITestV20Base, self).setUp()
        # NOTE: these updates modify the dict stored on the Client class
        # itself; no cleanup for them is registered in this method.
        client.Client.EXTED_PLURALS.update(constants.PLURALS)
        if plurals is not None:
            client.Client.EXTED_PLURALS.update(plurals)
        self.metadata = {'plurals': client.Client.EXTED_PLURALS}
        self.endurl = ENDURL
        self.fake_stdout = FakeStdout()

        # Registered before any patch is started so that every patch
        # started below is stopped when the test ends.
        self.addCleanup(mock.patch.stopall)
        mock.patch('sys.stdout', new=self.fake_stdout).start()
        mock.patch('neutronclient.neutron.v2_0.find_resourceid_by_name_or_id',
                   new=self._find_resourceid).start()
        mock.patch('neutronclient.neutron.v2_0.find_resourceid_by_id',
                   new=self._find_resourceid).start()

        self.client = client.Client(token=TOKEN, endpoint_url=self.endurl)

    def register_non_admin_status_resource(self, resource_name):
        """Append ``resource_name`` to ``self.non_admin_status_resources``.

        The first call on an instance replaces the list shared through the
        class attribute with a private copy, so the class-level list is
        never modified.
        """
        # TODO(amotoki):
        # It is recommended to define "non_admin_status_resources" in each
        # test class rather than using this method.

        # Appending must only affect this instance: switch to an instance
        # attribute while the class-level list is still the one in use.
        class_level = self.__class__.non_admin_status_resources
        if self.non_admin_status_resources is class_level:
            self.non_admin_status_resources = class_level[:]
        self.non_admin_status_resources.append(resource_name)

    def _test_create_resource(self, resource, cmd, name, myid, args,
                              position_names, position_values,
                              tenant_id=None, tags=None, admin_state_up=True,
                              extra_body=None, cmd_resource=None,
                              parent_id=None, no_api_call=False,
                              expected_exception=None,
                              **kwargs):
        """Run a create command and verify the POST request it issues.

        :param resource: key of the request and response bodies; also used
            to build the parser name ``create_<resource>``.
        :param cmd: command object under test.
        :param name: when truthy, added to the fake response and expected
            in the command output.
        :param myid: id placed in the fake response and expected in the
            command output.
        :param args: command-line arguments handed to the command.
        :param position_names: attribute names added to the expected body.
        :param position_values: values for ``position_names``, by index.
        :param tenant_id: when truthy, expected as ``tenant_id``.
        :param tags: when truthy, expected as ``tags``.
        :param admin_state_up: expected ``admin_state_up`` value, unless
            ``resource`` is listed in ``non_admin_status_resources``.
        :param extra_body: extra attributes merged into the expected body.
        :param cmd_resource: name used to look up the collection path;
            defaults to ``resource``.
        :param parent_id: when truthy, interpolated into the path.
        :param no_api_call: skip the check of the HTTP request.
        :param expected_exception: when truthy, the command must raise it
            and the output is not checked.
        :param kwargs: further attributes merged into the expected body.
        """
        cmd_resource = cmd_resource or resource

        # Attributes the command is expected to send; later entries
        # override earlier ones, positional values last.
        attrs = {}
        if resource not in self.non_admin_status_resources:
            attrs['admin_state_up'] = admin_state_up
        if tenant_id:
            attrs['tenant_id'] = tenant_id
        if tags:
            attrs['tags'] = tags
        if extra_body:
            attrs.update(extra_body)
        attrs.update(kwargs)
        for index, field in enumerate(position_names):
            attrs[field] = position_values[index]
        body = {resource: attrs}

        # Fake response returned by the mocked HTTP client.
        created = {self.id_field: myid}
        if name:
            created['name'] = name
        resstr = self.client.serialize({resource: created})

        resource_plural = self.client.get_resource_plural(cmd_resource)
        path = getattr(self.client, resource_plural + "_path")
        if parent_id:
            path = path % parent_id
        mock_body = MyComparator(body, self.client)

        cmd_parser = cmd.get_parser('create_' + resource)
        resp = (MyResp(200), resstr)

        client_patch = mock.patch.object(cmd, "get_client",
                                         return_value=self.client)
        request_patch = mock.patch.object(self.client.httpclient, "request",
                                          return_value=resp)
        with client_patch as mock_get_client:
            with request_patch as mock_request:
                if expected_exception:
                    self.assertRaises(expected_exception, shell.run_command,
                                      cmd, cmd_parser, args)
                else:
                    shell.run_command(cmd, cmd_parser, args)

        self.assert_mock_multiple_calls_with_same_arguments(
            mock_get_client, mock.call(), None)
        if not no_api_call:
            mock_request.assert_called_once_with(
                end_url(path), 'POST',
                body=mock_body,
                headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))
        if expected_exception:
            return
        _str = self.fake_stdout.make_string()
        self.assertIn(myid, _str)
        if name:
            self.assertIn(name, _str)

    def _test_list_columns(self, cmd, resources,
                           resources_out, args=('-f', 'json'),
                           cmd_resources=None, parent_id=None):
        """Run a list command and verify the single GET request it issues.

        :param cmd: list command object under test.
        :param resources: plural resource name.
        :param resources_out: data serialized as the fake response body.
        :param args: command-line arguments handed to the command.
        :param cmd_resources: name used to look up ``<name>_path`` on the
            client and to build the parser name; defaults to ``resources``.
        :param parent_id: when truthy, interpolated into the path.
        """
        cmd_resources = cmd_resources or resources

        response_body = self.client.serialize(resources_out)
        url_path = getattr(self.client, cmd_resources + "_path")
        if parent_id:
            url_path = url_path % parent_id
        parser = cmd.get_parser("list_" + cmd_resources)
        fake_response = (MyResp(200), response_body)

        client_patch = mock.patch.object(cmd, "get_client",
                                         return_value=self.client)
        request_patch = mock.patch.object(self.client.httpclient, "request",
                                          return_value=fake_response)
        with client_patch as mock_get_client:
            with request_patch as mock_request:
                shell.run_command(cmd, parser, args)

        self.assert_mock_multiple_calls_with_same_arguments(
            mock_get_client, mock.call(), None)
        expected_headers = ContainsKeyValue({'X-Auth-Token': TOKEN})
        mock_request.assert_called_once_with(
            end_url(url_path), 'GET', body=None, headers=expected_headers)

    def _test_list_resources(self, resources, cmd, detail=False, tags=(),
                             fields_1=(), fields_2=(), page_size=None,
                             sort_key=(), sort_dir=(), response_contents=None,
                             base_args=None, path=None, cmd_resources=None,
                             parent_id=None, output_format=None, query=""):
        """Run a list command and verify the single GET request it issues.

        The command-line arguments and the expected query string are built
        side by side from the keyword arguments.

        :param resources: plural resource name; key of the fake response.
        :param cmd: list command object under test.
        :param detail: add ``-D`` and expect ``verbose=True``.
        :param tags: values passed after ``-- --tag``; each is expected as
            a ``tag`` query parameter.
        :param fields_1: fields passed as individual ``--fields`` options.
        :param fields_2: fields passed after ``--`` as one ``--fields``
            option followed by all values.
        :param page_size: when truthy, passed as ``--page-size`` and
            expected as ``limit``.
        :param sort_key: keys passed as ``--sort-key`` options.
        :param sort_dir: directions passed as ``--sort-dir`` options; the
            sequence is padded with ``'asc'`` or truncated to the length
            of ``sort_key``.
        :param response_contents: items of the fake response; two items
            with ids ``myid1`` and ``myid2`` when ``None``.
        :param base_args: initial command-line arguments. The sequence is
            copied and never modified.
        :param path: request path; looked up on the client as
            ``<cmd_resources>_path`` when ``None``.
        :param cmd_resources: name used for the path lookup and the parser
            name; defaults to ``resources``.
        :param parent_id: when truthy, interpolated into a looked-up path.
        :param output_format: when truthy, passed as ``-f``.
        :param query: query string expected in addition to the generated
            parameters.
        :returns: everything the command wrote to the fake stdout.
        """
        if not cmd_resources:
            cmd_resources = resources
        if response_contents is None:
            contents = [{self.id_field: 'myid1'},
                        {self.id_field: 'myid2'}]
        else:
            contents = response_contents
        resstr = self.client.serialize({resources: contents})

        # Copy base_args: appending to the caller's list would leak the
        # generated options into any later use of that list.
        args = list(base_args) if base_args is not None else []
        # Pieces of the expected query string, joined with '&' below.
        query_parts = [query] if query else []

        if detail:
            args.append('-D')
        for field in fields_1:
            args.extend(['--fields', field])

        if tags:
            args.extend(['--', '--tag'])
        for tag in tags:
            args.append(tag)
            query_parts.append(urlparse.urlencode(
                {'tag': encodeutils.safe_encode(tag)}))
        if fields_2:
            # The '--' separator was already added together with the tags.
            if not tags:
                args.append('--')
            args.append('--fields')
            args.extend(fields_2)
        if detail:
            query_parts.append('verbose=True')
        for field in itertools.chain(fields_1, fields_2):
            query_parts.append('fields=' + field)
        if page_size:
            args.extend(['--page-size', str(page_size)])
            query_parts.append('limit=%s' % page_size)
        for key in sort_key:
            args.extend(['--sort-key', key])
            query_parts.append('sort_key=%s' % key)
        if sort_dir:
            # Make the number of directions equal to the number of keys.
            len_diff = len(sort_key) - len(sort_dir)
            if len_diff > 0:
                sort_dir = tuple(sort_dir) + ('asc',) * len_diff
            elif len_diff < 0:
                sort_dir = sort_dir[:len(sort_key)]
            for direction in sort_dir:
                args.extend(['--sort-dir', direction])
                query_parts.append('sort_dir=%s' % direction)
        query = '&'.join(query_parts)

        if path is None:
            path = getattr(self.client, cmd_resources + "_path")
            if parent_id:
                path = path % parent_id
        if output_format:
            args.extend(['-f', output_format])
        cmd_parser = cmd.get_parser("list_" + cmd_resources)
        resp = (MyResp(200), resstr)

        with mock.patch.object(cmd, "get_client",
                               return_value=self.client) as mock_get_client, \
                mock.patch.object(self.client.httpclient, "request",
                                  return_value=resp) as mock_request:
            shell.run_command(cmd, cmd_parser, args)

        self.assert_mock_multiple_calls_with_same_arguments(
            mock_get_client, mock.call(), None)
        # MyUrlComparator ignores the order of the query parameters.
        mock_request.assert_called_once_with(
            MyUrlComparator(end_url(path, query), self.client),
            'GET',
            body=None,
            headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))
        _str = self.fake_stdout.make_string()
        if response_contents is None:
            self.assertIn('myid1', _str)
        return _str

    def _test_list_resources_with_pagination(self, resources, cmd,
                                             base_args=None,
                                             cmd_resources=None,
                                             parent_id=None, query=""):
        """Run a list command against a two-page fake response.

        The first fake response carries a ``next`` link; the command is
        expected to issue exactly two GET requests, the second one to the
        linked URL.

        :param resources: plural resource name; key of the fake responses.
        :param cmd: list command object under test.
        :param base_args: command-line arguments; empty when ``None``.
        :param cmd_resources: name used for the path lookup and the parser
            name; defaults to ``resources``.
        :param parent_id: when truthy, interpolated into the path.
        :param query: query string expected on the first request.
        """
        cmd_resources = cmd_resources or resources

        path = getattr(self.client, cmd_resources + "_path")
        if parent_id:
            path = path % parent_id

        next_query = "marker=myid2&limit=2"
        next_link = {'href': end_url(path, next_query), 'rel': 'next'}
        first_page = {resources: [{'id': 'myid1'}, {'id': 'myid2'}],
                      '%s_links' % resources: [next_link]}
        second_page = {resources: [{'id': 'myid3'}, {'id': 'myid4'}]}
        first_body = self.client.serialize(first_page)
        second_body = self.client.serialize(second_page)

        parser = cmd.get_parser("list_" + cmd_resources)
        args = [] if base_args is None else base_args

        expected_calls = [
            mock.call(end_url(path, query), 'GET', body=None,
                      headers=ContainsKeyValue({'X-Auth-Token': TOKEN})),
            mock.call(MyUrlComparator(end_url(path, next_query),
                                      self.client),
                      'GET', body=None,
                      headers=ContainsKeyValue({'X-Auth-Token': TOKEN})),
        ]
        responses = [(MyResp(200), first_body), (MyResp(200), second_body)]

        client_patch = mock.patch.object(cmd, "get_client",
                                         return_value=self.client)
        request_patch = mock.patch.object(self.client.httpclient, "request")
        with client_patch as mock_get_client:
            with request_patch as mock_request:
                # One canned response per expected request, in order.
                mock_request.side_effect = responses
                shell.run_command(cmd, parser, args)

        self.assert_mock_multiple_calls_with_same_arguments(
            mock_get_client, mock.call(), None)
        self.assertEqual(2, mock_request.call_count)
        mock_request.assert_has_calls(expected_calls)

    def _test_update_resource(self, resource, cmd, myid, args, extrafields,
                              cmd_resource=None, parent_id=None):
        """Run an update command and verify the PUT request it issues.

        :param resource: key of the expected request body.
        :param cmd: update command object under test.
        :param myid: id interpolated into the path and expected in the
            command output.
        :param args: command-line arguments handed to the command.
        :param extrafields: attributes expected under ``resource`` in the
            request body.
        :param cmd_resource: name used to look up ``<name>_path`` on the
            client and to build the parser name; defaults to ``resource``.
        :param parent_id: when truthy, interpolated into the path ahead of
            ``myid``.
        """
        cmd_resource = cmd_resource or resource

        path_template = getattr(self.client, cmd_resource + "_path")
        path_args = (parent_id, myid) if parent_id else myid
        url_path = path_template % path_args
        expected_body = MyComparator({resource: extrafields}, self.client)

        parser = cmd.get_parser("update_" + cmd_resource)
        fake_response = (MyResp(204), None)

        client_patch = mock.patch.object(cmd, "get_client",
                                         return_value=self.client)
        request_patch = mock.patch.object(self.client.httpclient, "request",
                                          return_value=fake_response)
        with client_patch as mock_get_client:
            with request_patch as mock_request:
                shell.run_command(cmd, parser, args)

        self.assert_mock_multiple_calls_with_same_arguments(
            mock_get_client, mock.call(), None)
        mock_request.assert_called_once_with(
            MyUrlComparator(end_url(url_path), self.client),
            'PUT',
            body=expected_body,
            headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))
        output = self.fake_stdout.make_string()
        self.assertIn(myid, output)

    def _test_show_resource(self, resource, cmd, myid, args, fields=(),
                            cmd_resource=None, parent_id=None):
        """Run a show command and verify the GET request it issues.

        The fake response describes a resource with the given id and the
        name ``myname``; both must appear in the command output.

        :param resource: key of the fake response body.
        :param cmd: show command object under test.
        :param myid: id interpolated into the path.
        :param args: command-line arguments handed to the command.
        :param fields: names expected as ``fields`` query parameters.
        :param cmd_resource: name used to look up ``<name>_path`` on the
            client and to build the parser name; defaults to ``resource``.
        :param parent_id: when truthy, interpolated into the path ahead of
            ``myid``.
        """
        cmd_resource = cmd_resource or resource

        expected_query = "&".join("fields=%s" % field for field in fields)
        shown = {self.id_field: myid, 'name': 'myname'}
        response_body = self.client.serialize({resource: shown})

        path_template = getattr(self.client, cmd_resource + "_path")
        path_args = (parent_id, myid) if parent_id else myid
        url_path = path_template % path_args

        parser = cmd.get_parser("show_" + cmd_resource)
        fake_response = (MyResp(200), response_body)

        client_patch = mock.patch.object(cmd, "get_client",
                                         return_value=self.client)
        request_patch = mock.patch.object(self.client.httpclient, "request",
                                          return_value=fake_response)
        with client_patch as mock_get_client:
            with request_patch as mock_request:
                shell.run_command(cmd, parser, args)

        self.assert_mock_multiple_calls_with_same_arguments(
            mock_get_client, mock.call(), None)
        mock_request.assert_called_once_with(
            end_url(url_path, expected_query), 'GET',
            body=None,
            headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))
        output = self.fake_stdout.make_string()
        self.assertIn(myid, output)
        self.assertIn('myname', output)

    def _test_set_path_and_delete(self, path, parent_id, myid,
                                  mock_request_calls, mock_request_returns,
                                  delete_fail=False):
        """Queue the expected DELETE call and its fake response.

        :param path: path template of the resource.
        :param parent_id: when truthy, interpolated into ``path`` ahead of
            ``myid``.
        :param myid: id interpolated into ``path``.
        :param mock_request_calls: list that receives the expected call.
        :param mock_request_returns: list that receives the fake response.
        :param delete_fail: respond with status 404 instead of 204.
        """
        status_code = 404 if delete_fail else 204
        path_args = (parent_id, myid) if parent_id else myid
        url = end_url(path % path_args)

        expected_call = mock.call(
            url, 'DELETE',
            body=None,
            headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))
        mock_request_returns.append((MyResp(status_code), None))
        mock_request_calls.append(expected_call)

    def _test_delete_resource(self, resource, cmd, myid, args,
                              cmd_resource=None, parent_id=None,
                              extra_id=None, delete_fail=False):
        """Run a delete command and verify the DELETE request(s) it issues.

        :param resource: resource name.
        :param cmd: delete command object under test.
        :param myid: id of the resource; expected in the command output.
        :param args: command-line arguments handed to the command.
        :param cmd_resource: name used to look up ``<name>_path`` on the
            client and to build the parser name; defaults to ``resource``.
        :param parent_id: when truthy, interpolated into the path(s).
        :param extra_id: id of a second resource, used to test bulk
            delete; expected in the command output as well.
        :param delete_fail: make the fake response for ``extra_id`` a 404.
        """
        expected_calls = []
        responses = []
        cmd_resource = cmd_resource or resource
        path = getattr(self.client, cmd_resource + "_path")

        self._test_set_path_and_delete(path, parent_id, myid,
                                       expected_calls, responses)
        # extra_id is used to test for bulk_delete
        if extra_id:
            self._test_set_path_and_delete(path, parent_id, extra_id,
                                           expected_calls, responses,
                                           delete_fail)
        parser = cmd.get_parser("delete_" + cmd_resource)

        client_patch = mock.patch.object(cmd, "get_client",
                                         return_value=self.client)
        request_patch = mock.patch.object(self.client.httpclient, "request")
        with client_patch as mock_get_client:
            with request_patch as mock_request:
                # One canned response per expected request, in order.
                mock_request.side_effect = responses
                shell.run_command(cmd, parser, args)

        self.assert_mock_multiple_calls_with_same_arguments(
            mock_get_client, mock.call(), None)
        mock_request.assert_has_calls(expected_calls)
        output = self.fake_stdout.make_string()
        self.assertIn(myid, output)
        if extra_id:
            self.assertIn(extra_id, output)

    def _test_update_resource_action(self, resource, cmd, myid, action, args,
                                     body, expected_code=200, retval=None,
                                     cmd_resource=None):
        """Run an action command and verify the PUT request it issues.

        The request is expected on the resource path interpolated with
        ``<myid>/<action>``.

        :param resource: resource name.
        :param cmd: command object under test.
        :param myid: id of the resource; expected in the command output.
        :param action: action name appended to the id in the path.
        :param args: command-line arguments handed to the command.
        :param body: expected request body.
        :param expected_code: status code of the fake response.
        :param retval: body of the fake response.
        :param cmd_resource: name used to look up ``<name>_path`` on the
            client and to build the parser name; defaults to ``resource``.
        """
        cmd_resource = cmd_resource or resource

        path_template = getattr(self.client, cmd_resource + "_path")
        action_path = '%s/%s' % (myid, action)
        parser = cmd.get_parser("update_" + cmd_resource)
        fake_response = (MyResp(expected_code), retval)

        client_patch = mock.patch.object(cmd, "get_client",
                                         return_value=self.client)
        request_patch = mock.patch.object(self.client.httpclient, "request",
                                          return_value=fake_response)
        with client_patch as mock_get_client:
            with request_patch as mock_request:
                shell.run_command(cmd, parser, args)

        self.assert_mock_multiple_calls_with_same_arguments(
            mock_get_client, mock.call(), None)
        mock_request.assert_called_once_with(
            end_url(path_template % action_path), 'PUT',
            body=MyComparator(body, self.client),
            headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))
        output = self.fake_stdout.make_string()
        self.assertIn(myid, output)

    def assert_mock_multiple_calls_with_same_arguments(
            self, mocked_method, expected_call, count):
        """Assert that every call of a mock used the same arguments.

        :param mocked_method: mock to inspect.
        :param expected_call: ``mock.call`` each recorded call must match.
        :param count: exact number of calls expected; ``None`` accepts any
            number of calls greater than or equal to one.
        """
        actual_count = mocked_method.call_count
        if count is not None:
            self.assertEqual(count, actual_count)
        else:
            self.assertLessEqual(1, actual_count)
        mocked_method.assert_has_calls([expected_call] * actual_count)


# List command used by ListCommandTestCase; its filter_attrs cover the
# forms of filter declaration exercised by those tests.
class TestListCommand(neutronV2_0.ListCommand):
    resource = 'test_resource'
    filter_attrs = [
        # Filters declared by name only.
        'name',
        'admin_state_up',
        # Filters declared as dicts with their own help text.
        {'name': 'foo', 'help': 'non-boolean attribute foo'},
        {'name': 'bar', 'help': 'boolean attribute bar',
         'boolean': True},
        # NOTE(review): the help text says "integer" but the choices are
        # the strings 'baz1' and 'baz2' - confirm which is intended.
        {'name': 'baz', 'help': 'integer attribute baz',
         'argparse_kwargs': {'choices': ['baz1', 'baz2']}},
    ]


class ListCommandTestCase(CLITestV20Base):
    """Tests for the filter options TestListCommand builds from filter_attrs.
    """

    def setUp(self):
        super(ListCommandTestCase, self).setUp()
        # Register the fake 'test_resources' collection and its path on
        # the client so that the list helpers can look them up.
        self.client.extend_list('test_resources', '/test_resources', None)
        setattr(self.client, 'test_resources_path', '/test_resources')

    def _test_list_resources_filter_params(self, base_args='', query=''):
        """List test_resources with ``base_args`` and expect ``query``.

        :param base_args: command-line arguments as one string, split on
            whitespace before being handed to the command.
        :param query: query string the request is expected to carry.
        """
        resources = 'test_resources'
        cmd = TestListCommand(MyApp(sys.stdout), None)
        self._test_list_resources(resources, cmd,
                                  base_args=base_args.split(),
                                  query=query)

    def _test_list_resources_with_arg_error(self, base_args=''):
        """List test_resources with ``base_args`` and expect a parse error.

        :param base_args: command-line arguments as one string, split on
            whitespace before being handed to the command.
        """
        resources = 'test_resources'
        cmd = TestListCommand(MyApp(sys.stdout), None)
        # argparse parse error leads to SystemExit
        self.assertRaises(SystemExit,
                          self._test_list_resources,
                          resources, cmd,
                          base_args=base_args.split())

    def test_list_resources_without_filter(self):
        self._test_list_resources_filter_params()

    def test_list_resources_use_default_filter(self):
        self._test_list_resources_filter_params(
            base_args='--name val1 --admin-state-up False',
            query='name=val1&admin_state_up=False')

    def test_list_resources_use_custom_filter(self):
        self._test_list_resources_filter_params(
            base_args='--foo FOO --bar True',
            query='foo=FOO&bar=True')

    def test_list_resources_boolean_check_default_filter(self):
        self._test_list_resources_filter_params(
            base_args='--admin-state-up True', query='admin_state_up=True')
        self._test_list_resources_filter_params(
            base_args='--admin-state-up False', query='admin_state_up=False')
        self._test_list_resources_with_arg_error(
            base_args='--admin-state-up non-true-false')

    def test_list_resources_boolean_check_custom_filter(self):
        self._test_list_resources_filter_params(
            base_args='--bar True', query='bar=True')
        self._test_list_resources_filter_params(
            base_args='--bar False', query='bar=False')
        self._test_list_resources_with_arg_error(
            base_args='--bar non-true-false')

    def test_list_resources_argparse_kwargs(self):
        self._test_list_resources_filter_params(
            base_args='--baz baz1', query='baz=baz1')
        self._test_list_resources_filter_params(
            base_args='--baz baz2', query='baz=baz2')
        # The rejected value must be given to --baz: that is the option
        # whose 'choices' come from argparse_kwargs. Using --bar here
        # would only repeat the boolean check of the previous test.
        self._test_list_resources_with_arg_error(
            base_args='--baz non-choice')


class ClientV2TestJson(CLITestV20Base):
    def test_do_request_unicode(self):
        # Non-ASCII text is used for the query string, the request body
        # and the auth token; the URL path itself is plain '/test'.
        text = u'\u7f51\u7edc'
        action = u'/test'
        params = {'test': text}
        body = params

        expect_query = urlparse.urlencode(utils.safe_encode_dict(params))
        expect_body = self.client.serialize(body)
        encoded_token = encodeutils.safe_encode(text)
        self.client.httpclient.auth_token = encoded_token
        fake_response = (
            MyResp(200, {'x-openstack-request-id': REQUEST_ID}),
            expect_body)

        request_patch = mock.patch.object(self.client.httpclient, "request",
                                          return_value=fake_response)
        with request_patch as mock_request:
            result = self.client.do_request('PUT', action, body=body,
                                            params=params)

        mock_request.assert_called_once_with(
            end_url(action, query=expect_query),
            'PUT', body=expect_body,
            headers=ContainsKeyValue({'X-Auth-Token': encoded_token}))
        # The response body round-trips back to the original data.
        self.assertEqual(body, result)

    def test_do_request_error_without_response_body(self):
        # A 400 response with an empty body must raise an exception whose
        # message is built from the HTTP reason and the request ids.
        params = {'test': 'value'}
        expect_query = urlparse.urlencode(params)
        self.client.httpclient.auth_token = 'token'
        error_response = MyResp(
            400,
            headers={'x-openstack-request-id': REQUEST_ID},
            reason='An error')

        request_patch = mock.patch.object(self.client.httpclient, "request",
                                          return_value=(error_response, ''))
        with request_patch as mock_request:
            error = self.assertRaises(exceptions.NeutronClientException,
                                      self.client.do_request, 'PUT', '/test',
                                      body='', params=params)

        expected_url = MyUrlComparator(
            end_url('/test', query=expect_query), self.client)
        mock_request.assert_called_once_with(
            expected_url, 'PUT', body='',
            headers=ContainsKeyValue({'X-Auth-Token': 'token'}))
        expected_error = ("An error\nNeutron server returns "
                          "request_ids: %s" % [REQUEST_ID])
        self.assertEqual(expected_error, str(error))

    def test_do_request_with_long_uri_exception(self):
        # 8200 > MAX_URI_LEN:8192
        oversized_id = 'x' * 8200
        raised = self.assertRaises(
            exceptions.RequestURITooLong,
            self.client.do_request,
            'GET', '/test', body='', params={'id': oversized_id})
        self.assertNotEqual(0, raised.excess)

    def test_do_request_request_ids(self):
        # The request id from the response headers must be exposed on the
        # result as ``request_ids``.
        params = {'test': 'value'}
        body = params
        expect_query = urlparse.urlencode(params)
        self.client.httpclient.auth_token = 'token'
        expect_body = self.client.serialize(body)
        fake_response = (
            MyResp(200, {'x-openstack-request-id': REQUEST_ID}),
            expect_body)

        request_patch = mock.patch.object(self.client.httpclient, "request",
                                          return_value=fake_response)
        with request_patch as mock_request:
            result = self.client.do_request('PUT', '/test', body=body,
                                            params=params)

        expected_url = MyUrlComparator(
            end_url('/test', query=expect_query), self.client)
        mock_request.assert_called_once_with(
            expected_url, 'PUT', body=expect_body,
            headers=ContainsKeyValue({'X-Auth-Token': 'token'}))

        self.assertEqual(body, result)
        self.assertEqual([REQUEST_ID], result.request_ids)

    def test_list_request_ids_with_retrieve_all_true(self):
        # Listing follows the 'next' link of the first page; the result
        # must carry the request id of both responses.
        path = '/test'
        resources = 'tests'
        next_query = "marker=myid2&limit=2"
        next_link = {'href': end_url(path, next_query), 'rel': 'next'}
        first_page = {resources: [{'id': 'myid1'}, {'id': 'myid2'}],
                      '%s_links' % resources: [next_link]}
        second_page = {resources: [{'id': 'myid3'}, {'id': 'myid4'}]}
        first_body = self.client.serialize(first_page)
        second_body = self.client.serialize(second_page)
        resp_headers = {'x-openstack-request-id': REQUEST_ID}
        responses = [(MyResp(200, resp_headers), first_body),
                     (MyResp(200, resp_headers), second_body)]

        request_patch = mock.patch.object(self.client.httpclient, "request",
                                          side_effect=responses)
        with request_patch as mock_request:
            result = self.client.list(resources, path)

        self.assertEqual(2, mock_request.call_count)
        expected_calls = [
            mock.call(end_url(path, ""), 'GET', body=None,
                      headers=ContainsKeyValue({'X-Auth-Token': TOKEN})),
            mock.call(MyUrlComparator(end_url(path, next_query),
                                      self.client),
                      'GET', body=None,
                      headers=ContainsKeyValue({'X-Auth-Token': TOKEN})),
        ]
        mock_request.assert_has_calls(expected_calls)

        self.assertEqual([REQUEST_ID, REQUEST_ID], result.request_ids)

    def test_list_request_ids_with_retrieve_all_false(self):
        """Lazy listing gains one request id per page that is consumed.

        With ``retrieve_all=False`` the result is advanced with ``next``;
        after each step ``request_ids`` must have grown by one entry.
        """
        collection = 'tests'
        path = '/test'
        next_query = "marker=myid2&limit=2"
        next_link = {'href': end_url(path, next_query), 'rel': 'next'}
        first_page = {collection: [{'id': 'myid1'}, {'id': 'myid2'}],
                      '%s_links' % collection: [next_link]}
        last_page = {collection: [{'id': 'myid3'}, {'id': 'myid4'}]}
        headers = {'x-openstack-request-id': REQUEST_ID}
        # One canned (response, serialized body) pair per HTTP round trip.
        replies = [(MyResp(200, headers), self.client.serialize(page))
                   for page in (first_page, last_page)]

        with mock.patch.object(self.client.httpclient, "request",
                               side_effect=replies) as mock_request:
            listing = self.client.list(collection, path, retrieve_all=False)
            ids_so_far = []
            for _ in replies:
                next(listing)
                ids_so_far.append(REQUEST_ID)
                self.assertEqual(ids_so_far, listing.request_ids)

        expected_calls = [
            mock.call(end_url(path, ""), 'GET', body=None,
                      headers=ContainsKeyValue({'X-Auth-Token': TOKEN})),
            mock.call(MyUrlComparator(end_url(path, next_query),
                                      self.client),
                      'GET', body=None,
                      headers=ContainsKeyValue({'X-Auth-Token': TOKEN})),
        ]
        self.assertEqual(2, mock_request.call_count)
        mock_request.assert_has_calls(expected_calls)

    def test_deserialize_without_data(self):
        """Deserializing an empty body hands the empty string back."""
        empty_body = ''
        self.assertEqual(empty_body,
                         self.client.deserialize(empty_body, 200))

    def test_update_resource(self):
        """A plain ``_update_resource`` call sends no If-Match header."""
        self.client.httpclient.auth_token = 'token'
        # The same dict doubles as the request body and the query params.
        payload = {'test': 'value'}
        query = urlparse.urlencode(payload)
        serialized = self.client.serialize(payload)
        reply = (MyResp(200, {'x-openstack-request-id': REQUEST_ID}),
                 serialized)

        with mock.patch.object(self.client.httpclient, "request",
                               return_value=reply) as mock_request:
            updated = self.client._update_resource(
                '/test', body=payload, params=payload)

        mock_request.assert_called_once_with(
            MyUrlComparator(end_url('/test', query=query), self.client),
            'PUT', body=serialized,
            headers=ContainsKeyValue({'X-Auth-Token': 'token'}))
        sent_headers = mock_request.call_args[1]['headers']
        self.assertNotIn('If-Match', sent_headers)

        self.assertEqual(payload, updated)
        self.assertEqual([REQUEST_ID], updated.request_ids)

    def test_update_resource_with_revision_number(self):
        """Passing ``revision_number`` adds a matching If-Match header."""
        self.client.httpclient.auth_token = 'token'
        # The same dict doubles as the request body and the query params.
        payload = {'test': 'value'}
        query = urlparse.urlencode(payload)
        serialized = self.client.serialize(payload)
        reply = (MyResp(200, {'x-openstack-request-id': REQUEST_ID}),
                 serialized)

        with mock.patch.object(self.client.httpclient, "request",
                               return_value=reply) as mock_request:
            updated = self.client._update_resource(
                '/test', body=payload, params=payload, revision_number=1)

        wanted_headers = {'X-Auth-Token': 'token',
                          'If-Match': 'revision_number=1'}
        mock_request.assert_called_once_with(
            MyUrlComparator(end_url('/test', query=query), self.client),
            'PUT', body=serialized,
            headers=ContainsKeyValue(wanted_headers))

        self.assertEqual(payload, updated)
        self.assertEqual([REQUEST_ID], updated.request_ids)


class CLITestV20ExceptionHandler(CLITestV20Base):
    """Tests for ``client.exception_handler_v20`` and exception defaults."""

    def _test_exception_handler_v20(
            self, expected_exception, status_code, expected_msg,
            error_type=None, error_msg=None, error_detail=None,
            request_id=None, error_content=None):
        """Run ``exception_handler_v20`` and verify the raised exception.

        :param expected_exception: exception class that must be raised.
        :param status_code: HTTP status code handed to the handler and
            expected on the raised exception.
        :param expected_msg: expected message without the request-id
            suffix; when None it is derived from ``error_msg`` and
            ``error_detail``.
        :param error_type: ``type`` field of the generated NeutronError.
        :param error_msg: ``message`` field of the generated NeutronError.
        :param error_detail: ``detail`` field of the generated NeutronError.
        :param request_id: value of the ``x-openstack-request-id`` response
            header; when not None the request-id suffix is appended to the
            expected message.
        :param error_content: explicit error body; when None a NeutronError
            dict is built from the ``error_*`` arguments.
        """
        # Resolve the default message first: the request-id suffix below is
        # appended with ``+=``, which would fail on a None message and
        # would otherwise be missing from the derived default.
        if expected_msg is None:
            if error_detail:
                expected_msg = '\n'.join([error_msg, error_detail])
            else:
                expected_msg = error_msg

        resp = MyResp(status_code, {'x-openstack-request-id': request_id})
        if request_id is not None:
            expected_msg += "\nNeutron server returns " \
                            "request_ids: %s" % [request_id]
        if error_content is None:
            error_content = {'NeutronError': {'type': error_type,
                                              'message': error_msg,
                                              'detail': error_detail}}
        expected_content = self.client._convert_into_with_meta(error_content,
                                                               resp)

        e = self.assertRaises(expected_exception,
                              client.exception_handler_v20,
                              status_code, expected_content)
        self.assertEqual(status_code, e.status_code)
        # Compare by name so that a subclass of the expected exception is
        # not accepted in its place.
        self.assertEqual(expected_exception.__name__,
                         e.__class__.__name__)
        self.assertEqual(expected_msg, e.message)

    def test_exception_handler_v20_ip_address_in_use(self):
        """An IpAddressInUse error maps to IpAddressInUseClient (409)."""
        err_msg = ('Unable to complete operation for network '
                   'fake-network-uuid. The IP address fake-ip is in use.')
        self._test_exception_handler_v20(
            exceptions.IpAddressInUseClient, 409, err_msg,
            'IpAddressInUse', err_msg, '', REQUEST_ID)

    def test_exception_handler_v20_neutron_known_error(self):
        """Each known server error type maps to its client exception."""
        # (server error type, expected client exception, status code)
        known_error_map = [
            ('NetworkNotFound', exceptions.NetworkNotFoundClient, 404),
            ('PortNotFound', exceptions.PortNotFoundClient, 404),
            ('NetworkInUse', exceptions.NetworkInUseClient, 409),
            ('PortInUse', exceptions.PortInUseClient, 409),
            ('StateInvalid', exceptions.StateInvalidClient, 400),
            ('IpAddressInUse', exceptions.IpAddressInUseClient, 409),
            ('IpAddressGenerationFailure',
             exceptions.IpAddressGenerationFailureClient, 409),
            ('MacAddressInUse', exceptions.MacAddressInUseClient, 409),
            ('ExternalIpAddressExhausted',
             exceptions.ExternalIpAddressExhaustedClient, 400),
            ('OverQuota', exceptions.OverQuotaClient, 409),
            ('InvalidIpForNetwork', exceptions.InvalidIpForNetworkClient, 400),
            ('InvalidIpForSubnet', exceptions.InvalidIpForSubnetClient, 400),
            ('IpAddressAlreadyAllocated',
             exceptions.IpAddressAlreadyAllocatedClient, 400),
        ]

        error_msg = 'dummy exception message'
        error_detail = 'sample detail'
        for server_exc, client_exc, status_code in known_error_map:
            self._test_exception_handler_v20(
                client_exc, status_code,
                error_msg + '\n' + error_detail,
                server_exc, error_msg, error_detail, REQUEST_ID)

    def test_exception_handler_v20_neutron_known_error_without_detail(self):
        """With an empty detail the message is the error message alone."""
        error_msg = 'Network not found'
        error_detail = ''
        self._test_exception_handler_v20(
            exceptions.NetworkNotFoundClient, 404,
            error_msg,
            'NetworkNotFound', error_msg, error_detail, REQUEST_ID)

    def test_exception_handler_v20_unknown_error_to_per_code_exception(self):
        """Unknown error types fall back to the per-status-code exception."""
        error_msg = 'Unknown error'
        error_detail = 'This is detail'
        for status_code, client_exc in exceptions.HTTP_EXCEPTION_MAP.items():
            # The request id is passed as a plain string, like every other
            # test in this class, so the header value is a single id.
            self._test_exception_handler_v20(
                client_exc, status_code,
                error_msg + '\n' + error_detail,
                'UnknownError', error_msg, error_detail, REQUEST_ID)

    def test_exception_handler_v20_neutron_unknown_status_code(self):
        """An unknown type with status 501 raises NeutronClientException."""
        error_msg = 'Unknown error'
        error_detail = 'This is detail'
        self._test_exception_handler_v20(
            exceptions.NeutronClientException, 501,
            error_msg + '\n' + error_detail,
            'UnknownError', error_msg, error_detail, REQUEST_ID)

    def test_exception_handler_v20_bad_neutron_error(self):
        """A NeutronError without the usual keys is reported as its repr."""
        for status_code, client_exc in exceptions.HTTP_EXCEPTION_MAP.items():
            error_content = {'NeutronError': {'unknown_key': 'UNKNOWN'}}
            self._test_exception_handler_v20(
                client_exc, status_code,
                expected_msg="{'unknown_key': 'UNKNOWN'}",
                error_content=error_content,
                request_id=REQUEST_ID)

    def test_exception_handler_v20_error_dict_contains_message(self):
        """A top-level ``message`` key supplies the exception message."""
        error_content = {'message': 'This is an error message'}
        for status_code, client_exc in exceptions.HTTP_EXCEPTION_MAP.items():
            self._test_exception_handler_v20(
                client_exc, status_code,
                expected_msg='This is an error message',
                error_content=error_content,
                request_id=REQUEST_ID)

    def test_exception_handler_v20_error_dict_not_contain_message(self):
        """A dict without ``message`` yields a '<status>-<content>' message."""
        # 599 is not contained in HTTP_EXCEPTION_MAP.
        error_content = {'error': 'This is an error message'}
        expected_msg = '%s-%s' % (599, error_content)
        self._test_exception_handler_v20(
            exceptions.NeutronClientException, 599,
            expected_msg=expected_msg,
            request_id=None,
            error_content=error_content)

    def test_exception_handler_v20_default_fallback(self):
        """A plain string body yields a '<status>-<content>' message."""
        # 599 is not contained in HTTP_EXCEPTION_MAP.
        error_content = 'This is an error message'
        expected_msg = '%s-%s' % (599, error_content)
        self._test_exception_handler_v20(
            exceptions.NeutronClientException, 599,
            expected_msg=expected_msg,
            request_id=None,
            error_content=error_content)

    def test_exception_status(self):
        """Exceptions expose a default or an explicitly given status code."""
        e = exceptions.BadRequest()
        self.assertEqual(400, e.status_code)

        e = exceptions.BadRequest(status_code=499)
        self.assertEqual(499, e.status_code)

        # SslCertificateValidationError has no explicit status_code,
        # but should have a 'safe' defined fallback.
        e = exceptions.SslCertificateValidationError()
        self.assertIsNotNone(e.status_code)

        e = exceptions.SslCertificateValidationError(status_code=599)
        self.assertEqual(599, e.status_code)

    def test_connection_failed(self):
        """A requests ConnectionError surfaces as ConnectionFailed."""
        self.client.httpclient.auth_token = 'token'
        excp = requests.exceptions.ConnectionError('Connection refused')

        with mock.patch.object(self.client.httpclient, "request",
                               side_effect=excp) as mock_request:
            error = self.assertRaises(exceptions.ConnectionFailed,
                                      self.client.get, '/test')

        mock_request.assert_called_once_with(
            end_url('/test'), 'GET',
            body=None,
            headers=ContainsKeyValue({'X-Auth-Token': 'token'}))
        # NB: ConnectionFailed has no explicit status_code, so this
        # tests that there is a fallback defined.
        self.assertIsNotNone(error.status_code)


class DictWithMetaTest(base.BaseTestCase):
    """Checks the dict wrapper that carries response request ids."""

    def test_dict_with_meta(self):
        payload = {'test': 'value'}
        wrapped = client._DictWithMeta(
            payload, MyResp(200, {'x-openstack-request-id': REQUEST_ID}))

        # The wrapper must still compare equal to the plain dict ...
        self.assertEqual(payload, wrapped)
        # ... while exposing the request id taken from the response.
        self.assertTrue(hasattr(wrapped, 'request_ids'))
        self.assertEqual([REQUEST_ID], wrapped.request_ids)


class TupleWithMetaTest(base.BaseTestCase):
    """Checks the tuple wrapper that carries response request ids."""

    def test_tuple_with_meta(self):
        payload = ('test', 'value')
        wrapped = client._TupleWithMeta(
            payload, MyResp(200, {'x-openstack-request-id': REQUEST_ID}))

        # The wrapper must still compare equal to the plain tuple ...
        self.assertEqual(payload, wrapped)
        # ... while exposing the request id taken from the response.
        self.assertTrue(hasattr(wrapped, 'request_ids'))
        self.assertEqual([REQUEST_ID], wrapped.request_ids)


class StrWithMetaTest(base.BaseTestCase):
    """Checks the str wrapper that carries response request ids."""

    def test_str_with_meta(self):
        payload = "test_string"
        wrapped = client._StrWithMeta(
            payload, MyResp(200, {'x-openstack-request-id': REQUEST_ID}))

        # The wrapper must still compare equal to the plain string ...
        self.assertEqual(payload, wrapped)
        # ... while exposing the request id taken from the response.
        self.assertTrue(hasattr(wrapped, 'request_ids'))
        self.assertEqual([REQUEST_ID], wrapped.request_ids)


class GeneratorWithMetaTest(base.BaseTestCase):
    """Checks the generator wrapper that carries response request ids."""

    # Canned item and response shared by the stand-in paginator below.
    body = {'test': 'value'}
    resp = MyResp(200, {'x-openstack-request-id': REQUEST_ID})

    def _pagination(self, collection, path, **params):
        """Stand-in paginator yielding ``body`` wrapped with ``resp`` once."""
        yield client._DictWithMeta(self.body, self.resp)

    def test_generator(self):
        wrapped = client._GeneratorWithMeta(
            self._pagination, 'test_collection', 'test_path',
            test_args='test_args')
        first_item = next(wrapped)
        self.assertEqual(self.body, first_item)

        # Consuming one item must have recorded that item's request id.
        self.assertTrue(hasattr(wrapped, 'request_ids'))
        self.assertEqual([REQUEST_ID], wrapped.request_ids)


class CLITestV20OutputFormatter(CLITestV20Base):
    """Checks CLI command output for the table, json and yaml formatters."""

    def _test_create_resource_with_formatter(self, fmt):
        """Run the create-network command with the ``-f <fmt>`` option."""
        resource = 'network'
        cmd = network.CreateNetwork(MyApp(sys.stdout), None)
        args = ['-f', fmt, 'myname']
        position_names = ['name']
        position_values = ['myname']
        self._test_create_resource(resource, cmd, 'myname', 'myid', args,
                                   position_names, position_values)

    def test_create_resource_table(self):
        """Table output of a create command has id and name rows."""
        self._test_create_resource_with_formatter('table')
        # The table data is contained in the third element.
        data = self.fake_stdout.content[2].split('\n')
        self.assertTrue(any(' id ' in d for d in data))
        self.assertTrue(any(' name ' in d for d in data))

    def test_create_resource_json(self):
        """JSON output of a create command parses to the created resource."""
        self._test_create_resource_with_formatter('json')
        data = jsonutils.loads(self.fake_stdout.make_string())
        self.assertEqual('myname', data['name'])
        self.assertEqual('myid', data['id'])

    def test_create_resource_yaml(self):
        """YAML output of a create command parses to the created resource."""
        self._test_create_resource_with_formatter('yaml')
        # safe_load: yaml.load() without an explicit Loader is deprecated
        # since PyYAML 5.1 and raises TypeError from PyYAML 6.0.
        data = yaml.safe_load(self.fake_stdout.make_string())
        self.assertEqual('myname', data['name'])
        self.assertEqual('myid', data['id'])

    def _test_show_resource_with_formatter(self, fmt):
        """Run the show-network command with ``-f <fmt>`` for id and name."""
        resource = 'network'
        cmd = network.ShowNetwork(MyApp(sys.stdout), None)
        args = ['-f', fmt, '-F', 'id', '-F', 'name', 'myid']
        self._test_show_resource(resource, cmd, 'myid',
                                 args, ['id', 'name'])

    def test_show_resource_table(self):
        """Table output of a show command has id and name rows."""
        self._test_show_resource_with_formatter('table')
        data = self.fake_stdout.content[0].split('\n')
        self.assertTrue(any(' id ' in d for d in data))
        self.assertTrue(any(' name ' in d for d in data))

    def test_show_resource_json(self):
        """JSON output of a show command parses to the shown resource."""
        self._test_show_resource_with_formatter('json')
        data = jsonutils.loads(''.join(self.fake_stdout.content))
        self.assertEqual('myname', data['name'])
        self.assertEqual('myid', data['id'])

    def test_show_resource_yaml(self):
        """YAML output of a show command parses to the shown resource."""
        self._test_show_resource_with_formatter('yaml')
        # safe_load: yaml.load() without an explicit Loader is deprecated
        # since PyYAML 5.1 and raises TypeError from PyYAML 6.0.
        data = yaml.safe_load(''.join(self.fake_stdout.content))
        self.assertEqual('myname', data['name'])
        self.assertEqual('myid', data['id'])

    # ListNetwork has its own extend_list, so it is stubbed out here
    # to avoid an extra API call.
    @mock.patch.object(network.ListNetwork, "extend_list")
    def _test_list_resources_with_formatter(self, fmt, mock_extend_list):
        """Run the list-networks command with the given output format.

        ``mock_extend_list`` is injected by the ``mock.patch.object``
        decorator; callers pass only ``fmt``.
        """
        resources = 'networks'
        cmd = network.ListNetwork(MyApp(sys.stdout), None)
        self._test_list_resources(resources, cmd, output_format=fmt)
        mock_extend_list.assert_called_once_with(IsA(list), mock.ANY)

    def test_list_resources_table(self):
        """Table output of a list command shows the id column and both ids."""
        self._test_list_resources_with_formatter('table')
        data = self.fake_stdout.content[0].split('\n')
        self.assertTrue(any(' id ' in d for d in data))
        self.assertTrue(any(' myid1 ' in d for d in data))
        self.assertTrue(any(' myid2 ' in d for d in data))

    def test_list_resources_json(self):
        """JSON output of a list command parses to both listed resources."""
        self._test_list_resources_with_formatter('json')
        data = jsonutils.loads(''.join(self.fake_stdout.content))
        self.assertEqual(['myid1', 'myid2'], [d['id'] for d in data])

    def test_list_resources_yaml(self):
        """YAML output of a list command parses to both listed resources."""
        self._test_list_resources_with_formatter('yaml')
        # safe_load: yaml.load() without an explicit Loader is deprecated
        # since PyYAML 5.1 and raises TypeError from PyYAML 6.0.
        data = yaml.safe_load(''.join(self.fake_stdout.content))
        self.assertEqual(['myid1', 'myid2'], [d['id'] for d in data])
