#   Licensed under the Apache License, Version 2.0 (the "License"); you may
#   not use this file except in compliance with the License. You may obtain
#   a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.

import types

from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient.auth.identity import v3 as v3_auth
from keystoneclient import exceptions as ks_exc
from keystoneclient import session as ks_session
import mock
import requests

from ceilometerclient import client
from ceilometerclient import exc
from ceilometerclient.openstack.common.apiclient import exceptions
from ceilometerclient.tests.unit import fakes
from ceilometerclient.tests.unit import utils
from ceilometerclient.v2 import client as v2client

FAKE_ENV = {
    'username': 'username',
    'password': 'password',
    'tenant_name': 'tenant_name',
    'auth_url': 'http://no.where',
    'auth_plugin': mock.Mock(),
    'ceilometer_url': 'http://no.where',
    'token': '1234',
    'user_domain_name': 'default',
    'project_domain_name': 'default',
}


class ClientTest(utils.BaseTestCase):
    @staticmethod
    def create_client(env, api_version=2, endpoint=None, exclude=[]):
        env = dict((k, v) for k, v in env.items()
                   if k not in exclude)
        with mock.patch('ceilometerclient.v2.client.Client._get_alarm_client',
                        return_value=None):
            return client.get_client(api_version, **env)

    def test_client_v2_with_session(self):
        resp = mock.Mock(status_code=200, text=b'')
        resp.json.return_value = []
        session = mock.Mock()
        session.request.return_value = resp
        c = client.get_client(2, session=session)
        c.resources.list()
        self.assertTrue(session.request.called)
        self.assertTrue(resp.json.called)

    def test_client_version(self):
        c2 = self.create_client(env=FAKE_ENV, api_version=2)
        self.assertIsInstance(c2, v2client.Client)

    def test_client_auth_lambda(self):
        env = FAKE_ENV.copy()
        env['token'] = lambda: env['token']
        self.assertIsInstance(env['token'],
                              types.FunctionType)
        c2 = self.create_client(env)
        self.assertIsInstance(c2, v2client.Client)

    def test_client_auth_non_lambda(self):
        env = FAKE_ENV.copy()
        env['token'] = "1234"
        self.assertIsInstance(env['token'], str)
        c2 = self.create_client(env)
        self.assertIsInstance(c2, v2client.Client)

    @mock.patch('keystoneclient.v2_0.client', fakes.FakeKeystone)
    def test_client_without_auth_plugin(self):
        env = FAKE_ENV.copy()
        del env['auth_plugin']
        c = self.create_client(env, api_version=2, endpoint='fake_endpoint')
        self.assertIsInstance(c.auth_plugin, client.AuthPlugin)

    def test_client_without_auth_plugin_keystone_v3(self):
        env = FAKE_ENV.copy()
        del env['auth_plugin']
        expected = {
            'username': 'username',
            'endpoint': 'http://no.where',
            'tenant_name': 'tenant_name',
            'service_type': None,
            'token': '1234',
            'endpoint_type': None,
            'region_name': None,
            'auth_url': 'http://no.where',
            'tenant_id': None,
            'insecure': None,
            'cacert': None,
            'password': 'password',
            'user_domain_name': 'default',
            'user_domain_id': None,
            'project_domain_name': 'default',
            'project_domain_id': None,
        }
        with mock.patch('ceilometerclient.client.AuthPlugin') as auth_plugin:
            self.create_client(env, api_version=2, endpoint='http://no.where')
            self.assertEqual(mock.call(**expected),
                             auth_plugin.mock_calls[0])

    def test_v2_client_timeout_invalid_value(self):
        env = FAKE_ENV.copy()
        env['timeout'] = 'abc'
        self.assertRaises(ValueError, self.create_client, env)
        env['timeout'] = '1.5'
        self.assertRaises(ValueError, self.create_client, env)

    def _test_v2_client_timeout_integer(self, timeout, expected_value):
        env = FAKE_ENV.copy()
        env['timeout'] = timeout
        expected = {
            'auth_plugin': mock.ANY,
            'timeout': expected_value,
            'original_ip': None,
            'http': None,
            'region_name': None,
            'verify': True,
            'timings': None,
            'keyring_saver': None,
            'cert': None,
            'endpoint_type': None,
            'user_agent': None,
            'debug': None,
        }
        cls = 'ceilometerclient.openstack.common.apiclient.client.HTTPClient'
        with mock.patch(cls) as mocked:
            self.create_client(env)
            mocked.assert_called_with(**expected)

    def test_v2_client_timeout_zero(self):
        self._test_v2_client_timeout_integer(0, None)

    def test_v2_client_timeout_valid_value(self):
        self._test_v2_client_timeout_integer(30, 30)

    @mock.patch.object(ks_session, 'Session')
    def test_v2_client_timeout_keystone_session(self, mocked_session):
        mocked_session.side_effect = RuntimeError('Stop!')
        env = FAKE_ENV.copy()
        env['timeout'] = 5
        del env['auth_plugin']
        del env['token']
        client = self.create_client(env)
        self.assertRaises(RuntimeError, client.alarms.list)
        args, kwargs = mocked_session.call_args
        self.assertEqual(5, kwargs['timeout'])

    def test_v2_client_cacert_in_verify(self):
        env = FAKE_ENV.copy()
        env['cacert'] = '/path/to/cacert'
        client = self.create_client(env)
        self.assertEqual('/path/to/cacert',
                         client.http_client.http_client.verify)

    def test_v2_client_certfile_and_keyfile(self):
        env = FAKE_ENV.copy()
        env['cert_file'] = '/path/to/cert'
        env['key_file'] = '/path/to/keycert'
        client = self.create_client(env)
        self.assertEqual(('/path/to/cert', '/path/to/keycert'),
                         client.http_client.http_client.cert)

    def test_v2_client_insecure(self):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin')
        env['insecure'] = 'True'
        client = self.create_client(env)
        self.assertIn('insecure', client.auth_plugin.opts)
        self.assertEqual('True', client.auth_plugin.opts['insecure'])


