# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import copy
import json
import time
import uuid

from keystoneauth1 import _utils as ksa_utils
from keystoneauth1 import access
from keystoneauth1 import exceptions
from keystoneauth1 import fixture
from keystoneauth1.identity import v2
from keystoneauth1 import session
from keystoneauth1.tests.unit import utils


class V2IdentityPlugin(utils.TestCase):
    """Unit tests for the keystoneauth1 identity v2 auth plugins."""

    # Identity endpoints: each versioned URL is its root URL with 'v2.0'
    # appended.
    TEST_ROOT_URL = 'http://127.0.0.1:5000/'
    TEST_URL = f'{TEST_ROOT_URL}v2.0'
    TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
    TEST_ADMIN_URL = f'{TEST_ROOT_ADMIN_URL}v2.0'

    # Password supplied to v2.Password in the tests below.
    TEST_PASS = 'password'

    # Service catalog embedded in the canned token response built in setUp.
    # Each entry carries a service "type"/"name" and a list of endpoints
    # with admin, internal and public URLs for a region.
    TEST_SERVICE_CATALOG = [
        {
            "endpoints": [
                {
                    "adminURL": "http://cdn.admin-nets.local:8774/v1.0",
                    "region": "RegionOne",
                    "internalURL": "http://127.0.0.1:8774/v1.0",
                    "publicURL": "http://cdn.admin-nets.local:8774/v1.0/",
                }
            ],
            "type": "nova_compat",
            "name": "nova_compat",
        },
        {
            "endpoints": [
                {
                    "adminURL": "http://nova/novapi/admin",
                    "region": "RegionOne",
                    "internalURL": "http://nova/novapi/internal",
                    "publicURL": "http://nova/novapi/public",
                }
            ],
            "type": "compute",
            "name": "nova",
        },
        {
            "endpoints": [
                {
                    "adminURL": "http://glance/glanceapi/admin",
                    "region": "RegionOne",
                    "internalURL": "http://glance/glanceapi/internal",
                    "publicURL": "http://glance/glanceapi/public",
                }
            ],
            "type": "image",
            "name": "glance",
        },
        {
            "endpoints": [
                {
                    "adminURL": TEST_ADMIN_URL,
                    "region": "RegionOne",
                    "internalURL": "http://127.0.0.1:5000/v2.0",
                    "publicURL": "http://127.0.0.1:5000/v2.0",
                }
            ],
            "type": "identity",
            "name": "keystone",
        },
        {
            "endpoints": [
                {
                    "adminURL": "http://swift/swiftapi/admin",
                    "region": "RegionOne",
                    "internalURL": "http://swift/swiftapi/internal",
                    "publicURL": "http://swift/swiftapi/public",
                }
            ],
            "type": "object-store",
            "name": "swift",
        },
    ]

    def setUp(self):
        """Build the canned v2 token response shared by the tests.

        The token's ``expires`` timestamp is dated in the year following
        the current UTC year.
        """
        super().setUp()

        expiry_year = time.gmtime().tm_year + 1
        token_section = {
            "expires": f"{expiry_year}-02-01T00:00:10.000123Z",
            "id": self.TEST_TOKEN,
            "tenant": {"id": self.TEST_TENANT_ID},
        }
        access_section = {
            "token": token_section,
            "user": {"id": self.TEST_USER},
            "serviceCatalog": self.TEST_SERVICE_CATALOG,
        }
        self.TEST_RESPONSE_DICT = {"access": access_section}

    def stub_auth(self, **kwargs):
        """Stub ``POST`` for the ``tokens`` path via ``stub_url``.

        All keyword arguments are forwarded to ``stub_url`` unchanged.
        """
        token_parts = ['tokens']
        self.stub_url('POST', token_parts, **kwargs)

    def test_authenticate_with_username_password(self):
        """Unscoped Password auth by username posts username credentials."""
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v2.Password(
            self.TEST_URL, username=self.TEST_USER, password=self.TEST_PASS
        )
        self.assertIsNone(plugin.user_id)
        self.assertFalse(plugin.has_scope_parameters)

        sess = session.Session(plugin)
        expected_headers = {'X-Auth-Token': self.TEST_TOKEN}
        self.assertEqual(expected_headers, sess.get_auth_headers())

        credentials = {
            'username': self.TEST_USER,
            'password': self.TEST_PASS,
        }
        self.assertRequestBodyIs(
            json={'auth': {'passwordCredentials': credentials}}
        )
        # The token request must both send and accept JSON.
        for header_name in ('Content-Type', 'Accept'):
            self.assertRequestHeaderEqual(header_name, 'application/json')
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_user_id_password(self):
        """Unscoped Password auth by user id posts ``userId`` credentials."""
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v2.Password(
            self.TEST_URL, user_id=self.TEST_USER, password=self.TEST_PASS
        )
        self.assertIsNone(plugin.username)
        self.assertFalse(plugin.has_scope_parameters)

        sess = session.Session(plugin)
        expected_headers = {'X-Auth-Token': self.TEST_TOKEN}
        self.assertEqual(expected_headers, sess.get_auth_headers())

        credentials = {
            'userId': self.TEST_USER,
            'password': self.TEST_PASS,
        }
        self.assertRequestBodyIs(
            json={'auth': {'passwordCredentials': credentials}}
        )
        # The token request must both send and accept JSON.
        for header_name in ('Content-Type', 'Accept'):
            self.assertRequestHeaderEqual(header_name, 'application/json')
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_username_password_scoped(self):
        """Tenant-scoped Password auth by username includes ``tenantId``."""
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v2.Password(
            self.TEST_URL,
            username=self.TEST_USER,
            password=self.TEST_PASS,
            tenant_id=self.TEST_TENANT_ID,
        )
        self.assertTrue(plugin.has_scope_parameters)
        self.assertIsNone(plugin.user_id)

        sess = session.Session(plugin)
        expected_headers = {'X-Auth-Token': self.TEST_TOKEN}
        self.assertEqual(expected_headers, sess.get_auth_headers())

        credentials = {
            'username': self.TEST_USER,
            'password': self.TEST_PASS,
        }
        expected_body = {
            'auth': {
                'passwordCredentials': credentials,
                'tenantId': self.TEST_TENANT_ID,
            }
        }
        self.assertRequestBodyIs(json=expected_body)
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_user_id_password_scoped(self):
        """Tenant-scoped Password auth by user id includes ``tenantId``."""
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v2.Password(
            self.TEST_URL,
            user_id=self.TEST_USER,
            password=self.TEST_PASS,
            tenant_id=self.TEST_TENANT_ID,
        )
        self.assertIsNone(plugin.username)
        self.assertTrue(plugin.has_scope_parameters)

        sess = session.Session(plugin)
        expected_headers = {'X-Auth-Token': self.TEST_TOKEN}
        self.assertEqual(expected_headers, sess.get_auth_headers())

        credentials = {
            'userId': self.TEST_USER,
            'password': self.TEST_PASS,
        }
        expected_body = {
            'auth': {
                'passwordCredentials': credentials,
                'tenantId': self.TEST_TENANT_ID,
            }
        }
        self.assertRequestBodyIs(json=expected_body)
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_authenticate_with_token(self):
        """Token auth posts the existing token id and sends it as a header."""
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v2.Token(self.TEST_URL, 'foo')
        sess = session.Session(plugin)

        expected_headers = {'X-Auth-Token': self.TEST_TOKEN}
        self.assertEqual(expected_headers, sess.get_auth_headers())

        self.assertRequestBodyIs(json={'auth': {'token': {'id': 'foo'}}})
        self.assertRequestHeaderEqual('x-Auth-Token', 'foo')
        # The token request must both send and accept JSON.
        for header_name in ('Content-Type', 'Accept'):
            self.assertRequestHeaderEqual(header_name, 'application/json')
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def test_with_trust_id(self):
        """Password auth with a trust id is scoped and posts ``trust_id``."""
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        plugin = v2.Password(
            self.TEST_URL,
            username=self.TEST_USER,
            password=self.TEST_PASS,
            trust_id='trust',
        )
        self.assertTrue(plugin.has_scope_parameters)

        sess = session.Session(plugin)
        expected_headers = {'X-Auth-Token': self.TEST_TOKEN}
        self.assertEqual(expected_headers, sess.get_auth_headers())

        credentials = {
            'username': self.TEST_USER,
            'password': self.TEST_PASS,
        }
        expected_body = {
            'auth': {
                'passwordCredentials': credentials,
                'trust_id': 'trust',
            }
        }
        self.assertRequestBodyIs(json=expected_body)
        self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)

    def _do_service_url_test(self, base_url, endpoint_filter):
        """Assert that ``endpoint_filter`` routes ``/path`` to ``base_url``.

        Stubs the token endpoint plus a GET registered through ``stub_url``
        with parts ``['path']`` and the given ``base_url``, issues
        ``GET /path`` through a Password-authenticated session, and checks
        both the response status and the URL of the last mocked request.
        """
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        self.stub_url(
            'GET',
            ['path'],
            base_url=base_url,
            text='SUCCESS',
            status_code=200,
        )

        plugin = v2.Password(
            self.TEST_URL, username=self.TEST_USER, password=self.TEST_PASS
        )
        sess = session.Session(auth=plugin)
        response = sess.get('/path', endpoint_filter=endpoint_filter)

        self.assertEqual(response.status_code, 200)
        requested_url = self.requests_mock.last_request.url
        self.assertEqual(requested_url, base_url + '/path')

    def test_service_url(self):
        """An admin-interface filter for nova resolves to the admin URL."""
        admin_nova_filter = dict(
            service_type='compute',
            interface='admin',
            service_name='nova',
        )
        self._do_service_url_test(
            'http://nova/novapi/admin', admin_nova_filter
        )

    def test_service_url_defaults_to_public(self):
        """A filter with only a service type resolves to the public URL."""
        self._do_service_url_test(
            'http://nova/novapi/public', {'service_type': 'compute'}
        )

    def test_endpoint_filter_without_service_type_fails(self):
        """A filter lacking ``service_type`` raises ``EndpointNotFound``."""
        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        plugin = v2.Password(
            self.TEST_URL, username=self.TEST_USER, password=self.TEST_PASS
        )
        sess = session.Session(auth=plugin)

        interface_only_filter = {'interface': 'admin'}
        self.assertRaises(
            exceptions.EndpointNotFound,
            sess.get,
            '/path',
            endpoint_filter=interface_only_filter,
        )

    def test_full_url_overrides_endpoint_filter(self):
        """A request to a full URL succeeds even with an endpoint filter."""
        target_url = 'http://testurl/'
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        self.stub_url(
            'GET', [], base_url=target_url, text='SUCCESS', status_code=200
        )

        plugin = v2.Password(
            self.TEST_URL, username=self.TEST_USER, password=self.TEST_PASS
        )
        sess = session.Session(auth=plugin)

        response = sess.get(
            target_url, endpoint_filter={'service_type': 'compute'}
        )
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.text, 'SUCCESS')

    def test_invalid_auth_response_dict(self):
        """A JSON token response of the wrong shape is an InvalidResponse."""
        self.stub_auth(json={'hello': 'world'})

        plugin = v2.Password(
            self.TEST_URL, username=self.TEST_USER, password=self.TEST_PASS
        )
        sess = session.Session(auth=plugin)

        self.assertRaises(
            exceptions.InvalidResponse,
            sess.get,
            'http://any',
            authenticated=True,
        )

    def test_invalid_auth_response_type(self):
        """A plain-text token response is reported as InvalidResponse."""
        self.stub_url('POST', ['tokens'], text='testdata')

        plugin = v2.Password(
            self.TEST_URL, username=self.TEST_USER, password=self.TEST_PASS
        )
        sess = session.Session(auth=plugin)

        self.assertRaises(
            exceptions.InvalidResponse,
            sess.get,
            'http://any',
            authenticated=True,
        )

    def test_invalidate_response(self):
        """After ``invalidate()`` the session fetches the next token."""
        # Queue two token responses that differ only in the token id.
        queued_responses = []
        for token_id in ('token1', 'token2'):
            body = copy.deepcopy(self.TEST_RESPONSE_DICT)
            body['access']['token']['id'] = token_id
            queued_responses.append({'json': body})
        self.stub_auth(response_list=queued_responses)

        plugin = v2.Password(
            self.TEST_URL, username=self.TEST_USER, password=self.TEST_PASS
        )
        sess = session.Session(auth=plugin)

        self.assertEqual('token1', sess.get_token())
        self.assertEqual({'X-Auth-Token': 'token1'}, sess.get_auth_headers())

        plugin.invalidate()
        self.assertEqual('token2', sess.get_token())
        self.assertEqual({'X-Auth-Token': 'token2'}, sess.get_auth_headers())

    def test_doesnt_log_password(self):
        """The password never appears in the captured log output."""
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        # A random value so a match in the log output cannot be accidental.
        secret = uuid.uuid4().hex

        plugin = v2.Password(
            self.TEST_URL, username=self.TEST_USER, password=secret
        )
        sess = session.Session(auth=plugin)

        self.assertEqual(self.TEST_TOKEN, sess.get_token())
        expected_headers = {'X-Auth-Token': self.TEST_TOKEN}
        self.assertEqual(expected_headers, sess.get_auth_headers())
        self.assertNotIn(secret, self.logger.output)

    def test_password_with_no_user_id_or_name(self):
        """Building Password without username or user_id is a TypeError."""
        self.assertRaises(
            TypeError,
            v2.Password,
            self.TEST_URL,
            password=self.TEST_PASS,
        )

    def test_password_cache_id(self):
        """Cache ids match for equal arguments and differ otherwise.

        Also checks that no auth state is reported before a token has been
        fetched, and that fetching a token hits the mocked endpoint.
        """
        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        shared_value = uuid.uuid4().hex
        credentials = {
            'username': self.TEST_USER,
            'password': self.TEST_PASS,
        }

        first = v2.Password(
            self.TEST_URL, trust_id=shared_value, **credentials
        )
        twin = v2.Password(
            self.TEST_URL, trust_id=shared_value, **credentials
        )

        first_id = first.get_cache_id()
        twin_id = twin.get_cache_id()
        self.assertEqual(first_id, twin_id)

        # same value different param
        other = v2.Password(
            self.TEST_URL, tenant_id=shared_value, **credentials
        )
        other_id = other.get_cache_id()
        self.assertNotEqual(first_id, other_id)

        for plugin in (first, twin, other):
            self.assertIsNone(plugin.get_auth_state())

        sess = session.Session()
        self.assertEqual(self.TEST_TOKEN, first.get_token(sess))
        self.assertTrue(self.requests_mock.called)

    def test_password_change_auth_state(self):
        """Loading an expired auth state leaves the cache id unchanged.

        The plugin adopts the injected token from ``set_auth_state``; a
        later ``get_token`` returns the stubbed token instead.
        """
        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        # Token fixture whose expiry is taken from before_utcnow(days=2).
        stale_token = fixture.V2Token(
            expires=ksa_utils.before_utcnow(days=2)
        )
        stale_ref = access.create(body=stale_token)

        plugin = v2.Password(
            self.TEST_URL,
            username=self.TEST_USER,
            password=self.TEST_PASS,
            tenant_id=uuid.uuid4().hex,
        )

        original_cache_id = plugin.get_cache_id()
        self.assertIsNone(plugin.get_auth_state())

        serialized_state = json.dumps(
            {'auth_token': stale_ref.auth_token, 'body': stale_ref._data}
        )
        plugin.set_auth_state(serialized_state)

        self.assertEqual(stale_token.token_id, plugin.auth_ref.auth_token)

        sess = session.Session()
        # updates expired
        self.assertEqual(self.TEST_TOKEN, plugin.get_token(sess))
        self.assertEqual(original_cache_id, plugin.get_cache_id())
