# coding: utf-8

# Copyright 2019, 2021 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import gzip
import io
import json as json_import
import re
import ssl
from os import getenv, environ, getcwd
from os.path import isfile, join, expanduser
from typing import Any, List, Union
from urllib.parse import urlparse, parse_qs

from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
from urllib3.util.ssl_ import create_urllib3_context

import dateutil.parser as date_parser


class SSLHTTPAdapter(HTTPAdapter):
    """Wraps the original HTTP adapter and adds additional SSL context."""

    def __init__(self, *args, **kwargs):
        self._disable_ssl_verification = kwargs.pop('_disable_ssl_verification', None)

        super().__init__(*args, **kwargs)

    def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
        """Create and use custom SSL configuration."""

        ssl_context = create_urllib3_context()
        ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2

        if self._disable_ssl_verification:
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

        super().init_poolmanager(connections, maxsize, block, ssl_context=ssl_context, **pool_kwargs)
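
# A minimal usage sketch, assuming the caller manages its own requests.Session
# (the session below is illustrative):
#
#     import requests
#     session = requests.Session()
#     session.mount('https://', SSLHTTPAdapter(_disable_ssl_verification=False))
#     # Every HTTPS request made through this session now negotiates TLS 1.2 or newer.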


class GzipStream(io.RawIOBase):
    """Compress files on the fly.

    GzipStream is a helper class around the gzip library. It helps to
    compress already opened files (file-like objects) on the fly, so
    there is no need to read everything into the memory and call the
    `compress` function on it.
    The GzipFile is opened on the instance itself so it needs to act
    as a file-like object.

    Args:
        input: the source of the data to be compressed.
               It can be a file-like object, bytes or string.
    """

    def __init__(self, source: Union[io.IOBase, bytes, str]):
        self.buffer = b''

        if isinstance(source, io.IOBase):
            # The input is already a file-like object, use it as-is.
            self.uncompressed = source
        elif isinstance(source, str):
            # Strings must be handled with StringIO.
            self.uncompressed = io.StringIO(source)
        else:
            # Handle the rest as raw bytes.
            self.uncompressed = io.BytesIO(source)

        self.compressor = gzip.GzipFile(fileobj=self, mode='wb')

    def read(self, size: int = -1) -> bytes:
        """Compresses and returns the requested size of data.

        Args:
            size: how many bytes to return. -1 to read and compress the whole file
        """
        compressed = b''

        if (size < 0) or (len(self.buffer) < size):
            for raw in self.uncompressed:
                # We need to encode text-like streams (e.g. TextIOWrapper) to bytes.
                if isinstance(raw, str):
                    raw = raw.encode()

                self.compressor.write(raw)

                # Stop compressing once the buffer holds at least the requested amount.
                if 0 < size < len(self.buffer):
                    self.compressor.flush()
                    break
            else:
                self.compressor.close()

            if size < 0:
                # Return all data from the buffer.
                compressed = self.buffer
                self.buffer = b''
        else:
            # If we already have enough data in our buffer
            # return the desired chunk of bytes
            compressed = self.buffer[:size]
            # then remove them from the buffer.
            self.buffer = self.buffer[size:]

        return compressed

    def flush(self) -> None:
        """Not implemented."""
        # Since this "pipe" sits between 2 other stream (source/read -> target/write)
        # it wouldn't be worth to implemet flushing.
        pass

    def write(self, compressed: bytes) -> None:
        """Append the compressed data to the buffer

        This happens when the target stream calls the `read` method and
        that triggers the gzip "compressor".
        """
        self.buffer += compressed

    def close(self) -> None:
        """Closes the underlying file-like object."""
        self.uncompressed.close()
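
# A minimal usage sketch: wrapping an in-memory payload so it is compressed
# lazily as it is read (the payload below is illustrative):
#
#     stream = GzipStream(b'{"name": "example"}')
#     compressed = stream.read()       # the whole payload, gzip-compressed
#     gzip.decompress(compressed)      # -> b'{"name": "example"}'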


def has_bad_first_or_last_char(val: str) -> bool:
    """Returns true if a string starts with any of: {," ; or ends with any of: },".

    Args:
        val: The string to be tested.

    Returns:
        Whether or not the string starts or ends with bad characters.
    """
    return val is not None and (val.startswith('{') or val.startswith('"') or val.endswith('}') or val.endswith('"'))
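
# For example, '{"key": "value"}' and '"quoted"' are flagged, while a plain
# value such as 'my-apikey' is not:
#
#     has_bad_first_or_last_char('{"key": "value"}')  # True
#     has_bad_first_or_last_char('my-apikey')         # False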


def remove_null_values(dictionary: dict) -> dict:
    """Create a new dictionary without keys mapped to null values.

    Args:
        dictionary: The dictionary potentially containing keys mapped to values of None.

    Returns:
        A dict with no keys mapped to None.
    """
    if isinstance(dictionary, dict):
        return {k: v for (k, v) in dictionary.items() if v is not None}
    return dictionary
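
# For example:
#
#     remove_null_values({'name': 'example', 'description': None})
#     # -> {'name': 'example'}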


def cleanup_values(dictionary: dict) -> dict:
    """Create a new dictionary with boolean values converted to strings.

    Ex. True -> 'true', False -> 'false'.
    {'key': True} -> {'key': 'true'}

    Args:
        dictionary: The dictionary with keys mapped to booleans.

    Returns:
        The dictionary with boolean values converted to their string equivalents.
    """
    if isinstance(dictionary, dict):
        return {k: cleanup_value(v) for (k, v) in dictionary.items()}
    return dictionary


def cleanup_value(value: Any) -> Any:
    """Convert a boolean value to string."""
    if isinstance(value, bool):
        return 'true' if value else 'false'
    return value
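
# For example, only boolean values are rewritten; everything else passes through:
#
#     cleanup_values({'archived': True, 'limit': 10})
#     # -> {'archived': 'true', 'limit': 10}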


def strip_extra_slashes(value: str) -> str:
    """Combine multiple trailing slashes to a single slash"""
    if value.endswith('//'):
        return value.rstrip('/') + '/'
    return value
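
# For example, only a run of trailing slashes is collapsed:
#
#     strip_extra_slashes('https://example.com/api///')  # -> 'https://example.com/api/'
#     strip_extra_slashes('https://example.com/api/')    # unchanged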


def datetime_to_string(val: datetime.datetime) -> str:
    """Convert a datetime object to string.

    If the supplied datetime does not specify a timezone,
    it is assumed to be UTC.

    Args:
        val: The datetime object.

    Returns:
        datetime serialized to iso8601 format.
    """
    if isinstance(val, datetime.datetime):
        if val.tzinfo is None:
            return val.isoformat() + 'Z'
        val = val.astimezone(datetime.timezone.utc)
        return val.isoformat().replace('+00:00', 'Z')
    return val
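
# For example, a naive datetime is serialized as UTC:
#
#     datetime_to_string(datetime.datetime(2021, 8, 31, 14, 30, 0))
#     # -> '2021-08-31T14:30:00Z'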


def string_to_datetime(string: str) -> datetime.datetime:
    """De-serializes string to datetime.

    Args:
        string: string containing datetime in iso8601 format.

    Returns:
        the de-serialized string as a datetime object.
    """
    val = date_parser.parse(string)
    if val.tzinfo is not None:
        return val
    return val.replace(tzinfo=datetime.timezone.utc)
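
# For example, both of these return a timezone-aware datetime in UTC:
#
#     string_to_datetime('2021-08-31T14:30:00Z')
#     string_to_datetime('2021-08-31T14:30:00')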


def string_to_datetime_list(string_list: List[str]) -> List[datetime.datetime]:
    """De-serializes each string in a list to a datetime.

    Args:
        string_list: list of strings containing datetime in iso8601 format.

    Returns:
        the de-serialized list of strings as a list of datetime objects.
    """
    if not isinstance(string_list, list):
        raise ValueError(
            "Invalid argument type: " + str(type(string_list)) + ". Argument string_list must be of type List[str]"
        )
    datetime_list = []
    for string_val in string_list:
        datetime_list.append(string_to_datetime(string_val))
    return datetime_list


def datetime_to_string_list(datetime_list: List[datetime.datetime]) -> List[str]:
    """Convert a list of datetime objects to a list of strings.

    Args:
        datetime_list: The list of datetime objects.

    Returns:
        list of datetimes serialized as strings in iso8601 format.
    """
    if not isinstance(datetime_list, list):
        raise ValueError(
            "Invalid argument type: "
            + str(type(datetime_list))
            + ". Argument datetime_list must be of type List[datetime.datetime]"
        )
    string_list = []
    for datetime_val in datetime_list:
        string_list.append(datetime_to_string(datetime_val))
    return string_list


def date_to_string(val: datetime.date) -> str:
    """Convert a date object to string.

    Args:
        val: The date object.

    Returns:
        date serialized to `YYYY-MM-DD` format.
    """
    if isinstance(val, datetime.date):
        return str(val)
    return val


def string_to_date(string: str) -> datetime.date:
    """De-serializes string to date.

    Args:
        string: string containing date in 'YYYY-MM-DD' format.

    Returns:
        the de-serialized string as a date object.
    """
    return date_parser.parse(string).date()


def get_query_param(url_str: str, param: str) -> str:
    """Return a query parameter value from url_str

    Args:
        url_str: the URL from which to extract the query
            parameter value
        param: the name of the query parameter whose value
            should be returned

    Returns:
        the value of the `param` query parameter as a string, or None if the parameter is not present

    Raises:
        ValueError: if errors are encountered parsing `url_str`
    """
    if not url_str:
        return None
    url = urlparse(url_str)
    if not url.query:
        return None
    query = parse_qs(url.query, strict_parsing=True)
    values = query.get(param)
    return values[0] if values else None
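
# For example, extracting a pagination token from a "next page" link
# (the URL below is illustrative):
#
#     get_query_param('https://example.com/api/v1/items?start=abc123&limit=50', 'start')
#     # -> 'abc123'
#     get_query_param('https://example.com/api/v1/items?limit=50', 'start')
#     # -> None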


def convert_model(val: Any) -> dict:
    """Convert a model object into an equivalent dict.

    Arguments:
        val: A dict or a model object

    Returns:
        A dict representation of the input object.
    """
    if isinstance(val, dict):
        return val
    if hasattr(val, "to_dict"):
        return val.to_dict()
    # Consider raising a ValueError here in the next major release
    return val
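
# For example, assuming a generated model class with a to_dict() method
# (ExampleModel below is hypothetical):
#
#     convert_model(ExampleModel(name='example'))  # -> {'name': 'example'}
#     convert_model({'name': 'example'})           # dicts pass through unchanged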


def convert_list(val: Union[str, List[str]]) -> str:
    """Convert a list of strings into comma-separated string.

    Arguments:
        val: A string or list of strings

    Returns:
        A comma-separated string of the items in the input list.
    """
    if isinstance(val, str):
        return val
    if isinstance(val, list) and all(isinstance(x, str) for x in val):
        return ",".join(val)
    # Consider raising a ValueError here in the next major release
    return val
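
# For example:
#
#     convert_list(['a', 'b', 'c'])  # -> 'a,b,c'
#     convert_list('a')              # -> 'a' (strings pass through unchanged)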


def read_external_sources(service_name: str) -> dict:
    """Look for external configuration of a service.

    Try to get config from external sources, with the following priority:
    1. Credentials file (ibm-credentials.env)
    2. Environment variables
    3. VCAP Services (Cloud Foundry)

    Args:
        service_name: The service name

    Returns:
        A dictionary containing relevant configuration for the service if found.
    """
    config = {}

    config = __read_from_credential_file(service_name)

    if not config:
        config = __read_from_env_variables(service_name)

    if not config:
        config = __read_from_vcap_services(service_name)

    return config
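
# A usage sketch, assuming no ibm-credentials.env file is present and these
# environment variables are set (names and values are illustrative):
#
#     EXAMPLE_SERVICE_AUTH_TYPE=iam
#     EXAMPLE_SERVICE_APIKEY=my-apikey
#     EXAMPLE_SERVICE_URL=https://example.com/api
#
#     read_external_sources('example_service')
#     # -> {'AUTH_TYPE': 'iam', 'APIKEY': 'my-apikey', 'URL': 'https://example.com/api'}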


def __read_from_env_variables(service_name: str) -> dict:
    """Return a config object based on environment variables for a service.

    Args:
        service_name: The name of the service to look for in env variables.

    Returns:
        A set of service configuration key-value pairs.
    """
    config = {}
    for key, value in environ.items():
        _parse_key_and_update_config(config, service_name, key, value)
    return config


def __read_from_credential_file(service_name: str, *, separator: str = '=') -> dict:
    """Return a config object based on credentials file for a service.

    Args:
        service_name: The name of the service to look for in the credentials file.

    Keyword Args:
        separator: The character to split on to de-serialize a line into a key-value pair.

    Returns:
        A set of service configuration key-value pairs.
    """
    default_credentials_file_name = 'ibm-credentials.env'

    # 1. ${IBM_CREDENTIALS_FILE}
    credential_file_path = getenv('IBM_CREDENTIALS_FILE')

    # 2. <current-working-directory>/ibm-credentials.env
    if credential_file_path is None:
        file_path = join(getcwd(), default_credentials_file_name)
        if isfile(file_path):
            credential_file_path = file_path

    # 3. <user-home-directory>/ibm-credentials.env
    if credential_file_path is None:
        file_path = join(expanduser('~'), default_credentials_file_name)
        if isfile(file_path):
            credential_file_path = file_path

    config = {}
    if credential_file_path is not None:
        try:
            with open(credential_file_path, 'r', encoding='utf-8') as fobj:
                for line in fobj:
                    key_val = line.strip().split(separator, 1)
                    if len(key_val) == 2:
                        key = key_val[0]
                        value = key_val[1]
                        _parse_key_and_update_config(config, service_name, key, value)
        except OSError:
            # just absorb the exception and make sure we return an empty config
            config = {}

    return config
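
# The credentials file is a plain list of KEY=VALUE lines. For the service name
# 'example_service', a file containing (values are illustrative):
#
#     EXAMPLE_SERVICE_AUTH_TYPE=iam
#     EXAMPLE_SERVICE_APIKEY=my-apikey
#
# yields {'AUTH_TYPE': 'iam', 'APIKEY': 'my-apikey'}.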


def _parse_key_and_update_config(config: dict, service_name: str, key: str, value: str) -> None:
    """Store `value` in `config` if `key` belongs to `service_name`.

    The service name is normalized to an upper-case, underscore-separated prefix;
    if `key` starts with that prefix, the prefix and its trailing underscore are
    stripped and the remainder is used as the config key.
    """
    service_name = service_name.replace(' ', '_').replace('-', '_').upper()
    if key.startswith(service_name):
        config[key[len(service_name) + 1 :]] = value
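
# For example (values are illustrative), the normalized service prefix and the
# trailing underscore are stripped from matching keys:
#
#     config = {}
#     _parse_key_and_update_config(config, 'example service', 'EXAMPLE_SERVICE_URL', 'https://example.com')
#     # config == {'URL': 'https://example.com'}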


def __read_from_vcap_services(service_name: str) -> dict:
    """Return a config object based on the vcap services environment variable.

    Args:
        service_name: The name of the service to look for in the VCAP_SERVICES environment variable.

    Returns:
        A set of service configuration key-value pairs.
    """
    vcap_services = getenv('VCAP_SERVICES')
    vcap_service_credentials = {}
    if vcap_services:
        services = json_import.loads(vcap_services)
        for key in services.keys():
            for i in range(len(services[key])):
                if vcap_service_credentials and isinstance(vcap_service_credentials, dict):
                    break
                if services[key][i].get('name') == service_name:
                    vcap_service_credentials = services[key][i].get('credentials', {})
        if not vcap_service_credentials:
            if service_name in services.keys():
                service = services.get(service_name)
                if service:
                    vcap_service_credentials = service[0].get('credentials', {})

        if vcap_service_credentials and isinstance(vcap_service_credentials, dict):
            new_vcap_creds = {}
            # cf
            if vcap_service_credentials.get('username') and vcap_service_credentials.get('password'):
                new_vcap_creds['AUTH_TYPE'] = 'basic'
                new_vcap_creds['USERNAME'] = vcap_service_credentials.get('username')
                new_vcap_creds['PASSWORD'] = vcap_service_credentials.get('password')
                vcap_service_credentials = new_vcap_creds
            elif vcap_service_credentials.get('iam_apikey'):
                new_vcap_creds['AUTH_TYPE'] = 'iam'
                new_vcap_creds['APIKEY'] = vcap_service_credentials.get('iam_apikey')
                vcap_service_credentials = new_vcap_creds
            elif vcap_service_credentials.get('apikey'):
                new_vcap_creds['AUTH_TYPE'] = 'iam'
                new_vcap_creds['APIKEY'] = vcap_service_credentials.get('apikey')
                vcap_service_credentials = new_vcap_creds
    return vcap_service_credentials
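
# A sketch of the expected VCAP_SERVICES shape (service label, instance name and
# credentials below are illustrative):
#
#     VCAP_SERVICES='{"example-service": [{"name": "my-instance",
#                     "credentials": {"apikey": "my-apikey"}}]}'
#
# With that value set, __read_from_vcap_services('my-instance') returns
# {'AUTH_TYPE': 'iam', 'APIKEY': 'my-apikey'}.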


# A regex that matches an "application/json" mimetype.
json_mimetype_pattern = re.compile('^application/json(\\s*;.*)?$')


def is_json_mimetype(mimetype: str) -> bool:
    """Returns true if 'mimetype' is a JSON-like mimetype, false otherwise.

    Args:
        mimetype: The mimetype to check.

    Returns:
        true if mimetype is a JSON-like mimetype, false otherwise.
    """
    return mimetype is not None and json_mimetype_pattern.match(mimetype) is not None
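
# For example:
#
#     is_json_mimetype('application/json')                 # True
#     is_json_mimetype('application/json; charset=utf-8')  # True
#     is_json_mimetype('text/plain')                       # False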