class ClientTest2(ClientTest):
    @staticmethod
    def create_client(env, api_version=2, endpoint=None, exclude=[]):
        env = dict((k, v) for k, v in env.items()
                   if k not in exclude)
        with mock.patch('ceilometerclient.v2.client.Client._get_alarm_client',
                        return_value=None):
            return client.Client(api_version, endpoint, **env)


class ClientTestWithAodh(ClientTest):
    @staticmethod
    def create_client(env, api_version=2, endpoint=None, exclude=[]):
        env = dict((k, v) for k, v in env.items()
                   if k not in exclude)
        with mock.patch('ceilometerclient.openstack.common.apiclient.client.'
                        'HTTPClient.client_request',
                        return_value=mock.MagicMock()):
            return client.get_client(api_version, **env)

    @mock.patch('keystoneclient.v2_0.client', fakes.FakeKeystone)
    def test_client_without_auth_plugin(self):
        env = FAKE_ENV.copy()
        del env['auth_plugin']
        c = self.create_client(env, api_version=2, endpoint='fake_endpoint')
        self.assertIsInstance(c.alarm_client.http_client.auth_plugin,
                              client.AuthPlugin)

    def test_v2_client_insecure(self):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin')
        env['insecure'] = 'True'
        client = self.create_client(env)
        self.assertIn('insecure',
                      client.alarm_client.http_client.auth_plugin.opts)
        self.assertEqual('True', (client.alarm_client.http_client.
                                  auth_plugin.opts['insecure']))

    def test_ceilometerclient_available_without_aodh_services_running(self):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin', None)
        with mock.patch('ceilometerclient.openstack.common.apiclient.client.'
                        'HTTPClient.client_request') as mocked_request:
            mocked_request.side_effect = requests.exceptions.ConnectionError
            ceiloclient = client.get_client(2, **env)
            self.assertIsInstance(ceiloclient, v2client.Client)


