# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
OpenStack Swift client library used internally
"""
import socket
import re
import logging
from urllib3.exceptions import HTTPError as urllib_http_error

import warnings

from requests.exceptions import RequestException, SSLError
import http.client as http_client
from requests.structures import CaseInsensitiveDict
from urllib.parse import quote, unquote
from urllib.parse import urljoin, urlparse, urlunparse
from time import sleep, time

from swiftclient import version as swiftclient_version
from swiftclient.exceptions import ClientException
from swiftclient.requests_compat import SwiftClientRequestsSession
from swiftclient.utils import (
    iter_wrapper, LengthWrapper, ReadableToIterable, parse_api_response,
    get_body)

# Default is 100, increase to 256
http_client._MAXHEADERS = 256

VERSIONFUL_AUTH_PATH = re.compile(r'v[2-3](?:\.0)?$')
AUTH_VERSIONS_V1 = ('1.0', '1', 1)
AUTH_VERSIONS_V2 = ('2.0', '2', 2)
AUTH_VERSIONS_V3 = ('3.0', '3', 3)
USER_METADATA_TYPE = tuple('x-%s-meta-' % type_ for type_ in
                           ('container', 'account', 'object'))
URI_PATTERN_INFO = re.compile(r'/info')
URI_PATTERN_VERSION = re.compile(r'\/v\d+\.?\d*(\/.*)?')

ksexceptions = ksclient_v2 = ksclient_v3 = ksa_v3 = None
try:
    from keystoneclient import exceptions as ksexceptions
    # prevent keystoneclient warning us that it has no log handlers
    logging.getLogger('keystoneclient').addHandler(logging.NullHandler())
    from keystoneclient.v2_0 import client as ksclient_v2
except ImportError:
    pass
try:
    from keystoneclient.v3 import client as ksclient_v3
    from keystoneauth1.identity import v3 as ksa_v3
    from keystoneauth1 import session as ksa_session
    from keystoneauth1 import exceptions as ksauthexceptions
except ImportError:
    pass

logger = logging.getLogger("swiftclient")
logger.addHandler(logging.NullHandler())

#: Default behaviour is to redact header values known to contain secrets,
#: such as ``X-Auth-Key`` and ``X-Auth-Token``. Up to the first 16 chars
#: may be revealed.
#:
#: To disable, set the value of ``redact_sensitive_headers`` to ``False``.
#:
#: When header redaction is enabled, ``reveal_sensitive_prefix`` configures the
#: maximum length of any sensitive header data sent to the logs. If the header
#: is less than twice this length, only ``int(len(value)/2)`` chars will be
#: logged; if it is less than 15 chars long, even less will be logged.
logger_settings = {
    'redact_sensitive_headers': True,
    'reveal_sensitive_prefix': 16
}
#: A list of sensitive headers to redact in logs. Note that when extending this
#: list, the header names must be added in all lower case.
LOGGER_SENSITIVE_HEADERS = [
    'x-auth-token', 'x-auth-key', 'x-service-token', 'x-storage-token',
    'x-account-meta-temp-url-key', 'x-account-meta-temp-url-key-2',
    'x-container-meta-temp-url-key', 'x-container-meta-temp-url-key-2',
    'set-cookie'
]


def safe_value(name, value):
    """
    Only show up to logger_settings['reveal_sensitive_prefix'] characters
    from a sensitive header.

    :param name: Header name
    :param value: Header value
    :return: Safe header value
    """
    if name.lower() in LOGGER_SENSITIVE_HEADERS:
        prefix_length = logger_settings.get('reveal_sensitive_prefix', 16)
        prefix_length = int(
            min(prefix_length, (len(value) ** 2) / 32, len(value) / 2)
        )
        redacted_value = value[0:prefix_length]
        return redacted_value + '...'
    return value


def scrub_headers(headers):
    """
    Redact header values that can contain sensitive information that
    should not be logged.

    :param headers: Either a dict or an iterable of two-element tuples
    :return: Safe dictionary of headers with sensitive information removed
    """
    if isinstance(headers, dict):
        headers = headers.items()
    headers = [
        (parse_header_string(key), parse_header_string(val))
        for (key, val) in headers
    ]
    if not logger_settings.get('redact_sensitive_headers', True):
        return dict(headers)
    if logger_settings.get('reveal_sensitive_prefix', 16) < 0:
        logger_settings['reveal_sensitive_prefix'] = 16
    return {key: safe_value(key, val) for (key, val) in headers}


def http_log(args, kwargs, resp, body):
    if not logger.isEnabledFor(logging.INFO):
        return

    # create and log equivalent curl command
    string_parts = ['curl -i']
    for element in args:
        if element == 'HEAD':
            string_parts.append(' -I')
        elif element in ('GET', 'POST', 'PUT'):
            string_parts.append(' -X %s' % element)
        else:
            string_parts.append(' %s' % parse_header_string(element))
    if 'headers' in kwargs:
        headers = scrub_headers(kwargs['headers'])
        for element in headers:
            header = ' -H "%s: %s"' % (element, headers[element])
            string_parts.append(header)

    # log response as debug if good, or info if error
    if resp.status < 300:
        log_method = logger.debug
    else:
        log_method = logger.info

    log_method("REQ: %s", "".join(string_parts))
    log_method("RESP STATUS: %s %s", resp.status, resp.reason)
    log_method("RESP HEADERS: %s", scrub_headers(resp.getheaders()))
    if body:
        resp_headers = resp_header_dict(resp)
        nbody = get_body(resp_headers, body)
        log_method("RESP BODY: %s", nbody)


def parse_header_string(data):
    if not isinstance(data, (str, bytes)):
        data = str(data)
    if isinstance(data, bytes):
        # Under Python3 requests only returns text_type and tosses (!) the
        # rest of the headers. If that ever changes, this should be a sane
        # approach.
        try:
            data = data.decode('ascii')
        except UnicodeDecodeError:
            data = quote(data)
    try:
        unquoted = unquote(data, errors='strict')
    except UnicodeDecodeError:
        return data
    return unquoted


def encode_utf8(value):
    if type(value) in (int, float, bool):
        # As of requests 2.11.0, headers must be byte- or unicode-strings.
        # Convert some known-good types as a convenience for developers.
        # Note that we *don't* convert subclasses, as they may have overriddden
        # __str__ or __repr__.
        # See https://github.com/kennethreitz/requests/pull/3366 for more info
        value = str(value)
    if isinstance(value, str):
        value = value.encode('utf8')
    return value


def encode_meta_headers(headers):
    """Only encode metadata headers keys"""
    ret = {}
    for header, value in headers.items():
        value = encode_utf8(value)
        header = header.lower()

        if (isinstance(header, str) and
                header.startswith(USER_METADATA_TYPE)):
            header = encode_utf8(header)

        ret[header] = value
    return ret


class LowerKeyCaseInsensitiveDict(CaseInsensitiveDict):
    """
    CaseInsensitiveDict returning lower case keys for items()
    """

    def __iter__(self):
        return iter(self._store.keys())


class _ObjectBody:
    """
    Readable and iterable object body response wrapper.
    """

    def __init__(self, resp, chunk_size, conn_to_close):
        """
        Wrap the underlying response

        :param resp: the response to wrap
        :param chunk_size: number of bytes to return each iteration/next call
        """
        self.resp = resp
        self.chunk_size = chunk_size
        self.conn_to_close = conn_to_close

    def read(self, length=None):
        buf = self.resp.read(length)
        if length != 0 and not buf:
            self.close()
        return buf

    def __iter__(self):
        return self

    def next(self):
        buf = self.read(self.chunk_size)
        if not buf:
            raise StopIteration()
        return buf

    def __next__(self):
        return self.next()

    def close(self):
        self.resp.close()
        if self.conn_to_close:
            self.conn_to_close.close()


class _RetryBody(_ObjectBody):
    """
    Wrapper for object body response which triggers a retry
    (from offset) if the connection is dropped after partially
    downloading the object.
    """
    def __init__(self, resp, connection, container, obj,
                 resp_chunk_size=None, query_string=None, response_dict=None,
                 headers=None):
        """
        Wrap the underlying response

        :param resp: the response to wrap
        :param connection: Connection class instance
        :param container: the name of the container the object is in
        :param obj: the name of object we are downloading
        :param resp_chunk_size: if defined, chunk size of data to read
        :param query_string: if set will be appended with '?' to generated path
        :param response_dict: an optional dictionary into which to place
                         the response - status, reason and headers
        :param headers: an optional dictionary with additional headers to
                         include in the request
        """
        super(_RetryBody, self).__init__(resp, resp_chunk_size, None)
        self.expected_length = int(self.resp.getheader('Content-Length'))
        self.conn = connection
        self.container = container
        self.obj = obj
        self.query_string = query_string
        self.response_dict = response_dict
        self.headers = dict(headers) if headers is not None else {}
        self.bytes_read = 0

    def read(self, length=None):
        buf = None
        try:
            buf = self.resp.read(length)
            self.bytes_read += len(buf)
        except (socket.error, urllib_http_error, RequestException):
            if self.conn.attempts > self.conn.retries:
                raise
        if (not buf and self.bytes_read < self.expected_length and
                self.conn.attempts <= self.conn.retries):
            self.headers['Range'] = 'bytes=%d-' % self.bytes_read
            self.headers['If-Match'] = self.resp.getheader('ETag')
            hdrs, body = self.conn._retry(None, get_object,
                                          self.container, self.obj,
                                          resp_chunk_size=self.chunk_size,
                                          query_string=self.query_string,
                                          response_dict=self.response_dict,
                                          headers=self.headers,
                                          attempts=self.conn.attempts)
            expected_range = 'bytes %d-%d/%d' % (
                self.bytes_read,
                self.expected_length - 1,
                self.expected_length)
            if 'content-range' not in hdrs:
                # Server didn't respond with partial content; manually seek
                logger.warning('Received 200 while retrying %s/%s; seeking...',
                               self.container, self.obj)
                to_read = self.bytes_read
                while to_read > 0:
                    buf = body.resp.read(min(to_read, self.chunk_size))
                    to_read -= len(buf)
            elif hdrs['content-range'] != expected_range:
                msg = ('Expected range "%s" while retrying %s/%s '
                       'but got "%s"' % (expected_range, self.container,
                                         self.obj, hdrs['content-range']))
                raise ClientException(msg)
            self.resp = body.resp
            buf = self.read(length)
        return buf


class HTTPConnection:
    def __init__(self, url, proxy=None, cacert=None, insecure=False,
                 cert=None, cert_key=None, ssl_compression=False,
                 default_user_agent=None, timeout=None):
        """
        Make an HTTPConnection or HTTPSConnection

        :param url: url to connect to
        :param proxy: proxy to connect through, if any; None by default; str
                      of the format 'http://127.0.0.1:8888' to set one
        :param cacert: A CA bundle file to use in verifying a TLS server
                       certificate.
        :param insecure: Allow to access servers without checking SSL certs.
                         The server's certificate will not be verified.
        :param cert: Client certificate file to connect on SSL server
                            requiring SSL client certificate.
        :param cert_key: Client certificate private key file.
        :param ssl_compression: SSL compression should be disabled by default
                                and this setting is not usable as of now. The
                                parameter is kept for backward compatibility.
        :param default_user_agent: Set the User-Agent header on every request.
                                   If set to None (default), the user agent
                                   will be "python-swiftclient-<version>". This
                                   may be overridden on a per-request basis by
                                   explicitly setting the user-agent header on
                                   a call to request().
        :param timeout: socket read timeout value, passed directly to
                        the requests library.
        :raises ClientException: Unable to handle protocol scheme
        """
        self.url = url
        self.parsed_url = urlparse(url)
        self.host = self.parsed_url.netloc
        self.port = self.parsed_url.port
        self.requests_args = {}
        self.request_session = SwiftClientRequestsSession()
        # Don't use requests's default headers
        self.request_session.headers = None
        self.resp = None
        if self.parsed_url.scheme not in ('http', 'https'):
            raise ClientException('Unsupported scheme "%s" in url "%s"'
                                  % (self.parsed_url.scheme, url))
        self.requests_args['verify'] = not insecure
        if cacert and not insecure:
            # verify requests parameter is used to pass the CA_BUNDLE file
            # see: http://docs.python-requests.org/en/latest/user/advanced/
            self.requests_args['verify'] = cacert
        if cert:
            # NOTE(cbrandily): cert requests parameter is used to pass client
            # cert path or  a tuple with client certificate/key paths.
            if cert_key:
                self.requests_args['cert'] = cert, cert_key
            else:
                self.requests_args['cert'] = cert

        if proxy:
            proxy_parsed = urlparse(proxy)
            if not proxy_parsed.scheme:
                raise ClientException("Proxy's missing scheme")
            self.requests_args['proxies'] = {
                proxy_parsed.scheme: '%s://%s' % (
                    proxy_parsed.scheme, proxy_parsed.netloc
                )
            }
        self.requests_args['stream'] = True
        if default_user_agent is None:
            default_user_agent = \
                'python-swiftclient-%s' % swiftclient_version.version_string
        self.default_user_agent = default_user_agent
        if timeout:
            self.requests_args['timeout'] = timeout

    def _request(self, *arg, **kwarg):
        """Final wrapper before requests call, to be patched in tests"""
        return self.request_session.request(*arg, **kwarg)

    def request(self, method, full_path, data=None, headers=None, files=None):
        """Encode url and header, then call requests.request"""
        if headers is None:
            headers = {}
        else:
            headers = encode_meta_headers(headers)

        # set a default User-Agent header if it wasn't passed in
        if 'user-agent' not in headers:
            headers['user-agent'] = self.default_user_agent
        url = "%s://%s%s" % (
            self.parsed_url.scheme,
            self.parsed_url.netloc,
            full_path)
        self.resp = self._request(method, url, headers=headers, data=data,
                                  files=files, **self.requests_args)
        return self.resp

    def putrequest(self, full_path, data=None, headers=None, files=None):
        """
        Use python-requests files upload

        :param data: Use data generator for chunked-transfer
        :param files: Use files for default transfer
        """
        return self.request('PUT', full_path, data, headers, files)

    def getresponse(self):
        """Adapt requests response to httplib interface"""
        self.resp.status = self.resp.status_code
        old_getheader = self.resp.headers.get

        def _decode_header(string):
            if string is None:
                return string
            return string.encode('iso-8859-1').decode('utf-8')

        def _encode_header(string):
            if string is None:
                return string
            return string.encode('utf-8').decode('iso-8859-1')

        def getheaders():
            return [(_decode_header(k), _decode_header(v))
                    for k, v in self.resp.headers.items()]

        def getheader(k, v=None):
            return _decode_header(old_getheader(
                _encode_header(k.lower()), _encode_header(v)))

        def releasing_read(*args, **kwargs):
            chunk = self.resp.raw.read(*args, **kwargs)
            if not chunk:
                # NOTE(sigmavirus24): Release the connection back to the
                # urllib3's connection pool. This will reduce the number of
                # log messages seen in bug #1341777. This does not actually
                # close a socket. It will also prevent people from being
                # misled as to the cause of a bug as in bug #1424732.
                self.resp.close()
            return chunk

        self.resp.getheaders = getheaders
        self.resp.getheader = getheader
        self.resp.read = releasing_read

        return self.resp

    def close(self):
        if self.resp:
            self.resp.close()
        self.request_session.close()


def http_connection(*arg, **kwarg):
    """:returns: tuple of (parsed url, connection object)"""
    conn = HTTPConnection(*arg, **kwarg)
    return conn.parsed_url, conn


def get_auth_1_0(url, user, key, **kwargs):
    cacert = kwargs.get('cacert', None)
    insecure = kwargs.get('insecure', False)
    cert = kwargs.get('cert')
    cert_key = kwargs.get('cert_key')
    timeout = kwargs.get('timeout', None)
    parsed, conn = http_connection(url, cacert=cacert, insecure=insecure,
                                   cert=cert, cert_key=cert_key,
                                   timeout=timeout)
    method = 'GET'
    headers = {'X-Auth-User': user, 'X-Auth-Key': key}
    conn.request(method, parsed.path, '', headers)
    resp = conn.getresponse()
    body = resp.read()
    resp.close()
    conn.close()
    http_log((url, method,), headers, resp, body)
    url = resp.getheader('x-storage-url')

    # There is a side-effect on current Rackspace 1.0 server where a
    # bad URL would get you that document page and a 200. We error out
    # if we don't have a x-storage-url header and if we get a body.
    if resp.status < 200 or resp.status >= 300 or (body and not url):
        raise ClientException.from_response(resp, 'Auth GET failed', body)

    token = resp.getheader('x-storage-token', resp.getheader('x-auth-token'))
    return url, token


def get_keystoneclient_2_0(auth_url, user, key, os_options, **kwargs):
    # this function is only here to preserve the historic 'public'
    # interface of this module
    kwargs.update({'auth_version': '2.0'})
    return get_auth_keystone(auth_url, user, key, os_options, **kwargs)


def get_auth_keystone(auth_url, user, key, os_options, **kwargs):
    """
    Authenticate against a keystone server.

    We are using the keystoneclient library for authentication.
    """

    insecure = kwargs.get('insecure', False)
    timeout = kwargs.get('timeout', None)
    auth_version = kwargs.get('auth_version', None)
    debug = logger.isEnabledFor(logging.DEBUG)

    # Add the version suffix in case of versionless Keystone endpoints. If
    # auth_version is also unset it is likely that it is v3
    if not VERSIONFUL_AUTH_PATH.match(
            urlparse(auth_url).path.rstrip('/').rsplit('/', 1)[-1]):
        # Normalize auth_url to end in a slash because urljoin
        auth_url = auth_url.rstrip('/') + '/'
        if auth_version and auth_version in AUTH_VERSIONS_V2:
            auth_url = urljoin(auth_url, "v2.0")
        else:
            auth_url = urljoin(auth_url, "v3")
            auth_version = '3'
        logger.debug("Versionless auth_url - using %s as endpoint" % auth_url)

    # Legacy default if not set
    if auth_version is None:
        auth_version = '2'

    ksclient = None
    if auth_version in AUTH_VERSIONS_V3:
        if ksclient_v3 is not None:
            ksclient = ksclient_v3
    else:
        if ksclient_v2 is not None:
            ksclient = ksclient_v2

    if ksclient is None:
        raise ClientException('''
Auth versions 2.0 and 3 require python-keystoneclient, install it or use Auth
version 1.0 which requires ST_AUTH, ST_USER, and ST_KEY environment
variables to be set or overridden with -A, -U, or -K.''')

    filter_kwargs = {}
    service_type = os_options.get('service_type') or 'object-store'
    endpoint_type = os_options.get('endpoint_type') or 'public'
    if os_options.get('region_name'):
        filter_kwargs['attr'] = 'region'
        filter_kwargs['filter_value'] = os_options['region_name']

    if os_options.get('auth_type') and os_options['auth_type'] not in (
            'password', 'v2password', 'v3password',
            'v3applicationcredential'):
        raise ClientException(
            'Swiftclient currently only supports v3applicationcredential '
            'for auth_type')
    elif os_options.get('auth_type') == 'v3applicationcredential':
        if ksa_v3 is None:
            raise ClientException('Auth v3applicationcredential requires '
                                  'keystoneauth1 package; consider upgrading '
                                  'to python-keystoneclient>=2.0.0')

        try:
            auth = ksa_v3.ApplicationCredential(
                auth_url=auth_url,
                application_credential_secret=os_options.get(
                    'application_credential_secret'),
                application_credential_id=os_options.get(
                    'application_credential_id'))
            sess = ksa_session.Session(auth=auth, timeout=timeout)
            token = sess.get_token()
        except ksauthexceptions.Unauthorized:
            msg = 'Unauthorized. Check application credential id and secret.'
            raise ClientException(msg)
        except ksauthexceptions.AuthorizationFailure as err:
            raise ClientException('Authorization Failure. %s' % err)

        try:
            endpoint = sess.get_endpoint_data(service_type=service_type,
                                              endpoint_type=endpoint_type,
                                              **filter_kwargs)

            return endpoint.catalog_url, token
        except ksauthexceptions.EndpointNotFound:
            raise ClientException(
                'Endpoint for %s not found - '
                'have you specified a region?' % service_type)

    try:
        _ksclient = ksclient.Client(
            username=user,
            password=key,
            token=os_options.get('auth_token'),
            tenant_name=os_options.get('tenant_name'),
            tenant_id=os_options.get('tenant_id'),
            user_id=os_options.get('user_id'),
            user_domain_name=os_options.get('user_domain_name'),
            user_domain_id=os_options.get('user_domain_id'),
            project_name=os_options.get('project_name'),
            project_id=os_options.get('project_id'),
            project_domain_name=os_options.get('project_domain_name'),
            project_domain_id=os_options.get('project_domain_id'),
            debug=debug,
            cacert=kwargs.get('cacert'),
            cert=kwargs.get('cert'),
            key=kwargs.get('cert_key'),
            auth_url=auth_url, insecure=insecure, timeout=timeout)
    except ksexceptions.Unauthorized:
        msg = 'Unauthorized. Check username, password and tenant name/id.'
        if auth_version in AUTH_VERSIONS_V3:
            msg = ('Unauthorized. Check username/id, password, '
                   'tenant name/id and user/tenant domain name/id.')
        raise ClientException(msg)
    except ksexceptions.AuthorizationFailure as err:
        raise ClientException('Authorization Failure. %s' % err)

    try:
        endpoint = _ksclient.service_catalog.url_for(
            service_type=service_type,
            endpoint_type=endpoint_type,
            **filter_kwargs)
    except ksexceptions.EndpointNotFound:
        raise ClientException('Endpoint for %s not found - '
                              'have you specified a region?' % service_type)
    return endpoint, _ksclient.auth_token


def get_auth(auth_url, user, key, **kwargs):
    """
    Get authentication/authorization credentials.

    :kwarg auth_version: the api version of the supplied auth params
    :kwarg os_options: a dict, the openstack identity service options

    :returns: a tuple, (storage_url, token)

    N.B. if the optional os_options parameter includes a non-empty
    'object_storage_url' key it will override the default storage url returned
    by the auth service.
    """
    session = kwargs.get('session', None)
    auth_version = kwargs.get('auth_version', '1')
    os_options = kwargs.get('os_options', {})

    cacert = kwargs.get('cacert', None)
    insecure = kwargs.get('insecure', False)
    cert = kwargs.get('cert')
    cert_key = kwargs.get('cert_key')
    timeout = kwargs.get('timeout', None)

    if session:
        service_type = os_options.get('service_type', 'object-store')
        interface = os_options.get('endpoint_type', 'public')
        region_name = os_options.get('region_name')
        storage_url = session.get_endpoint(service_type=service_type,
                                           interface=interface,
                                           region_name=region_name)
        token = session.get_token()
    elif auth_version in AUTH_VERSIONS_V1:
        storage_url, token = get_auth_1_0(auth_url,
                                          user,
                                          key,
                                          cacert=cacert,
                                          insecure=insecure,
                                          cert=cert,
                                          cert_key=cert_key,
                                          timeout=timeout)
    elif auth_version in AUTH_VERSIONS_V2 + AUTH_VERSIONS_V3:
        # We are handling a special use case here where the user argument
        # specifies both the user name and tenant name in the form tenant:user
        if user and not kwargs.get('tenant_name') and ':' in user:
            os_options['tenant_name'], user = user.split(':')

        # We are allowing to have a tenant_name argument in get_auth
        # directly without having os_options
        if kwargs.get('tenant_name'):
            os_options['tenant_name'] = kwargs['tenant_name']

        if os_options.get('auth_type') == 'v3applicationcredential':
            pass
        elif not (os_options.get('tenant_name') or
                  os_options.get('tenant_id') or
                  os_options.get('project_name') or
                  os_options.get('project_id')):
            if auth_version in AUTH_VERSIONS_V2:
                raise ClientException('No tenant specified')
            raise ClientException('No project name or project id specified.')

        storage_url, token = get_auth_keystone(auth_url, user,
                                               key, os_options,
                                               cacert=cacert,
                                               insecure=insecure,
                                               cert=cert,
                                               cert_key=cert_key,
                                               timeout=timeout,
                                               auth_version=auth_version)
    else:
        raise ClientException('Unknown auth_version %s specified and no '
                              'session found.' % auth_version)

    # Override storage url, if necessary
    if os_options.get('object_storage_url'):
        return os_options['object_storage_url'], token
    else:
        return storage_url, token


def resp_header_dict(resp):
    resp_headers = LowerKeyCaseInsensitiveDict()
    for header, value in resp.getheaders():
        header = parse_header_string(header)
        resp_headers[header] = parse_header_string(value)
    return resp_headers


def store_response(resp, response_dict):
    """
    store information about an operation into a dict

    :param resp: an http response object containing the response
                 headers
    :param response_dict: a dict into which are placed the
       status, reason and a dict of lower-cased headers
    """
    if response_dict is not None:
        response_dict['status'] = resp.status
        response_dict['reason'] = resp.reason
        response_dict['headers'] = resp_header_dict(resp)


def get_account(url, token, marker=None, limit=None, prefix=None,
                end_marker=None, http_conn=None, full_listing=False,
                service_token=None, headers=None, delimiter=None):
    """
    Get a listing of containers for the account.

    :param url: storage URL
    :param token: auth token
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param end_marker: end_marker query
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of 10000 listings
    :param service_token: service auth token
    :param headers: additional headers to include in the request
    :param delimiter: delimiter query
    :returns: a tuple of (response headers, a list of containers) The response
              headers will be a dict and all header names will be lowercase.
    :raises ClientException: HTTP GET request failed
    """
    req_headers = {'X-Auth-Token': token, 'Accept-Encoding': 'gzip'}
    if service_token:
        req_headers['X-Service-Token'] = service_token
    if headers:
        req_headers.update(headers)

    close_conn = False
    if not http_conn:
        http_conn = http_connection(url)
        close_conn = True
    if full_listing:
        rv = get_account(url, token, marker, limit, prefix, end_marker,
                         http_conn, headers=req_headers, delimiter=delimiter)
        listing = rv[1]
        while listing:
            marker = listing[-1]['name']
            listing = get_account(url, token, marker, limit, prefix,
                                  end_marker, http_conn, headers=req_headers,
                                  delimiter=delimiter)[1]
            if listing:
                rv[1].extend(listing)
        return rv
    parsed, conn = http_conn
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    if delimiter:
        qs += '&delimiter=%s' % quote(delimiter)
    if end_marker:
        qs += '&end_marker=%s' % quote(end_marker)
    full_path = '%s?%s' % (parsed.path, qs)
    method = 'GET'
    conn.request(method, full_path, '', req_headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(("%s?%s" % (url, qs), method,), {'headers': req_headers},
             resp, body)

    resp_headers = resp_header_dict(resp)
    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Account GET failed', body)
    if resp.status == 204:
        return resp_headers, []
    return resp_headers, parse_api_response(resp_headers, body)


def head_account(url, token, http_conn=None, headers=None,
                 service_token=None):
    """
    Get account stats.

    :param url: storage URL
    :param token: auth token
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param headers: additional headers to include in the request
    :param service_token: service auth token
    :returns: a dict containing the response's headers (all header names will
              be lowercase)
    :raises ClientException: HTTP HEAD request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True
    method = "HEAD"
    req_headers = {'X-Auth-Token': token}
    if service_token:
        req_headers['X-Service-Token'] = service_token
    if headers:
        req_headers.update(headers)

    conn.request(method, parsed.path, '', req_headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log((url, method,), {'headers': req_headers}, resp, body)
    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Account HEAD failed', body)
    resp_headers = resp_header_dict(resp)
    return resp_headers


def post_account(url, token, headers, http_conn=None, response_dict=None,
                 service_token=None, query_string=None, data=None):
    """
    Update an account's metadata.

    :param url: storage URL
    :param token: auth token
    :param headers: additional headers to include in the request
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param response_dict: an optional dictionary into which to place
                     the response - status, reason and headers
    :param service_token: service auth token
    :param query_string: if set will be appended with '?' to generated path
    :param data: an optional message body for the request
    :raises ClientException: HTTP POST request failed
    :returns: resp_headers, body
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True
    method = 'POST'
    path = parsed.path
    if query_string:
        path += '?' + query_string
    req_headers = {'X-Auth-Token': token}
    if service_token:
        req_headers['X-Service-Token'] = service_token
    if headers:
        req_headers.update(headers)
    conn.request(method, path, data, req_headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log((url, method,), {'headers': req_headers}, resp, body)

    store_response(resp, response_dict)

    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Account POST failed', body)
    resp_headers = {}
    for header, value in resp.getheaders():
        resp_headers[header.lower()] = value
    return resp_headers, body


def get_container(url, token, container, marker=None, limit=None,
                  prefix=None, delimiter=None, end_marker=None,
                  version_marker=None, path=None, http_conn=None,
                  full_listing=False, service_token=None, headers=None,
                  query_string=None):
    """
    Get a listing of objects for the container.

    :param url: storage URL
    :param token: auth token
    :param container: container name to get a listing for
    :param marker: marker query
    :param limit: limit query
    :param prefix: prefix query
    :param delimiter: string to delimit the queries on
    :param end_marker: marker query
    :param version_marker: version marker query
    :param path: path query (equivalent: "delimiter=/" and "prefix=path/")
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param full_listing: if True, return a full listing, else returns a max
                         of 10000 listings
    :param service_token: service auth token
    :param headers: additional headers to include in the request
    :param query_string: if set will be appended with '?' to generated path
    :returns: a tuple of (response headers, a list of objects) The response
              headers will be a dict and all header names will be lowercase.
    :raises ClientException: HTTP GET request failed
    """
    close_conn = False
    if not http_conn:
        http_conn = http_connection(url)
        close_conn = True
    if full_listing:
        rv = get_container(url, token, container, marker, limit, prefix,
                           delimiter, end_marker, version_marker, path=path,
                           http_conn=http_conn, service_token=service_token,
                           headers=headers)
        listing = rv[1]
        while listing:
            if not delimiter:
                marker = listing[-1]['name']
            else:
                marker = listing[-1].get('name', listing[-1].get('subdir'))
            version_marker = listing[-1].get('version_id')
            listing = get_container(url, token, container, marker, limit,
                                    prefix, delimiter, end_marker,
                                    version_marker, path, http_conn,
                                    service_token=service_token,
                                    headers=headers)[1]
            if listing:
                rv[1].extend(listing)
        return rv
    parsed, conn = http_conn
    cont_path = '%s/%s' % (parsed.path, quote(container))
    qs = 'format=json'
    if marker:
        qs += '&marker=%s' % quote(marker)
    if limit:
        qs += '&limit=%d' % limit
    if prefix:
        qs += '&prefix=%s' % quote(prefix)
    if delimiter:
        qs += '&delimiter=%s' % quote(delimiter)
    if end_marker:
        qs += '&end_marker=%s' % quote(end_marker)
    if version_marker:
        qs += '&version_marker=%s' % quote(version_marker)
    if path:
        qs += '&path=%s' % quote(path)
    if query_string:
        qs += '&%s' % query_string.lstrip('?')
    req_headers = {'X-Auth-Token': token, 'Accept-Encoding': 'gzip'}
    if service_token:
        req_headers['X-Service-Token'] = service_token
    if headers:
        req_headers.update(headers)
    method = 'GET'
    conn.request(method, '%s?%s' % (cont_path, qs), '', req_headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(('%(url)s%(cont_path)s?%(qs)s' %
              {'url': url.replace(parsed.path, ''),
               'cont_path': cont_path,
               'qs': qs}, method,),
             {'headers': req_headers}, resp, body)

    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Container GET failed', body)
    resp_headers = resp_header_dict(resp)
    if resp.status == 204:
        return resp_headers, []
    return resp_headers, parse_api_response(resp_headers, body)


def head_container(url, token, container, http_conn=None, headers=None,
                   service_token=None):
    """
    Get container stats.

    :param url: storage URL
    :param token: auth token
    :param container: container name to get stats for
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param headers: additional headers to include in the request
    :param service_token: service auth token
    :returns: a dict containing the response's headers (all header names will
              be lowercase)
    :raises ClientException: HTTP HEAD request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True
    path = '%s/%s' % (parsed.path, quote(container))
    method = 'HEAD'
    req_headers = {'X-Auth-Token': token}
    if service_token:
        req_headers['X-Service-Token'] = service_token
    if headers:
        req_headers.update(headers)
    conn.request(method, path, '', req_headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(('%s%s' % (url.replace(parsed.path, ''), path), method,),
             {'headers': req_headers}, resp, body)

    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(
            resp, 'Container HEAD failed', body)
    resp_headers = resp_header_dict(resp)
    return resp_headers


def put_container(url, token, container, headers=None, http_conn=None,
                  response_dict=None, service_token=None, query_string=None):
    """
    Create a container

    :param url: storage URL
    :param token: auth token
    :param container: container name to create
    :param headers: additional headers to include in the request
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param response_dict: an optional dictionary into which to place
                     the response - status, reason and headers
    :param service_token: service auth token
    :param query_string: if set will be appended with '?' to generated path
    :raises ClientException: HTTP PUT request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True
    path = '%s/%s' % (parsed.path, quote(container))
    method = 'PUT'
    req_headers = {'X-Auth-Token': token}
    if service_token:
        req_headers['X-Service-Token'] = service_token
    if headers:
        req_headers.update(headers)
    if 'content-length' not in (k.lower() for k in req_headers):
        req_headers['Content-Length'] = '0'
    if query_string:
        path += '?' + query_string.lstrip('?')
    conn.request(method, path, '', req_headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()

    store_response(resp, response_dict)

    http_log(('%s%s' % (url.replace(parsed.path, ''), path), method,),
             {'headers': req_headers}, resp, body)
    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Container PUT failed', body)


def post_container(url, token, container, headers, http_conn=None,
                   response_dict=None, service_token=None):
    """
    Update a container's metadata.

    :param url: storage URL
    :param token: auth token
    :param container: container name to update
    :param headers: additional headers to include in the request
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param response_dict: an optional dictionary into which to place
                     the response - status, reason and headers
    :param service_token: service auth token
    :raises ClientException: HTTP POST request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True
    path = '%s/%s' % (parsed.path, quote(container))
    method = 'POST'
    req_headers = {'X-Auth-Token': token}
    if service_token:
        req_headers['X-Service-Token'] = service_token
    if headers:
        req_headers.update(headers)
    if 'content-length' not in (k.lower() for k in headers):
        req_headers['Content-Length'] = '0'
    conn.request(method, path, '', req_headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(('%s%s' % (url.replace(parsed.path, ''), path), method,),
             {'headers': req_headers}, resp, body)

    store_response(resp, response_dict)

    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(
            resp, 'Container POST failed', body)


def delete_container(url, token, container, http_conn=None,
                     response_dict=None, service_token=None,
                     query_string=None, headers=None):
    """
    Delete a container

    :param url: storage URL
    :param token: auth token
    :param container: container name to delete
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param response_dict: an optional dictionary into which to place
                     the response - status, reason and headers
    :param service_token: service auth token
    :param query_string: if set will be appended with '?' to generated path
    :param headers: additional headers to include in the request
    :raises ClientException: HTTP DELETE request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True
    path = '%s/%s' % (parsed.path, quote(container))
    if headers:
        headers = dict(headers)
    else:
        headers = {}

    headers['X-Auth-Token'] = token
    if service_token:
        headers['X-Service-Token'] = service_token
    if query_string:
        path += '?' + query_string.lstrip('?')
    method = 'DELETE'
    conn.request(method, path, '', headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(('%s%s' % (url.replace(parsed.path, ''), path), method,),
             {'headers': headers}, resp, body)

    store_response(resp, response_dict)

    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(
            resp, 'Container DELETE failed', body)


def get_object(url, token, container, name, http_conn=None,
               resp_chunk_size=None, query_string=None,
               response_dict=None, headers=None, service_token=None):
    """
    Get an object

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to get
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object and close it
                      after all content is read)
    :param resp_chunk_size: if defined, chunk size of data to read. NOTE: If
                            you specify a resp_chunk_size you must fully read
                            the object's contents before making another
                            request.
    :param query_string: if set will be appended with '?' to generated path
    :param response_dict: an optional dictionary into which to place
                     the response - status, reason and headers
    :param headers: an optional dictionary with additional headers to include
                    in the request
    :param service_token: service auth token
    :returns: a tuple of (response headers, the object's contents) The response
              headers will be a dict and all header names will be lowercase.
    :raises ClientException: HTTP GET request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    if query_string:
        path += '?' + query_string
    method = 'GET'
    headers = headers.copy() if headers else {}
    headers['X-Auth-Token'] = token
    if service_token:
        headers['X-Service-Token'] = service_token
    conn.request(method, path, '', headers)
    resp = conn.getresponse()

    parsed_response = {}
    store_response(resp, parsed_response)
    if response_dict is not None:
        response_dict.update(parsed_response)

    if resp.status < 200 or resp.status >= 300:
        body = resp.read()
        http_log(('%s%s' % (url.replace(parsed.path, ''), path), method,),
                 {'headers': headers}, resp, body)
        raise ClientException.from_response(resp, 'Object GET failed', body)
    if resp_chunk_size:
        object_body = _ObjectBody(resp, resp_chunk_size,
                                  conn_to_close=conn if close_conn else None)
    else:
        object_body = resp.read()
        if close_conn:
            conn.close()
    http_log(('%s%s' % (url.replace(parsed.path, ''), path), method,),
             {'headers': headers}, resp, None)

    return parsed_response['headers'], object_body


def head_object(url, token, container, name, http_conn=None,
                service_token=None, headers=None, query_string=None):
    """
    Get object info

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: object name to get info for
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param service_token: service auth token
    :param headers: additional headers to include in the request
    :returns: a dict containing the response's headers (all header names will
              be lowercase)
    :raises ClientException: HTTP HEAD request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    if query_string:
        path += '?' + query_string
    if headers:
        headers = dict(headers)
    else:
        headers = {}
    headers['X-Auth-Token'] = token
    method = 'HEAD'
    if service_token:
        headers['X-Service-Token'] = service_token
    conn.request(method, path, '', headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(('%s%s' % (url.replace(parsed.path, ''), path), method,),
             {'headers': headers}, resp, body)
    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Object HEAD failed', body)
    resp_headers = resp_header_dict(resp)
    return resp_headers


def put_object(url, token=None, container=None, name=None, contents=None,
               content_length=None, etag=None, chunk_size=None,
               content_type=None, headers=None, http_conn=None, proxy=None,
               query_string=None, response_dict=None, service_token=None):
    """
    Put an object

    :param url: storage URL
    :param token: auth token; if None, no token will be sent
    :param container: container name that the object is in; if None, the
                      container name is expected to be part of the url
    :param name: object name to put; if None, the object name is expected to be
                 part of the url
    :param contents: a string, a file-like object or an iterable
                     to read object data from;
                     if None, a zero-byte put will be done
    :param content_length: value to send as content-length header; also limits
                           the amount read from contents; if None, it will be
                           computed via the contents or chunked transfer
                           encoding will be used
    :param etag: etag of contents; if None, no etag will be sent
    :param chunk_size: chunk size of data to write; it defaults to 65536;
                       used only if the contents object has a 'read'
                       method, e.g. file-like objects, ignored otherwise

    :param content_type: value to send as content-type header, overriding any
                       value included in the headers param; if None and no
                       value is found in the headers param, an empty string
                       value will be sent
    :param headers: additional headers to include in the request, if any
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param proxy: proxy to connect through, if any; None by default; str of the
                  format 'http://127.0.0.1:8888' to set one
    :param query_string: if set will be appended with '?' to generated path
    :param response_dict: an optional dictionary into which to place
                     the response - status, reason and headers
    :param service_token: service auth token
    :returns: etag
    :raises ClientException: HTTP PUT request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url, proxy=proxy)
        close_conn = True
    path = parsed.path
    if container:
        path = '%s/%s' % (path.rstrip('/'), quote(container))
    if name:
        path = '%s/%s' % (path.rstrip('/'), quote(name))
    if query_string:
        path += '?' + query_string
    if headers:
        headers = dict(headers)
    else:
        headers = {}
    if token:
        headers['X-Auth-Token'] = token
    if service_token:
        headers['X-Service-Token'] = service_token
    if etag:
        headers['ETag'] = etag.strip('"')
    if content_length is not None:
        headers['Content-Length'] = str(content_length)
    else:
        for n, v in headers.items():
            if n.lower() == 'content-length':
                content_length = int(v)
    if content_type is not None:
        headers['Content-Type'] = content_type
    if not contents:
        headers['Content-Length'] = '0'

    if isinstance(contents, (ReadableToIterable, LengthWrapper)):
        conn.putrequest(path, headers=headers, data=contents)
    elif hasattr(contents, 'read'):
        if chunk_size is None:
            chunk_size = 65536

        if content_length is None:
            data = ReadableToIterable(contents, chunk_size, md5=False)
        else:
            data = LengthWrapper(contents, content_length, md5=False)

        conn.putrequest(path, headers=headers, data=data)
    else:
        if chunk_size is not None:
            warn_msg = ('%s object has no "read" method, ignoring chunk_size'
                        % type(contents).__name__)
            warnings.warn(warn_msg, stacklevel=2)
        # Match requests's is_stream test
        if hasattr(contents, '__iter__') and not isinstance(contents, (
                str, bytes, list, tuple, dict)):
            contents = iter_wrapper(contents)
        conn.request('PUT', path, contents, headers)

    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(('%s%s' % (url.replace(parsed.path, ''), path), 'PUT',),
             {'headers': headers}, resp, body)

    store_response(resp, response_dict)

    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Object PUT failed', body)

    etag = resp.getheader('etag', '').strip('"')
    return etag


def post_object(url, token, container, name, headers, http_conn=None,
                response_dict=None, service_token=None):
    """
    Update object metadata

    :param url: storage URL
    :param token: auth token
    :param container: container name that the object is in
    :param name: name of the object to update
    :param headers: additional headers to include in the request
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param response_dict: an optional dictionary into which to place
                     the response - status, reason and headers
    :param service_token: service auth token
    :raises ClientException: HTTP POST request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True
    path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
    req_headers = {'X-Auth-Token': token}
    if service_token:
        req_headers['X-Service-Token'] = service_token
    if headers:
        req_headers.update(headers)
    conn.request('POST', path, '', req_headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(('%s%s' % (url.replace(parsed.path, ''), path), 'POST',),
             {'headers': req_headers}, resp, body)

    store_response(resp, response_dict)

    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Object POST failed', body)


def copy_object(url, token, container, name, destination=None,
                headers=None, fresh_metadata=None, http_conn=None,
                response_dict=None, service_token=None):
    """
    Copy object

    :param url: storage URL
    :param token: auth token; if None, no token will be sent
    :param container: container name that the source object is in
    :param name: source object name
    :param destination: The container and object name of the destination object
                        in the form of /container/object; if None, the copy
                        will use the source as the destination.
    :param headers: additional headers to include in the request
    :param fresh_metadata: Enables object creation that omits existing user
                           metadata, default None
    :param http_conn: HTTP connection object (If None, it will create the
                      conn object)
    :param response_dict: an optional dictionary into which to place
                     the response - status, reason and headers
    :param service_token: service auth token
    :raises ClientException: HTTP COPY request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url)
        close_conn = True

    path = parsed.path
    container = quote(container)
    name = quote(name)
    path = '%s/%s/%s' % (path.rstrip('/'), container, name)

    headers = dict(headers) if headers else {}

    if destination is not None:
        headers['Destination'] = quote(destination)
    elif container and name:
        headers['Destination'] = '/%s/%s' % (container, name)

    if token is not None:
        headers['X-Auth-Token'] = token
    if service_token is not None:
        headers['X-Service-Token'] = service_token

    if fresh_metadata is not None:
        # remove potential fresh metadata headers
        for fresh_hdr in [hdr for hdr in headers.keys()
                          if hdr.lower() == 'x-fresh-metadata']:
            headers.pop(fresh_hdr)
        headers['X-Fresh-Metadata'] = 'true' if fresh_metadata else 'false'

    conn.request('COPY', path, '', headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(('%s%s' % (url.replace(parsed.path, ''), path), 'COPY',),
             {'headers': headers}, resp, body)

    store_response(resp, response_dict)

    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Object COPY failed', body)


def delete_object(url, token=None, container=None, name=None, http_conn=None,
                  headers=None, proxy=None, query_string=None,
                  response_dict=None, service_token=None):
    """
    Delete object

    :param url: storage URL
    :param token: auth token; if None, no token will be sent
    :param container: container name that the object is in; if None, the
                      container name is expected to be part of the url
    :param name: object name to delete; if None, the object name is expected to
                 be part of the url
    :param http_conn: a tuple of (parsed url, HTTPConnection object),
                      (If None, it will create the conn object)
    :param headers: additional headers to include in the request
    :param proxy: proxy to connect through, if any; None by default; str of the
                  format 'http://127.0.0.1:8888' to set one
    :param query_string: if set will be appended with '?' to generated path
    :param response_dict: an optional dictionary into which to place
                     the response - status, reason and headers
    :param service_token: service auth token
    :raises ClientException: HTTP DELETE request failed
    """
    close_conn = False
    if http_conn:
        parsed, conn = http_conn
    else:
        parsed, conn = http_connection(url, proxy=proxy)
        close_conn = True
    path = parsed.path
    if container:
        path = '%s/%s' % (path.rstrip('/'), quote(container))
    if name:
        path = '%s/%s' % (path.rstrip('/'), quote(name))
    if query_string:
        path += '?' + query_string
    if headers:
        headers = dict(headers)
    else:
        headers = {}
    if token:
        headers['X-Auth-Token'] = token
    if service_token:
        headers['X-Service-Token'] = service_token
    conn.request('DELETE', path, '', headers)
    resp = conn.getresponse()
    body = resp.read()
    if close_conn:
        conn.close()
    http_log(('%s%s' % (url.replace(parsed.path, ''), path), 'DELETE',),
             {'headers': headers}, resp, body)

    store_response(resp, response_dict)

    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(resp, 'Object DELETE failed', body)


def get_capabilities(http_conn):
    """
    Get cluster capability infos.

    :param http_conn: a tuple of (parsed url, HTTPConnection object)
    :returns: a dict containing the cluster capabilities
    :raises ClientException: HTTP Capabilities GET failed
    """
    parsed, conn = http_conn
    headers = {'Accept-Encoding': 'gzip'}
    conn.request('GET', parsed.path, '', headers)
    resp = conn.getresponse()
    body = resp.read()
    http_log((parsed.geturl(), 'GET',), {'headers': headers}, resp, body)
    if resp.status < 200 or resp.status >= 300:
        raise ClientException.from_response(
            resp, 'Capabilities GET failed', body)
    resp_headers = resp_header_dict(resp)
    return parse_api_response(resp_headers, body)


class Connection:

    """
    Convenience class to make requests that will also retry the request

    Requests will have an X-Auth-Token header whose value is either
    the preauthtoken or a token obtained from the auth service using
    the user credentials provided as args to the constructor. If
    os_options includes a service_username then requests will also have
    an X-Service-Token header whose value is a token obtained from the
    auth service using the service credentials. In this case the request
    url will be set to the storage_url obtained from the auth service
    for the service user, unless this is overridden by a preauthurl.
    """

    def __init__(self, authurl=None, user=None, key=None, retries=5,
                 preauthurl=None, preauthtoken=None,
                 starting_backoff=1, max_backoff=64, tenant_name=None,
                 os_options=None, auth_version="1", cacert=None,
                 insecure=False, cert=None, cert_key=None,
                 ssl_compression=True, retry_on_ratelimit=True,
                 timeout=None, session=None, force_auth_retry=False):
        """
        :param authurl: authentication URL
        :param user: user name to authenticate as
        :param key: key/password to authenticate with
        :param retries: Number of times to retry the request before failing
        :param preauthurl: storage URL (if you have already authenticated)
        :param preauthtoken: authentication token (if you have already
                             authenticated) note authurl/user/key/tenant_name
                             are not required when specifying preauthtoken
        :param starting_backoff: initial delay between retries (seconds)
        :param max_backoff: maximum delay between retries (seconds)
        :param auth_version: OpenStack auth version, default is 1.0
        :param tenant_name: The tenant/account name, required when connecting
                            to an auth 2.0 system.
        :param os_options: The OpenStack options which can have tenant_id,
                           auth_token, service_type, endpoint_type,
                           tenant_name, object_storage_url, region_name,
                           service_username, service_project_name, service_key
        :param insecure: Allow to access servers without checking SSL certs.
                         The server's certificate will not be verified.
        :param cert: Client certificate file to connect on SSL server
                            requiring SSL client certificate.
        :param cert_key: Client certificate private key file.
        :param ssl_compression: Whether to enable compression at the SSL layer.
                                If set to 'False' and the pyOpenSSL library is
                                present an attempt to disable SSL compression
                                will be made. This may provide a performance
                                increase for https upload/download operations.
        :param retry_on_ratelimit: by default, a ratelimited connection will
                                   retry after a backoff. Setting this
                                   parameter to False will cause an exception
                                   to be raised to the caller.
        :param timeout: The connect timeout for the HTTP connection.
        :param session: A keystoneauth session object.
        :param force_auth_retry: reset auth info even if client got unexpected
                                 error except 401 Unauthorized.
        """
        self.session = session
        self.authurl = authurl
        self.user = user
        self.key = key
        self.retries = retries
        self.http_conn = None
        self.attempts = 0
        self.starting_backoff = starting_backoff
        self.max_backoff = max_backoff
        self.auth_version = auth_version
        self.os_options = dict(os_options or {})
        if tenant_name:
            self.os_options['tenant_name'] = tenant_name
        if preauthurl:
            self.os_options['object_storage_url'] = preauthurl
        self.url = preauthurl or self.os_options.get('object_storage_url')
        self.token = preauthtoken or self.os_options.get('auth_token')
        if self.os_options.get('service_username', None):
            self.service_auth = True
        else:
            self.service_auth = False
        self.service_token = None
        self.cacert = cacert
        self.insecure = insecure
        self.cert = cert
        self.cert_key = cert_key
        self.ssl_compression = ssl_compression
        self.auth_end_time = 0
        self.retry_on_ratelimit = retry_on_ratelimit
        self.timeout = timeout
        self.force_auth_retry = force_auth_retry

    def close(self):
        if (self.http_conn and isinstance(self.http_conn, tuple) and
                len(self.http_conn) > 1):
            conn = self.http_conn[1]
            conn.close()
            self.http_conn = None

    def get_auth(self):
        self.url, self.token = get_auth(self.authurl, self.user, self.key,
                                        session=self.session,
                                        auth_version=self.auth_version,
                                        os_options=self.os_options,
                                        cacert=self.cacert,
                                        insecure=self.insecure,
                                        cert=self.cert,
                                        cert_key=self.cert_key,
                                        timeout=self.timeout)
        return self.url, self.token

    def get_service_auth(self):
        opts = self.os_options
        service_options = {}
        service_options['tenant_name'] = opts.get('service_project_name', None)
        service_options['region_name'] = opts.get('region_name', None)
        service_options['object_storage_url'] = opts.get('object_storage_url',
                                                         None)
        service_user = opts.get('service_username', None)
        service_key = opts.get('service_key', None)
        return get_auth(self.authurl, service_user, service_key,
                        session=self.session,
                        auth_version=self.auth_version,
                        os_options=service_options,
                        cacert=self.cacert,
                        insecure=self.insecure,
                        timeout=self.timeout)

    def http_connection(self, url=None):
        return http_connection(url if url else self.url,
                               cacert=self.cacert,
                               insecure=self.insecure,
                               cert=self.cert,
                               cert_key=self.cert_key,
                               ssl_compression=self.ssl_compression,
                               timeout=self.timeout)

    def _add_response_dict(self, target_dict, kwargs):
        if target_dict is not None and 'response_dict' in kwargs:
            response_dict = kwargs['response_dict']
            if 'response_dicts' in target_dict:
                target_dict['response_dicts'].append(response_dict)
            else:
                target_dict['response_dicts'] = [response_dict]
            target_dict.update(response_dict)

    def _retry(self, reset_func, func, *args, **kwargs):
        retried_auth = False
        backoff = self.starting_backoff
        caller_response_dict = kwargs.pop('response_dict', None)
        self.attempts = kwargs.pop('attempts', 0)
        while self.attempts <= self.retries or retried_auth:
            self.attempts += 1
            try:
                if not self.url or not self.token:
                    self.url, self.token = self.get_auth()
                    self.close()
                if self.service_auth and not self.service_token:
                    self.url, self.service_token = self.get_service_auth()
                    self.close()
                self.auth_end_time = time()
                if not self.http_conn:
                    self.http_conn = self.http_connection()
                kwargs['http_conn'] = self.http_conn
                if caller_response_dict is not None:
                    kwargs['response_dict'] = {}
                rv = func(self.url, self.token, *args,
                          service_token=self.service_token, **kwargs)
                self._add_response_dict(caller_response_dict, kwargs)
                return rv
            except SSLError as e:
                self._add_response_dict(caller_response_dict, kwargs)
                if ('certificate verify' in str(e)) or \
                        ('hostname' in str(e)) or \
                        self.attempts > self.retries:
                    raise
                self.http_conn = None
            except (socket.error, RequestException):
                self._add_response_dict(caller_response_dict, kwargs)
                if self.attempts > self.retries:
                    raise
                self.http_conn = None
            except ClientException as err:
                self._add_response_dict(caller_response_dict, kwargs)
                if err.http_status == 401:
                    if self.session:
                        should_retry = self.session.invalidate()
                    else:
                        # Without a proper session, just check for auth creds
                        should_retry = all((self.authurl, self.user, self.key))

                    self.url = self.token = self.service_token = None

                    if retried_auth or not should_retry:
                        raise
                    retried_auth = True
                elif self.attempts > self.retries or err.http_status is None:
                    raise
                elif err.http_status in (408, 499):
                    # Server hit a timeout, so HTTP request/response framing
                    # are likely in a bad state; trash the connection
                    self.http_conn = None
                elif 500 <= err.http_status <= 599:
                    pass
                elif self.retry_on_ratelimit and err.http_status in (498, 429):
                    pass
                else:
                    raise

            if self.force_auth_retry:
                self.url = self.token = self.service_token = None

            sleep(backoff)
            backoff = min(backoff * 2, self.max_backoff)
            if reset_func:
                reset_func(func, *args, **kwargs)

    def head_account(self, headers=None):
        """Wrapper for :func:`head_account`"""
        return self._retry(None, head_account, headers=headers)

    def get_account(self, marker=None, limit=None, prefix=None,
                    end_marker=None, full_listing=False, headers=None,
                    delimiter=None):
        """Wrapper for :func:`get_account`"""
        # TODO(unknown): With full_listing=True this will restart the entire
        # listing with each retry. Need to make a better version that just
        # retries where it left off.
        return self._retry(None, get_account, marker=marker, limit=limit,
                           prefix=prefix, end_marker=end_marker,
                           full_listing=full_listing, headers=headers,
                           delimiter=delimiter)

    def post_account(self, headers, response_dict=None,
                     query_string=None, data=None):
        """Wrapper for :func:`post_account`"""
        return self._retry(None, post_account, headers,
                           query_string=query_string, data=data,
                           response_dict=response_dict)

    def head_container(self, container, headers=None):
        """Wrapper for :func:`head_container`"""
        return self._retry(None, head_container, container, headers=headers)

    def get_container(self, container, marker=None, limit=None, prefix=None,
                      delimiter=None, end_marker=None, version_marker=None,
                      path=None, full_listing=False, headers=None,
                      query_string=None):
        """Wrapper for :func:`get_container`"""
        # TODO(unknown): With full_listing=True this will restart the entire
        # listing with each retry. Need to make a better version that just
        # retries where it left off.
        return self._retry(None, get_container, container, marker=marker,
                           limit=limit, prefix=prefix, delimiter=delimiter,
                           end_marker=end_marker,
                           version_marker=version_marker, path=path,
                           full_listing=full_listing, headers=headers,
                           query_string=query_string)

    def put_container(self, container, headers=None, response_dict=None,
                      query_string=None):
        """Wrapper for :func:`put_container`"""
        return self._retry(None, put_container, container, headers=headers,
                           response_dict=response_dict,
                           query_string=query_string)

    def post_container(self, container, headers, response_dict=None):
        """Wrapper for :func:`post_container`"""
        return self._retry(None, post_container, container, headers,
                           response_dict=response_dict)

    def delete_container(self, container, response_dict=None,
                         query_string=None, headers={}):
        """Wrapper for :func:`delete_container`"""
        return self._retry(None, delete_container, container,
                           response_dict=response_dict,
                           query_string=query_string,
                           headers=headers)

    def head_object(self, container, obj, headers=None, query_string=None):
        """Wrapper for :func:`head_object`"""
        return self._retry(None, head_object, container, obj, headers=headers,
                           query_string=query_string)

    def get_object(self, container, obj, resp_chunk_size=None,
                   query_string=None, response_dict=None, headers=None):
        """Wrapper for :func:`get_object`"""
        rheaders, body = self._retry(None, get_object, container, obj,
                                     resp_chunk_size=resp_chunk_size,
                                     query_string=query_string,
                                     response_dict=response_dict,
                                     headers=headers)
        is_not_range_request = (
            not headers or 'range' not in (k.lower() for k in headers))
        retry_is_possible = (
            is_not_range_request and resp_chunk_size and
            self.attempts <= self.retries and
            rheaders.get('transfer-encoding') is None)

        if retry_is_possible:
            body = _RetryBody(body.resp, self, container, obj,
                              resp_chunk_size=resp_chunk_size,
                              query_string=query_string,
                              response_dict=response_dict,
                              headers=headers)
        return rheaders, body

    def put_object(self, container, obj, contents, content_length=None,
                   etag=None, chunk_size=None, content_type=None,
                   headers=None, query_string=None, response_dict=None):
        """Wrapper for :func:`put_object`"""

        def _default_reset(*args, **kwargs):
            raise ClientException('put_object(%r, %r, ...) failure and no '
                                  'ability to reset contents for reupload.'
                                  % (container, obj))

        if isinstance(contents, str) or not contents:
            # if its a str or None then you can retry as much as you want
            reset_func = None
        else:
            reset_func = _default_reset
            if self.retries > 0:
                tell = getattr(contents, 'tell', None)
                seek = getattr(contents, 'seek', None)
                reset = getattr(contents, 'reset', None)
                if tell and seek:
                    orig_pos = tell()

                    def reset_func(*a, **kw):
                        seek(orig_pos)
                elif reset:
                    reset_func = reset
        return self._retry(reset_func, put_object, container, obj, contents,
                           content_length=content_length, etag=etag,
                           chunk_size=chunk_size, content_type=content_type,
                           headers=headers, query_string=query_string,
                           response_dict=response_dict)

    def post_object(self, container, obj, headers, response_dict=None):
        """Wrapper for :func:`post_object`"""
        return self._retry(None, post_object, container, obj, headers,
                           response_dict=response_dict)

    def copy_object(self, container, obj, destination=None, headers=None,
                    fresh_metadata=None, response_dict=None):
        """Wrapper for :func:`copy_object`"""
        return self._retry(None, copy_object, container, obj, destination,
                           headers, fresh_metadata,
                           response_dict=response_dict)

    def delete_object(self, container, obj, query_string=None,
                      response_dict=None, headers=None):
        """Wrapper for :func:`delete_object`"""
        return self._retry(None, delete_object, container, obj,
                           query_string=query_string,
                           response_dict=response_dict,
                           headers=headers)

    def _map_url(self, url):
        url = url or self.url
        if not url:
            url, _ = self.get_auth()
        scheme, netloc, path, params, query, fragment = urlparse(url)
        if URI_PATTERN_VERSION.search(path):
            path = URI_PATTERN_VERSION.sub('/info', path)
        elif not URI_PATTERN_INFO.search(path):
            if path.endswith('/'):
                path += 'info'
            else:
                path += '/info'
        return urlunparse((scheme, netloc, path, params, query, fragment))

    def get_capabilities(self, url=None):
        parsed = urlparse(self._map_url(url))
        if not self.http_conn:
            self.http_conn = self.http_connection(url)
        return get_capabilities((parsed, self.http_conn[1]))
