# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import argparse
import copy
from unittest import mock
import uuid


from keystoneclient.auth.identity import v2
from keystoneclient import exceptions
from keystoneclient import session
from keystoneclient.tests.unit import utils


class V2IdentityPlugin(utils.TestCase):

    TEST_ROOT_URL = 'http://127.0.0.1:5000/'
    TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v2.0')
    TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
    TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v2.0')

    TEST_PASS = 'password'

    TEST_SERVICE_CATALOG = [{
        "endpoints": [{
            "adminURL": "http://cdn.admin-nets.local:8774/v1.0",
            "region": "RegionOne",
            "internalURL": "http://127.0.0.1:8774/v1.0",
            "publicURL": "http://cdn.admin-nets.local:8774/v1.0/"
        }],
        "type": "nova_compat",
        "name": "nova_compat"
    }, {
        "endpoints": [{
            "adminURL": "http://nova/novapi/admin",
            "region": "RegionOne",
            "internalURL": "http://nova/novapi/internal",
            "publicURL": "http://nova/novapi/public"
        }],
        "type": "compute",
        "name": "nova"
    }, {
        "endpoints": [{
            "adminURL": "http://glance/glanceapi/admin",
            "region": "RegionOne",
            "internalURL": "http://glance/glanceapi/internal",
            "publicURL": "http://glance/glanceapi/public"
        }],
        "type": "image",
        "name": "glance"
    }, {
        "endpoints": [{
            "adminURL": TEST_ADMIN_URL,
            "region": "RegionOne",
            "internalURL": "http://127.0.0.1:5000/v2.0",
            "publicURL": "http://127.0.0.1:5000/v2.0"
        }],
        "type": "identity",
        "name": "keystone"
    }, {
        "endpoints": [{
            "adminURL": "http://swift/swiftapi/admin",
            "region": "RegionOne",
            "internalURL": "http://swift/swiftapi/internal",
            "publicURL": "http://swift/swiftapi/public"
        }],
        "type": "object-store",
        "name": "swift"
    }]

    def setUp(self):
        super(V2IdentityPlugin, self).setUp()
        self.deprecations.expect_deprecations()
        self.TEST_RESPONSE_DICT = {
            "access": {
                "token": {
                    "expires": "2999-01-01T00:00:10.000123Z",
                    "id": self.TEST_TOKEN,
                    "tenant": {
                        "id": self.TEST_TENANT_ID
                    },
                },
                "user": {
                    "id": self.TEST_USER
                },
                "serviceCatalog": self.TEST_SERVICE_CATALOG,
            },
        }

    def stub_auth(self, **kwargs):
        self.stub_url('POST', ['tokens'], **kwargs)

    def test_authenticate_with_username_password(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=self.TEST_PASS)
        self.assertIsNone(a.user_id)
        s = session.Session(a)
        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         s.get_auth_headers())

        req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
                                                'password': self.TEST_PASS}}}
        self.assertRequestBodyIs(json=req)
        self.assertRequestHeaderEqual('Content-Type', 'application/json')
        self.assertRequestHeaderEqual('Accept', 'application/json')
        self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_user_id_password(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
                        password=self.TEST_PASS)
        self.assertIsNone(a.username)
        s = session.Session(a)
        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         s.get_auth_headers())

        req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
                                                'password': self.TEST_PASS}}}
        self.assertRequestBodyIs(json=req)
        self.assertRequestHeaderEqual('Content-Type', 'application/json')
        self.assertRequestHeaderEqual('Accept', 'application/json')
        self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_username_password_scoped(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
        self.assertIsNone(a.user_id)
        s = session.Session(a)
        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         s.get_auth_headers())

        req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
                                                'password': self.TEST_PASS},
                        'tenantId': self.TEST_TENANT_ID}}
        self.assertRequestBodyIs(json=req)
        self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_user_id_password_scoped(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
                        password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
        self.assertIsNone(a.username)
        s = session.Session(a)
        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         s.get_auth_headers())

        req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
                                                'password': self.TEST_PASS},
                        'tenantId': self.TEST_TENANT_ID}}
        self.assertRequestBodyIs(json=req)
        self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_token(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        a = v2.Token(self.TEST_URL, 'foo')
        s = session.Session(a)
        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         s.get_auth_headers())

        req = {'auth': {'token': {'id': 'foo'}}}
        self.assertRequestBodyIs(json=req)
        self.assertRequestHeaderEqual('x-Auth-Token', 'foo')
        self.assertRequestHeaderEqual('Content-Type', 'application/json')
        self.assertRequestHeaderEqual('Accept', 'application/json')
        self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_with_trust_id(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=self.TEST_PASS, trust_id='trust')
        s = session.Session(a)
        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         s.get_auth_headers())

        req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
                                                'password': self.TEST_PASS},
                        'trust_id': 'trust'}}

        self.assertRequestBodyIs(json=req)
        self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def _do_service_url_test(self, base_url, endpoint_filter):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        self.stub_url('GET', ['path'],
                      base_url=base_url,
                      text='SUCCESS', status_code=200)

        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=self.TEST_PASS)
        s = session.Session(auth=a)

        resp = s.get('/path', endpoint_filter=endpoint_filter)

        self.assertEqual(resp.status_code, 200)
        self.assertEqual(self.requests_mock.last_request.url,
                         base_url + '/path')

    def test_service_url(self):
        endpoint_filter = {'service_type': 'compute',
                           'interface': 'admin',
                           'service_name': 'nova'}
        self._do_service_url_test('http://nova/novapi/admin', endpoint_filter)

    def test_service_url_defaults_to_public(self):
        endpoint_filter = {'service_type': 'compute'}
        self._do_service_url_test('http://nova/novapi/public', endpoint_filter)

    def test_endpoint_filter_without_service_type_fails(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=self.TEST_PASS)
        s = session.Session(auth=a)

        self.assertRaises(exceptions.EndpointNotFound, s.get, '/path',
                          endpoint_filter={'interface': 'admin'})

    def test_full_url_overrides_endpoint_filter(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        self.stub_url('GET', [],
                      base_url='http://testurl/',
                      text='SUCCESS', status_code=200)

        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=self.TEST_PASS)
        s = session.Session(auth=a)

        resp = s.get('http://testurl/',
                     endpoint_filter={'service_type': 'compute'})
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp.text, 'SUCCESS')

    def test_invalid_auth_response_dict(self):
        self.stub_auth(json={'hello': 'world'})

        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=self.TEST_PASS)
        s = session.Session(auth=a)

        self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
                          authenticated=True)

    def test_invalid_auth_response_type(self):
        self.stub_url('POST', ['tokens'], text='testdata')

        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=self.TEST_PASS)
        s = session.Session(auth=a)

        self.assertRaises(exceptions.InvalidResponse, s.get, 'http://any',
                          authenticated=True)

    def test_invalidate_response(self):
        resp_data1 = copy.deepcopy(self.TEST_RESPONSE_DICT)
        resp_data2 = copy.deepcopy(self.TEST_RESPONSE_DICT)

        resp_data1['access']['token']['id'] = 'token1'
        resp_data2['access']['token']['id'] = 'token2'

        auth_responses = [{'json': resp_data1}, {'json': resp_data2}]
        self.stub_auth(response_list=auth_responses)

        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=self.TEST_PASS)
        s = session.Session(auth=a)

        with self.deprecations.expect_deprecations_here():
            self.assertEqual('token1', s.get_token())
        self.assertEqual({'X-Auth-Token': 'token1'}, s.get_auth_headers())

        a.invalidate()
        with self.deprecations.expect_deprecations_here():
            self.assertEqual('token2', s.get_token())
        self.assertEqual({'X-Auth-Token': 'token2'}, s.get_auth_headers())

    def test_doesnt_log_password(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        password = uuid.uuid4().hex

        a = v2.Password(self.TEST_URL, username=self.TEST_USER,
                        password=password)
        s = session.Session(auth=a)
        with self.deprecations.expect_deprecations_here():
            self.assertEqual(self.TEST_TOKEN, s.get_token())
        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         s.get_auth_headers())
        self.assertNotIn(password, self.logger.output)

    def test_password_with_no_user_id_or_name(self):
        self.assertRaises(TypeError,
                          v2.Password, self.TEST_URL, password=self.TEST_PASS)

    @mock.patch('sys.stdin', autospec=True)
    def test_prompt_password(self, mock_stdin):
        parser = argparse.ArgumentParser()
        v2.Password.register_argparse_arguments(parser)

        username = uuid.uuid4().hex
        auth_url = uuid.uuid4().hex
        tenant_id = uuid.uuid4().hex
        password = uuid.uuid4().hex

        opts = parser.parse_args(['--os-username', username,
                                  '--os-auth-url', auth_url,
                                  '--os-tenant-id', tenant_id])

        with mock.patch('getpass.getpass') as mock_getpass:
            mock_getpass.return_value = password
            mock_stdin.isatty = lambda: True

            plugin = v2.Password.load_from_argparse_arguments(opts)

            self.assertEqual(auth_url, plugin.auth_url)
            self.assertEqual(username, plugin.username)
            self.assertEqual(tenant_id, plugin.tenant_id)
            self.assertEqual(password, plugin.password)
