# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import argparse
import copy
import uuid

from keystoneauth1 import fixture
import mock

from keystoneclient import access
from keystoneclient.auth.identity import v3
from keystoneclient.auth.identity.v3 import base as v3_base
from keystoneclient import client
from keystoneclient import exceptions
from keystoneclient import session
from keystoneclient.tests.unit import utils


class V3IdentityPlugin(utils.TestCase):
    """Unit tests for the ``keystoneclient.auth.identity.v3`` auth plugins.

    HTTP traffic is stubbed, so the tests check the request bodies and
    headers the plugins send and how they interpret canned responses.
    """

    # Public identity endpoint: root URL and the versioned (v3) auth URL.
    TEST_ROOT_URL = 'http://127.0.0.1:5000/'
    TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3')
    # Admin identity endpoint: root URL and the versioned (v3) URL.
    TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
    TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3')

    # Password handed to the plugins under test.
    TEST_PASS = 'password'

    # Service catalog embedded in the canned token response (see setUp).
    # Each entry describes one service "type" with a public, an internal
    # and an admin endpoint; only the compute and image entries also carry
    # a service "name".  The service-URL tests resolve endpoints from this
    # data, so the URLs below are part of what those tests assert.
    TEST_SERVICE_CATALOG = [{
        "endpoints": [{
            "url": "http://cdn.admin-nets.local:8774/v1.0/",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://127.0.0.1:8774/v1.0",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            "url": "http://cdn.admin-nets.local:8774/v1.0",
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "nova_compat"
    }, {
        "endpoints": [{
            "url": "http://nova/novapi/public",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://nova/novapi/internal",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            "url": "http://nova/novapi/admin",
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "compute",
        "name": "nova",
    }, {
        "endpoints": [{
            "url": "http://glance/glanceapi/public",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://glance/glanceapi/internal",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            "url": "http://glance/glanceapi/admin",
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "image",
        "name": "glance"
    }, {
        "endpoints": [{
            "url": "http://127.0.0.1:5000/v3",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://127.0.0.1:5000/v3",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            # The admin identity endpoint reuses the class-level admin URL.
            "url": TEST_ADMIN_URL,
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "identity"
    }, {
        "endpoints": [{
            "url": "http://swift/swiftapi/public",
            "region": "RegionOne",
            "interface": "public"
        }, {
            "url": "http://swift/swiftapi/internal",
            "region": "RegionOne",
            "interface": "internal"
        }, {
            "url": "http://swift/swiftapi/admin",
            "region": "RegionOne",
            "interface": "admin"
        }],
        "type": "object-store"
    }]

    def setUp(self):
        """Build the canned discovery, token and project-list responses.

        The dictionaries are rebuilt for every test, so individual tests
        may delete keys from them before stubbing requests.
        """
        super(V3IdentityPlugin, self).setUp()

        # Version discovery document; stubbed by tests that build a client.
        self.TEST_DISCOVERY_RESPONSE = {
            'versions': {'values': [fixture.V3Discovery(self.TEST_URL)]}}

        # NOTE(review): presumably tells the deprecations fixture supplied
        # by the base class that the code under test may emit deprecation
        # warnings -- confirm against utils.TestCase.
        self.deprecations.expect_deprecations()

        # Project-scoped token body returned by the stubbed auth request.
        self.TEST_RESPONSE_DICT = {
            "token": {
                "methods": [
                    "token",
                    "password"
                ],

                # Keep this far in the future.  A plugin holding an expired
                # token re-authenticates on every call (test_with_expired
                # relies on that), so an expiry in the past makes tests
                # that count authentications, such as
                # test_invalidate_response, fail.  The previous value of
                # 2020-01-01 turned into exactly that time bomb.
                "expires_at": "2999-01-01T00:00:10.000123Z",
                "project": {
                    "domain": {
                        "id": self.TEST_DOMAIN_ID,
                        "name": self.TEST_DOMAIN_NAME
                    },
                    "id": self.TEST_TENANT_ID,
                    "name": self.TEST_TENANT_NAME
                },
                "user": {
                    "domain": {
                        "id": self.TEST_DOMAIN_ID,
                        "name": self.TEST_DOMAIN_NAME
                    },
                    "id": self.TEST_USER,
                    "name": self.TEST_USER
                },
                "issued_at": "2013-05-29T16:55:21.468960Z",
                "catalog": self.TEST_SERVICE_CATALOG
            },
        }
        # Two-project listing used by the unscoped authentication test.
        self.TEST_PROJECTS_RESPONSE = {
            "projects": [
                {
                    "domain_id": "1789d1",
                    "enabled": "True",
                    "id": "263fd9",
                    "links": {
                        "self": "https://identity:5000/v3/projects/263fd9"
                    },
                    "name": "Dev Group A"
                },
                {
                    "domain_id": "1789d1",
                    "enabled": "True",
                    "id": "e56ad3",
                    "links": {
                        "self": "https://identity:5000/v3/projects/e56ad3"
                    },
                    "name": "Dev Group B"
                }
            ],
            "links": {
                "self": "https://identity:5000/v3/projects",
            }
        }

    def stub_auth(self, subject_token=None, **kwargs):
        """Stub the token-issuing POST request on ``auth/tokens``.

        :param subject_token: value returned in the ``X-Subject-Token``
            response header; any falsy value falls back to
            ``self.TEST_TOKEN``.
        :param kwargs: forwarded unchanged to ``self.stub_url`` (callers in
            this class pass ``json=`` with the response body).
        """
        token_header = subject_token or self.TEST_TOKEN
        self.stub_url('POST', ['auth', 'tokens'],
                      headers={'X-Subject-Token': token_header}, **kwargs)

    def test_authenticate_with_username_password(self):
        # With only a username and password the plugin must send a password
        # request without any scope and expose the token it gets back.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS)
        sess = session.Session(auth=plugin)

        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         sess.get_auth_headers())

        user = {'name': self.TEST_USER, 'password': self.TEST_PASS}
        identity = {'methods': ['password'], 'password': {'user': user}}
        expected_body = {'auth': {'identity': identity}}

        self.assertRequestBodyIs(json=expected_body)
        self.assertRequestHeaderEqual('Content-Type', 'application/json')
        self.assertRequestHeaderEqual('Accept', 'application/json')
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_username_password_unscoped(self):
        # Strip the catalog and the project from the canned token so the
        # response matches the unscoped case this test is about.
        token_body = self.TEST_RESPONSE_DICT['token']
        for scoped_key in ('catalog', 'project'):
            del token_body[scoped_key]

        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        self.stub_url(method="GET", json=self.TEST_DISCOVERY_RESPONSE)
        test_user_id = self.TEST_RESPONSE_DICT['token']['user']['id']
        self.stub_url(method="GET",
                      json=self.TEST_PROJECTS_RESPONSE,
                      parts=['users', test_user_id, 'projects'])

        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS)
        sess = session.Session(auth=plugin)
        keystone = client.Client(session=sess)

        # Sanity check on the auth_ref: the plugin must report the expected
        # user id, and listing that user's projects must return the two
        # projects from the stubbed response.
        self.assertEqual(test_user_id, plugin.auth_ref.user_id)
        projects = keystone.projects.list(user=plugin.auth_ref.user_id)
        self.assertEqual(2, len(projects))

    def test_authenticate_with_username_password_domain_scoped(self):
        # Passing domain_id must add a domain scope to the request body.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS,
                             domain_id=self.TEST_DOMAIN_ID)
        sess = session.Session(plugin)

        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         sess.get_auth_headers())

        user = {'name': self.TEST_USER, 'password': self.TEST_PASS}
        identity = {'methods': ['password'], 'password': {'user': user}}
        scope = {'domain': {'id': self.TEST_DOMAIN_ID}}
        expected_body = {'auth': {'identity': identity, 'scope': scope}}

        self.assertRequestBodyIs(json=expected_body)
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_username_password_project_scoped(self):
        # Passing project_id must add a project scope to the request body,
        # and the resulting auth_ref must report that project.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS,
                             project_id=self.TEST_TENANT_ID)
        sess = session.Session(plugin)

        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         sess.get_auth_headers())

        user = {'name': self.TEST_USER, 'password': self.TEST_PASS}
        identity = {'methods': ['password'], 'password': {'user': user}}
        scope = {'project': {'id': self.TEST_TENANT_ID}}
        expected_body = {'auth': {'identity': identity, 'scope': scope}}

        self.assertRequestBodyIs(json=expected_body)
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)
        self.assertEqual(sess.auth.auth_ref.project_id, self.TEST_TENANT_ID)

    def test_authenticate_with_token(self):
        # The Token plugin must send the existing token id using the
        # 'token' identity method.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v3.Token(self.TEST_URL, self.TEST_TOKEN)
        sess = session.Session(auth=plugin)

        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         sess.get_auth_headers())

        identity = {'methods': ['token'],
                    'token': {'id': self.TEST_TOKEN}}
        expected_body = {'auth': {'identity': identity}}

        self.assertRequestBodyIs(json=expected_body)

        self.assertRequestHeaderEqual('Content-Type', 'application/json')
        self.assertRequestHeaderEqual('Accept', 'application/json')
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_with_expired(self):
        # A plugin seeded with an already-expired auth_ref is expected to
        # fetch a fresh token instead of reusing the stale one.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        expired_body = copy.deepcopy(self.TEST_RESPONSE_DICT)
        expired_body['token']['expires_at'] = '2000-01-01T00:00:10.000123Z'

        plugin = v3.Password(self.TEST_URL,
                             username='username',
                             password='password')
        plugin.auth_ref = access.AccessInfo.factory(body=expired_body)
        sess = session.Session(auth=plugin)

        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         sess.get_auth_headers())

        # The auth_ref must now carry the expiry of the stubbed response,
        # not the expired value seeded above.
        stubbed_expiry = self.TEST_RESPONSE_DICT['token']['expires_at']
        self.assertEqual(plugin.auth_ref['expires_at'], stubbed_expiry)

    def test_with_domain_and_project_scoping(self):
        # Asking for a project scope and a domain scope at the same time
        # must be rejected by both token and header retrieval.
        plugin = v3.Password(self.TEST_URL,
                             username='username',
                             password='password',
                             project_id='project',
                             domain_id='domain')

        for fetch in (plugin.get_token, plugin.get_headers):
            self.assertRaises(exceptions.AuthorizationFailure, fetch, None)

    def test_with_trust_id(self):
        # Passing trust_id must add an OS-TRUST:trust scope to the request.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS,
                             trust_id='trust')
        sess = session.Session(plugin)

        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         sess.get_auth_headers())

        user = {'name': self.TEST_USER, 'password': self.TEST_PASS}
        identity = {'methods': ['password'], 'password': {'user': user}}
        scope = {'OS-TRUST:trust': {'id': 'trust'}}
        expected_body = {'auth': {'identity': identity, 'scope': scope}}

        self.assertRequestBodyIs(json=expected_body)
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_with_multiple_mechanisms_factory(self):
        # Combining a password method and a token method in one Auth plugin
        # must list both methods, in order, in a single request.  The
        # plugin is handed to the session positionally here.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        password_method = v3.PasswordMethod(username=self.TEST_USER,
                                            password=self.TEST_PASS)
        token_method = v3.TokenMethod(token='foo')
        plugin = v3.Auth(self.TEST_URL,
                         [password_method, token_method],
                         trust_id='trust')
        sess = session.Session(plugin)

        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         sess.get_auth_headers())

        user = {'name': self.TEST_USER, 'password': self.TEST_PASS}
        identity = {'methods': ['password', 'token'],
                    'password': {'user': user},
                    'token': {'id': 'foo'}}
        scope = {'OS-TRUST:trust': {'id': 'trust'}}

        self.assertRequestBodyIs(
            json={'auth': {'identity': identity, 'scope': scope}})
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_with_multiple_mechanisms(self):
        # Same scenario as the factory variant, except that the plugin is
        # given to the session through the ``auth`` keyword.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        methods = [
            v3.PasswordMethod(username=self.TEST_USER,
                              password=self.TEST_PASS),
            v3.TokenMethod(token='foo'),
        ]
        plugin = v3.Auth(self.TEST_URL, methods, trust_id='trust')
        sess = session.Session(auth=plugin)

        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         sess.get_auth_headers())

        expected_identity = {
            'methods': ['password', 'token'],
            'password': {'user': {'name': self.TEST_USER,
                                  'password': self.TEST_PASS}},
            'token': {'id': 'foo'},
        }
        expected_body = {
            'auth': {
                'identity': expected_identity,
                'scope': {'OS-TRUST:trust': {'id': 'trust'}},
            },
        }
        self.assertRequestBodyIs(json=expected_body)
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_with_multiple_scopes(self):
        # Any two scope arguments given together must make get_auth_ref
        # fail with AuthorizationFailure.
        sess = session.Session()

        conflicting_scopes = (
            {'domain_id': 'x', 'project_id': 'x'},
            {'domain_id': 'x', 'trust_id': 'x'},
        )
        for scope_kwargs in conflicting_scopes:
            plugin = v3.Password(self.TEST_URL,
                                 username=self.TEST_USER,
                                 password=self.TEST_PASS,
                                 **scope_kwargs)
            self.assertRaises(exceptions.AuthorizationFailure,
                              plugin.get_auth_ref, sess)

    def _do_service_url_test(self, base_url, endpoint_filter):
        """Check that ``endpoint_filter`` routes a request to ``base_url``.

        Stubs authentication and a ``GET`` on ``path`` under ``base_url``,
        issues ``GET /path`` through an authenticated session with the
        given filter, then asserts a 200 response and that the last
        request went to ``base_url + '/path'``.

        :param base_url: service endpoint the filter is expected to select.
        :param endpoint_filter: dict passed to the session as
            ``endpoint_filter``.
        """
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        self.stub_url('GET', ['path'],
                      base_url=base_url,
                      text='SUCCESS', status_code=200)

        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS)
        sess = session.Session(auth=plugin)

        response = sess.get('/path', endpoint_filter=endpoint_filter)

        self.assertEqual(response.status_code, 200)
        requested_url = self.requests_mock.last_request.url
        self.assertEqual(requested_url, base_url + '/path')

    def test_service_url(self):
        # An explicit interface and service name select nova's admin URL.
        admin_compute_filter = dict(service_type='compute',
                                    interface='admin',
                                    service_name='nova')
        self._do_service_url_test('http://nova/novapi/admin',
                                  admin_compute_filter)

    def test_service_url_defaults_to_public(self):
        # Without an interface in the filter the public URL must be used.
        self._do_service_url_test('http://nova/novapi/public',
                                  dict(service_type='compute'))

    def test_endpoint_filter_without_service_type_fails(self):
        # A filter that names only an interface, with no service_type, must
        # raise EndpointNotFound.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS)
        sess = session.Session(auth=plugin)

        interface_only = {'interface': 'admin'}
        self.assertRaises(exceptions.EndpointNotFound, sess.get, '/path',
                          endpoint_filter=interface_only)

    def test_full_url_overrides_endpoint_filter(self):
        # When the caller supplies an absolute URL, the request must go to
        # that URL even though an endpoint filter is also given.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        self.stub_url('GET', [],
                      base_url='http://testurl/',
                      text='SUCCESS', status_code=200)

        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS)
        sess = session.Session(auth=plugin)

        compute_filter = {'service_type': 'compute'}
        response = sess.get('http://testurl/',
                            endpoint_filter=compute_filter)
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.text, 'SUCCESS')

    def test_invalid_auth_response_dict(self):
        # A JSON auth response without the expected token structure must
        # surface as InvalidResponse on an authenticated request.
        self.stub_auth(json={'hello': 'world'})

        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS)
        sess = session.Session(auth=plugin)

        self.assertRaises(exceptions.InvalidResponse,
                          sess.get, 'http://any', authenticated=True)

    def test_invalid_auth_response_type(self):
        # A plain-text (non-JSON) auth response must surface as
        # InvalidResponse on an authenticated request.
        self.stub_url('POST', ['auth', 'tokens'], text='testdata')

        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS)
        sess = session.Session(auth=plugin)

        self.assertRaises(exceptions.InvalidResponse,
                          sess.get, 'http://any', authenticated=True)

    def test_invalidate_response(self):
        # Two consecutive auth responses that differ only in the token
        # header: the first must be reused until invalidate() is called,
        # after which the second one must be picked up.
        auth_responses = [
            {'status_code': 200,
             'json': self.TEST_RESPONSE_DICT,
             'headers': {'X-Subject-Token': subject_token}}
            for subject_token in ('token1', 'token2')
        ]

        self.requests_mock.post('%s/auth/tokens' % self.TEST_URL,
                                auth_responses)

        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS)
        sess = session.Session(auth=plugin)

        with self.deprecations.expect_deprecations_here():
            self.assertEqual('token1', sess.get_token())
        self.assertEqual({'X-Auth-Token': 'token1'},
                         sess.get_auth_headers())

        plugin.invalidate()

        with self.deprecations.expect_deprecations_here():
            self.assertEqual('token2', sess.get_token())
        self.assertEqual({'X-Auth-Token': 'token2'},
                         sess.get_auth_headers())

    def test_doesnt_log_password(self):
        # Authenticate with a random password and make sure it never shows
        # up in the captured log output.
        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        secret = uuid.uuid4().hex
        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=secret)
        sess = session.Session(plugin)
        with self.deprecations.expect_deprecations_here():
            self.assertEqual(self.TEST_TOKEN, sess.get_token())
        self.assertEqual({'X-Auth-Token': self.TEST_TOKEN},
                         sess.get_auth_headers())

        self.assertNotIn(secret, self.logger.output)

    def test_sends_nocatalog(self):
        # With include_catalog=False the token request must go to the
        # token URL with '?nocatalog' appended, while token_url itself
        # stays unchanged.
        self.TEST_RESPONSE_DICT['token'].pop('catalog')
        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        plugin = v3.Password(self.TEST_URL,
                             username=self.TEST_USER,
                             password=self.TEST_PASS,
                             include_catalog=False)
        sess = session.Session(auth=plugin)

        sess.get_auth_headers()

        token_endpoint = self.TEST_URL + '/auth/tokens'
        self.assertEqual(token_endpoint, plugin.token_url)
        self.assertEqual(token_endpoint + '?nocatalog',
                         self.requests_mock.last_request.url)

    def test_symbols(self):
        # The v3 package must re-export the very same objects that are
        # defined in its base module.
        for symbol in ('AuthMethod', 'AuthConstructor', 'Auth'):
            self.assertIs(getattr(v3, symbol), getattr(v3_base, symbol))

    def test_unscoped_request(self):
        # unscoped=True must yield an unscoped auth_ref and send an
        # explicit, empty 'unscoped' scope in the request body.
        token = fixture.V3Token()
        self.stub_auth(json=token)
        secret = uuid.uuid4().hex

        plugin = v3.Password(self.TEST_URL,
                             user_id=token.user_id,
                             password=secret,
                             unscoped=True)
        sess = session.Session()

        auth_ref = plugin.get_access(sess)

        with self.deprecations.expect_deprecations_here():
            self.assertFalse(auth_ref.scoped)
        sent_body = self.requests_mock.last_request.json()

        identity = sent_body['auth']['identity']

        self.assertEqual(['password'], identity['methods'])
        self.assertEqual(token.user_id, identity['password']['user']['id'])
        self.assertEqual(secret, identity['password']['user']['password'])

        self.assertEqual({}, sent_body['auth']['scope']['unscoped'])

    def test_unscoped_with_scope_data(self):
        # Requesting an unscoped token while also naming a project is
        # contradictory and must raise AuthorizationFailure.
        plugin = v3.Password(self.TEST_URL,
                             user_id=uuid.uuid4().hex,
                             password=uuid.uuid4().hex,
                             unscoped=True,
                             project_id=uuid.uuid4().hex)

        sess = session.Session()

        self.assertRaises(exceptions.AuthorizationFailure,
                          plugin.get_auth_ref, sess)

    @mock.patch('sys.stdin', autospec=True)
    def test_prompt_password(self, mock_stdin):
        # No password option is given on the command line; with stdin
        # reporting a tty and getpass patched, the loaded plugin must end
        # up with the value getpass returned.
        parser = argparse.ArgumentParser()
        v3.Password.register_argparse_arguments(parser)

        username = uuid.uuid4().hex
        user_domain_id = uuid.uuid4().hex
        auth_url = uuid.uuid4().hex
        project_id = uuid.uuid4().hex
        password = uuid.uuid4().hex

        argv = [
            '--os-username', username,
            '--os-auth-url', auth_url,
            '--os-user-domain-id', user_domain_id,
            '--os-project-id', project_id,
        ]
        opts = parser.parse_args(argv)

        with mock.patch('getpass.getpass', return_value=password):
            mock_stdin.isatty = lambda: True

            plugin = v3.Password.load_from_argparse_arguments(opts)

            self.assertEqual(auth_url, plugin.auth_url)
            self.assertEqual(username, plugin.auth_methods[0].username)
            self.assertEqual(project_id, plugin.project_id)
            self.assertEqual(user_domain_id,
                             plugin.auth_methods[0].user_domain_id)
            self.assertEqual(password, plugin.auth_methods[0].password)