class ClientAuthTest(utils.BaseTestCase):

    @staticmethod
    def create_client(env, api_version=2, endpoint=None, exclude=[]):
        env = dict((k, v) for k, v in env.items()
                   if k not in exclude)
        with mock.patch('ceilometerclient.openstack.common.apiclient.client.'
                        'HTTPClient.client_request',
                        return_value=mock.MagicMock()):
            return client.get_client(api_version, **env)

    @mock.patch('keystoneclient.discover.Discover')
    @mock.patch('keystoneclient.session.Session')
    def test_discover_auth_versions(self, session, discover_mock):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin', None)

        mock_session_instance = mock.MagicMock()
        session.return_value = mock_session_instance

        client = self.create_client(env)
        client.auth_plugin.opts.pop('token', None)
        client.auth_plugin._do_authenticate(mock.MagicMock())

        self.assertEqual([mock.call(auth_url='http://no.where',
                                    session=mock_session_instance)],
                         discover_mock.call_args_list)
        self.assertIsInstance(mock_session_instance.auth, v3_auth.Password)

    @mock.patch('keystoneclient.discover.Discover')
    @mock.patch('keystoneclient.session.Session')
    def test_discover_auth_versions_v2_only(self, session, discover):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin', None)
        env.pop('user_domain_name', None)
        env.pop('user_domain_id', None)
        env.pop('project_domain_name', None)
        env.pop('project_domain_id', None)

        session_instance_mock = mock.MagicMock()
        session.return_value = session_instance_mock

        discover_instance_mock = mock.MagicMock()
        discover_instance_mock.url_for.side_effect = (lambda v: v
                                                      if v == '2.0' else None)
        discover.return_value = discover_instance_mock

        client = self.create_client(env)
        client.auth_plugin.opts.pop('token', None)
        client.auth_plugin._do_authenticate(mock.MagicMock())
        self.assertEqual([mock.call(auth_url='http://no.where',
                                    session=session_instance_mock)],
                         discover.call_args_list)

        self.assertIsInstance(session_instance_mock.auth, v2_auth.Password)

    @mock.patch('keystoneclient.discover.Discover')
    @mock.patch('keystoneclient.session.Session')
    def test_discover_auth_versions_raise_discovery_failure(self,
                                                            session,
                                                            discover):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin', None)
        env.pop('token', None)

        session_instance_mock = mock.MagicMock()
        session.return_value = session_instance_mock

        discover_instance_mock = mock.MagicMock()
        discover_instance_mock.url_for.side_effect = (lambda v: v
                                                      if v == '2.0' else None)
        discover.side_effect = ks_exc.DiscoveryFailure
        client = self.create_client(env)
        self.assertRaises(ks_exc.DiscoveryFailure,
                          client.auth_plugin._do_authenticate,
                          mock.Mock())
        discover.side_effect = mock.MagicMock()
        client = self.create_client(env)
        discover.side_effect = ks_exc.DiscoveryFailure
        client.auth_plugin.opts.pop('token', None)

        self.assertRaises(ks_exc.DiscoveryFailure,
                          client.auth_plugin._do_authenticate,
                          mock.Mock())
        self.assertEqual([mock.call(auth_url='http://no.where',
                                    session=session_instance_mock),
                          mock.call(auth_url='http://no.where',
                                    session=session_instance_mock)],
                         discover.call_args_list)

    @mock.patch('ceilometerclient.client._get_keystone_session')
    def test_get_endpoint(self, session):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin', None)
        env.pop('endpoint', None)

        session_instance_mock = mock.MagicMock()
        session.return_value = session_instance_mock

        client = self.create_client(env)
        client.auth_plugin.opts.pop('endpoint')
        client.auth_plugin.opts.pop('token', None)
        alarm_auth_plugin = client.alarm_client.http_client.auth_plugin
        alarm_auth_plugin.opts.pop('endpoint')
        alarm_auth_plugin.opts.pop('token', None)

        self.assertNotEqual(client.auth_plugin, alarm_auth_plugin)

        client.auth_plugin._do_authenticate(mock.MagicMock())
        alarm_auth_plugin._do_authenticate(mock.MagicMock())

        self.assertEqual([
            mock.call(interface='publicURL', region_name=None,
                      service_type='metering'),
            mock.call(interface='publicURL', region_name=None,
                      service_type='alarming'),
        ], session_instance_mock.get_endpoint.mock_calls)

    def test_http_client_with_session(self):
        session = mock.Mock()
        session.request.return_value = mock.Mock(status_code=404,
                                                 text=b'')

        env = {"session": session,
               "service_type": "metering",
               "user_agent": "python-ceilometerclient"}
        c = client.SessionClient(**env)
        self.assertRaises(exc.HTTPException, c.get, "/")

    def test_get_aodh_endpoint_without_auth_url(self):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin', None)
        env.pop('endpoint', None)
        env.pop('auth_url', None)
        client = self.create_client(env, endpoint='fake_endpoint')
        self.assertEqual(client.alarm_client.http_client.auth_plugin.opts,
                         client.auth_plugin.opts)

    @mock.patch('ceilometerclient.client._get_keystone_session')
    def test_get_different_endpoint_type(self, session):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin', None)
        env.pop('endpoint', None)
        env['endpoint_type'] = 'internal'

        session_instance_mock = mock.MagicMock()
        session.return_value = session_instance_mock

        client = self.create_client(env)
        client.auth_plugin.opts.pop('endpoint')
        client.auth_plugin.opts.pop('token', None)
        alarm_auth_plugin = client.alarm_client.http_client.auth_plugin
        alarm_auth_plugin.opts.pop('endpoint')
        alarm_auth_plugin.opts.pop('token', None)

        self.assertNotEqual(client.auth_plugin, alarm_auth_plugin)

        client.auth_plugin._do_authenticate(mock.MagicMock())
        alarm_auth_plugin._do_authenticate(mock.MagicMock())

        self.assertEqual([
            mock.call(interface='internal', region_name=None,
                      service_type='metering'),
            mock.call(interface='internal', region_name=None,
                      service_type='alarming'),
        ], session_instance_mock.get_endpoint.mock_calls)

    @mock.patch('ceilometerclient.client._get_keystone_session')
    def test_get_sufficient_options_missing(self, session):
        env = FAKE_ENV.copy()
        env.pop('auth_plugin', None)
        env.pop('password', None)
        env.pop('endpoint', None)
        env.pop('auth_token', None)
        env.pop('tenant_name', None)
        env.pop('username', None)

        session_instance_mock = mock.MagicMock()
        session.return_value = session_instance_mock
        client = self.create_client(env)
        client.auth_plugin.opts.pop('token', None)
        self.assertRaises(exceptions.AuthPluginOptionsMissing,
                          client.auth_plugin.sufficient_options)
