#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import datetime
import json

import httpretty
import six

from keystoneclient import exceptions
from keystoneclient.openstack.common import jsonutils
from keystoneclient.openstack.common import timeutils
from keystoneclient.tests.v2_0 import utils
from keystoneclient.v2_0 import client


class AuthenticateAgainstKeystoneTests(utils.TestCase):
    def setUp(self):
        super(AuthenticateAgainstKeystoneTests, self).setUp()
        self.TEST_RESPONSE_DICT = {
            "access": {
                "token": {
                    "expires": "2020-01-01T00:00:10.000123Z",
                    "id": self.TEST_TOKEN,
                    "tenant": {
                        "id": self.TEST_TENANT_ID
                    },
                },
                "user": {
                    "id": self.TEST_USER
                },
                "serviceCatalog": self.TEST_SERVICE_CATALOG,
            },
        }
        self.TEST_REQUEST_BODY = {
            "auth": {
                "passwordCredentials": {
                    "username": self.TEST_USER,
                    "password": self.TEST_TOKEN,
                },
                "tenantId": self.TEST_TENANT_ID,
            },
        }

    @httpretty.activate
    def test_authenticate_success_expired(self):
        # Build an expired token
        self.TEST_RESPONSE_DICT['access']['token']['expires'] = (
            (timeutils.utcnow() - datetime.timedelta(1)).isoformat())

        exp_resp = httpretty.Response(body=json.dumps(self.TEST_RESPONSE_DICT),
                                      content_type='application/json')

        # Build a new response
        TEST_TOKEN = "abcdef"
        self.TEST_RESPONSE_DICT['access']['token']['expires'] = (
            '2020-01-01T00:00:10.000123Z')
        self.TEST_RESPONSE_DICT['access']['token']['id'] = TEST_TOKEN

        new_resp = httpretty.Response(body=json.dumps(self.TEST_RESPONSE_DICT),
                                      content_type='application/json')

        # return expired first, and then the new response
        self.stub_auth(responses=[exp_resp, new_resp])

        cs = client.Client(tenant_id=self.TEST_TENANT_ID,
                           auth_url=self.TEST_URL,
                           username=self.TEST_USER,
                           password=self.TEST_TOKEN)

        self.assertEqual(cs.management_url,
                         self.TEST_RESPONSE_DICT["access"]["serviceCatalog"][3]
                         ['endpoints'][0]["adminURL"])

        self.assertEqual(cs.auth_token, TEST_TOKEN)
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)

    @httpretty.activate
    def test_authenticate_failure(self):
        _auth = 'auth'
        _cred = 'passwordCredentials'
        _pass = 'password'
        self.TEST_REQUEST_BODY[_auth][_cred][_pass] = 'bad_key'
        error = {"unauthorized": {"message": "Unauthorized",
                                  "code": "401"}}

        self.stub_auth(status=401, json=error)

        # Workaround for issue with assertRaises on python2.6
        # where with assertRaises(exceptions.Unauthorized): doesn't work
        # right
        def client_create_wrapper():
            client.Client(username=self.TEST_USER,
                          password="bad_key",
                          tenant_id=self.TEST_TENANT_ID,
                          auth_url=self.TEST_URL)

        self.assertRaises(exceptions.Unauthorized, client_create_wrapper)
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)

    @httpretty.activate
    def test_auth_redirect(self):
        self.stub_auth(status=305, body='Use Proxy',
                       location=self.TEST_ADMIN_URL + "/tokens")

        self.stub_auth(base_url=self.TEST_ADMIN_URL,
                       json=self.TEST_RESPONSE_DICT)

        cs = client.Client(username=self.TEST_USER,
                           password=self.TEST_TOKEN,
                           tenant_id=self.TEST_TENANT_ID,
                           auth_url=self.TEST_URL)

        self.assertEqual(cs.management_url,
                         self.TEST_RESPONSE_DICT["access"]["serviceCatalog"][3]
                         ['endpoints'][0]["adminURL"])
        self.assertEqual(cs.auth_token,
                         self.TEST_RESPONSE_DICT["access"]["token"]["id"])
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)

    @httpretty.activate
    def test_authenticate_success_password_scoped(self):
        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        cs = client.Client(username=self.TEST_USER,
                           password=self.TEST_TOKEN,
                           tenant_id=self.TEST_TENANT_ID,
                           auth_url=self.TEST_URL)
        self.assertEqual(cs.management_url,
                         self.TEST_RESPONSE_DICT["access"]["serviceCatalog"][3]
                         ['endpoints'][0]["adminURL"])
        self.assertEqual(cs.auth_token,
                         self.TEST_RESPONSE_DICT["access"]["token"]["id"])
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)

    @httpretty.activate
    def test_authenticate_success_password_unscoped(self):
        del self.TEST_RESPONSE_DICT['access']['serviceCatalog']
        del self.TEST_REQUEST_BODY['auth']['tenantId']

        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        cs = client.Client(username=self.TEST_USER,
                           password=self.TEST_TOKEN,
                           auth_url=self.TEST_URL)
        self.assertEqual(cs.auth_token,
                         self.TEST_RESPONSE_DICT["access"]["token"]["id"])
        self.assertFalse('serviceCatalog' in cs.service_catalog.catalog)
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)

    @httpretty.activate
    def test_auth_url_token_authentication(self):
        fake_token = 'fake_token'
        fake_url = '/fake-url'
        fake_resp = {'result': True}

        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        self.stub_url('GET', [fake_url], json=fake_resp,
                      base_url=self.TEST_ADMIN_IDENTITY_ENDPOINT)

        cl = client.Client(auth_url=self.TEST_URL,
                           token=fake_token)
        body = httpretty.last_request().body
        if six.PY3:
            body = body.decode('utf-8')
        body = jsonutils.loads(body)
        self.assertEqual(body['auth']['token']['id'], fake_token)

        resp, body = cl.get(fake_url)
        self.assertEqual(fake_resp, body)

        self.assertEqual(httpretty.last_request().headers.get('X-Auth-Token'),
                         self.TEST_TOKEN)

    @httpretty.activate
    def test_authenticate_success_token_scoped(self):
        del self.TEST_REQUEST_BODY['auth']['passwordCredentials']
        self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}
        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        cs = client.Client(token=self.TEST_TOKEN,
                           tenant_id=self.TEST_TENANT_ID,
                           auth_url=self.TEST_URL)
        self.assertEqual(cs.management_url,
                         self.TEST_RESPONSE_DICT["access"]["serviceCatalog"][3]
                         ['endpoints'][0]["adminURL"])
        self.assertEqual(cs.auth_token,
                         self.TEST_RESPONSE_DICT["access"]["token"]["id"])
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)

    @httpretty.activate
    def test_authenticate_success_token_scoped_trust(self):
        del self.TEST_REQUEST_BODY['auth']['passwordCredentials']
        self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}
        self.TEST_REQUEST_BODY['auth']['trust_id'] = self.TEST_TRUST_ID
        response = self.TEST_RESPONSE_DICT.copy()
        response['access']['trust'] = {"trustee_user_id": self.TEST_USER,
                                       "id": self.TEST_TRUST_ID}
        self.stub_auth(json=response)

        cs = client.Client(token=self.TEST_TOKEN,
                           tenant_id=self.TEST_TENANT_ID,
                           trust_id=self.TEST_TRUST_ID,
                           auth_url=self.TEST_URL)
        self.assertTrue(cs.auth_ref.trust_scoped)
        self.assertEqual(cs.auth_ref.trust_id, self.TEST_TRUST_ID)
        self.assertEqual(cs.auth_ref.trustee_user_id, self.TEST_USER)
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)

    @httpretty.activate
    def test_authenticate_success_token_unscoped(self):
        del self.TEST_REQUEST_BODY['auth']['passwordCredentials']
        del self.TEST_REQUEST_BODY['auth']['tenantId']
        del self.TEST_RESPONSE_DICT['access']['serviceCatalog']
        self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}

        self.stub_auth(json=self.TEST_RESPONSE_DICT)

        cs = client.Client(token=self.TEST_TOKEN,
                           auth_url=self.TEST_URL)
        self.assertEqual(cs.auth_token,
                         self.TEST_RESPONSE_DICT["access"]["token"]["id"])
        self.assertFalse('serviceCatalog' in cs.service_catalog.catalog)
        self.assertRequestBodyIs(json=self.TEST_REQUEST_BODY)

    @httpretty.activate
    def test_allow_override_of_auth_token(self):
        fake_url = '/fake-url'
        fake_token = 'fake_token'
        fake_resp = {'result': True}

        self.stub_auth(json=self.TEST_RESPONSE_DICT)
        self.stub_url('GET', [fake_url], json=fake_resp,
                      base_url=self.TEST_ADMIN_IDENTITY_ENDPOINT)

        cl = client.Client(username='exampleuser',
                           password='password',
                           tenant_name='exampleproject',
                           auth_url=self.TEST_URL)

        self.assertEqual(cl.auth_token, self.TEST_TOKEN)

        # the token returned from the authentication will be used
        resp, body = cl.get(fake_url)
        self.assertEqual(fake_resp, body)

        self.assertEqual(httpretty.last_request().headers.get('X-Auth-Token'),
                         self.TEST_TOKEN)

        # then override that token and the new token shall be used
        cl.auth_token = fake_token

        resp, body = cl.get(fake_url)
        self.assertEqual(fake_resp, body)

        self.assertEqual(httpretty.last_request().headers.get('X-Auth-Token'),
                         fake_token)

        # if we clear that overridden token then we fall back to the original
        del cl.auth_token

        resp, body = cl.get(fake_url)
        self.assertEqual(fake_resp, body)

        self.assertEqual(httpretty.last_request().headers.get('X-Auth-Token'),
                         self.TEST_TOKEN)
