# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Oauth authentication plugins.

.. warning::
    This module requires installation of an extra package (`oauthlib`)
    not installed by default. Without the extra package an import error will
    occur. The extra package can be installed using::

      $ pip install keystoneauth1[oauth1]
"""

import logging
import typing as ty

try:
    # explicitly re-export symbol
    # https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-no-implicit-reexport
    from oauthlib import oauth1 as oauth1
except ImportError:
    oauth1 = None

from keystoneauth1.identity import v3
from keystoneauth1 import session as ks_session

# Names exported by this module.
__all__ = ('OAuth1Method', 'OAuth1')

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


class OAuth1Method(v3.AuthMethod):
    """OAuth based authentication method.

    Signs the token request with the supplied OAuth1 credentials and
    reports itself under the ``oauth1`` method name.

    :param string access_key: Access token key.
    :param string access_secret: Access token secret.
    :param string consumer_key: Consumer key.
    :param string consumer_secret: Consumer secret.
    """

    access_key: str
    access_secret: str
    consumer_key: str
    consumer_secret: str

    def __init__(
        self,
        *,
        access_key: str,
        access_secret: str,
        consumer_key: str,
        consumer_secret: str,
    ) -> None:
        self.access_key = access_key
        self.access_secret = access_secret
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret

    def get_auth_data(
        self,
        session: ks_session.Session,
        auth: v3.Auth,
        headers: dict[str, str],
        request_kwargs: dict[str, object],
    ) -> tuple[None, None] | tuple[str, ty.Mapping[str, object]]:
        """Sign the token request and return the auth method payload.

        An OAuth1 signature for a ``POST`` to ``auth.token_url`` is
        computed and the resulting headers are merged into ``headers``
        in place.

        :param session: Not used by this method.
        :param auth: The plugin whose ``token_url`` is signed.
        :param headers: Request headers; updated in place with the
                        OAuth1 headers produced by the signature.
        :param request_kwargs: Not used by this method.

        :returns: The method name ``'oauth1'`` and an empty payload.
        :raises ImportError: If the optional ``oauthlib`` package is not
                             installed.
        """
        # The module-level import swallows ImportError and leaves ``oauth1``
        # as None. Fail here with an actionable message rather than an
        # AttributeError on NoneType further down.
        if oauth1 is None:
            raise ImportError(
                'The oauthlib package is required for OAuth1 '
                'authentication but is not installed. It can be '
                'installed with: pip install keystoneauth1[oauth1]'
            )

        # Add the oauth specific content into the headers
        oauth_client = oauth1.Client(
            self.consumer_key,
            client_secret=self.consumer_secret,
            resource_owner_key=self.access_key,
            resource_owner_secret=self.access_secret,
            signature_method=oauth1.SIGNATURE_HMAC,
        )

        # Only the signed headers are needed; the URL and body that sign()
        # also returns are discarded.
        _url, signed_headers, _body = oauth_client.sign(
            auth.token_url, http_method='POST'
        )
        headers.update(signed_headers)

        return 'oauth1', {}

    def get_cache_id_elements(self) -> dict[str, str | None]:
        """Return the four credentials keyed by ``oauth1_``-prefixed names."""
        return {
            'oauth1_access_key': self.access_key,
            'oauth1_access_secret': self.access_secret,
            'oauth1_consumer_key': self.consumer_key,
            'oauth1_consumer_secret': self.consumer_secret,
        }


class OAuth1(v3.Auth):
    """Identity v3 auth plugin that authenticates with OAuth1 credentials.

    A single authentication method is built from ``_auth_method_class``
    using the consumer and access credentials, and is handed to
    :class:`v3.Auth` together with the remaining arguments.

    A warning is logged when ``has_scope_parameters`` is true after
    initialisation.

    :param string auth_url: Passed to ``v3.Auth`` as its first argument.
    :param string consumer_key: Consumer key.
    :param string consumer_secret: Consumer secret.
    :param string access_key: Access token key.
    :param string access_secret: Access token secret.

    The keyword-only arguments (``unscoped``, ``trust_id``,
    ``system_scope``, ``domain_id``, ``domain_name``, ``project_id``,
    ``project_name``, ``project_domain_id``, ``project_domain_name``,
    ``reauthenticate`` and ``include_catalog``) are forwarded unchanged to
    ``v3.Auth``.
    """

    _auth_method_class = OAuth1Method

    def __init__(
        self,
        auth_url: str,
        consumer_key: str,
        consumer_secret: str,
        access_key: str,
        access_secret: str,
        *,
        unscoped: bool = False,
        trust_id: str | None = None,
        system_scope: str | None = None,
        domain_id: str | None = None,
        domain_name: str | None = None,
        project_id: str | None = None,
        project_name: str | None = None,
        project_domain_id: str | None = None,
        project_domain_name: str | None = None,
        reauthenticate: bool = True,
        include_catalog: bool = True,
    ) -> None:
        # The OAuth1 credentials live on the method object, not the plugin.
        oauth_method = self._auth_method_class(
            access_key=access_key,
            access_secret=access_secret,
            consumer_key=consumer_key,
            consumer_secret=consumer_secret,
        )

        super().__init__(
            auth_url,
            [oauth_method],
            unscoped=unscoped,
            trust_id=trust_id,
            system_scope=system_scope,
            domain_id=domain_id,
            domain_name=domain_name,
            project_id=project_id,
            project_name=project_name,
            project_domain_id=project_domain_id,
            project_domain_name=project_domain_name,
            reauthenticate=reauthenticate,
            include_catalog=include_catalog,
        )

        # Nothing to report unless scope information was supplied.
        if not self.has_scope_parameters:
            return

        LOG.warning(
            'Scoping parameters such as a project were provided '
            'to the OAuth1 plugin. Because OAuth1 access is '
            'always scoped to a project these will be ignored by '
            'the identity server'
        )
