# -*- coding: utf-8 -*-
#
#    Copyright (C) 2016 Red Hat, Inc.
#    Copyright (C) 2013-2014 eNovance Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import abc
import collections
import enum
import logging
import threading

from oslo_utils import excutils
from oslo_utils import netutils
from oslo_utils import timeutils
import six
from stevedore import driver

import tooz

LOG = logging.getLogger(__name__)


TOOZ_BACKENDS_NAMESPACE = "tooz.backends"


class Characteristics(enum.Enum):
    """Attempts to describe the characteristic that a driver supports."""

    DISTRIBUTED_ACROSS_THREADS = 'DISTRIBUTED_ACROSS_THREADS'
    """Coordinator components when used by multiple **threads** work
       the same as if those components were only used by a single thread."""

    DISTRIBUTED_ACROSS_PROCESSES = 'DISTRIBUTED_ACROSS_PROCESSES'
    """Coordinator components when used by multiple **processes** work
       the same as if those components were only used by a single thread."""

    DISTRIBUTED_ACROSS_HOSTS = 'DISTRIBUTED_ACROSS_HOSTS'
    """Coordinator components when used by multiple **hosts** work
       the same as if those components were only used by a single thread."""

    NON_TIMEOUT_BASED = 'NON_TIMEOUT_BASED'
    """The driver has the following property:

    * Its operations are not based on the timeout of other clients, but on some
    other more robust mechanisms.
    """

    LINEARIZABLE = 'LINEARIZABLE'
    """The driver has the following properties:

    * Ensures each operation must take place before its
      completion time.
    * Any operation invoked subsequently must take place
      after the invocation and by extension, after the original operation
      itself.
    """

    SEQUENTIAL = 'SEQUENTIAL'
    """The driver has the following properties:

    * Operations can take effect before or after completion – but all
      operations retain the constraint that operations from any given process
      must take place in that processes order.
    """

    CAUSAL = 'CAUSAL'
    """The driver has the following properties:

    * Does **not** have to enforce the order of every
      operation from a process, perhaps, only causally related operations
      must occur in order.
    """

    SERIALIZABLE = 'SERIALIZABLE'
    """The driver has the following properties:

    * The history of **all** operations is equivalent to
      one that took place in some single atomic order but with unknown
      invocation and completion times - it places no bounds on
      time or order.
    """

    SAME_VIEW_UNDER_PARTITIONS = 'SAME_VIEW_UNDER_PARTITIONS'
    """When a client is connected to a server and that server is partitioned
    from a group of other servers it will (somehow) have the same view of
    data as a client connected to a server on the other side of the
    partition (typically this is accomplished by write availability being
    lost and therefore nothing can change).
    """

    SAME_VIEW_ACROSS_CLIENTS = 'SAME_VIEW_ACROSS_CLIENTS'
    """A client connected to one server will *always* have the same view
    every other client will have (no matter what server those other
    clients are connected to). Typically this is a sacrifice in
    write availability because before a write can be acknowledged it must
    be acknowledged by *all* servers in a cluster (so that all clients
    that are connected to those servers read the exact *same* thing).
    """


class Hooks(list):
    def run(self, *args, **kwargs):
        return list(map(lambda cb: cb(*args, **kwargs), self))


class Event(object):
    """Base class for events."""


class MemberJoinedGroup(Event):
    """A member joined a group event."""

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id


class MemberLeftGroup(Event):
    """A member left a group event."""

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id


class LeaderElected(Event):
    """A leader as been elected."""

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id


class Heart(object):
    """Coordination drivers main liveness pump (its heart)."""

    def __init__(self, driver, thread_cls=threading.Thread,
                 event_cls=threading.Event):
        self._thread_cls = thread_cls
        self._dead = event_cls()
        self._finished = event_cls()
        self._finished.set()
        self._runner = None
        self._driver = driver
        self._beats = 0

    @property
    def beats(self):
        """How many times the heart has beaten."""
        return self._beats

    def is_alive(self):
        """Returns if the heart is beating."""
        return not (self._runner is None
                    or not self._runner.is_alive()
                    or self._finished.is_set())

    @excutils.forever_retry_uncaught_exceptions
    def _beat_forever_until_stopped(self):
        """Inner beating loop."""
        try:
            while not self._dead.is_set():
                with timeutils.StopWatch() as w:
                    wait_until_next_beat = self._driver.heartbeat()
                ran_for = w.elapsed()
                if ran_for > wait_until_next_beat:
                    LOG.warning(
                        "Heartbeating took too long to execute (it ran for"
                        " %0.2f seconds which is %0.2f seconds longer than"
                        " the next heartbeat idle time). This may cause"
                        " timeouts (in locks, leadership, ...) to"
                        " happen (which will not end well).", ran_for,
                        ran_for - wait_until_next_beat)
                self._beats += 1
                # NOTE(harlowja): use the event object for waiting and
                # not a sleep function since doing that will allow this code
                # to terminate early if stopped via the stop() method vs
                # having to wait until the sleep function returns.
                self._dead.wait(wait_until_next_beat)
        finally:
            self._finished.set()

    def start(self, thread_cls=None):
        """Starts the heart beating thread (noop if already started)."""
        if not self.is_alive():
            self._finished.clear()
            self._dead.clear()
            self._beats = 0
            if thread_cls is None:
                thread_cls = self._thread_cls
            self._runner = thread_cls(target=self._beat_forever_until_stopped)
            self._runner.daemon = True
            self._runner.start()

    def stop(self):
        """Requests the heart beating thread to stop beating."""
        self._dead.set()

    def wait(self, timeout=None):
        """Wait up to given timeout for the heart beating thread to stop."""
        self._finished.wait(timeout)
        return self._finished.is_set()


@six.add_metaclass(abc.ABCMeta)
class CoordinationDriver(object):

    requires_beating = False
    """
    Usage requirement that if true requires that the :py:meth:`~.heartbeat`
    be called periodically (at a given rate) to avoid locks, sessions and
    other from being automatically closed/discarded by the coordinators
    backing store.
    """

    CHARACTERISTICS = ()
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interogate how this driver works.
    """

    def __init__(self):
        self._started = False
        self._hooks_join_group = collections.defaultdict(Hooks)
        self._hooks_leave_group = collections.defaultdict(Hooks)
        self._hooks_elected_leader = collections.defaultdict(Hooks)
        # A cache for group members
        self._group_members = collections.defaultdict(set)
        self.requires_beating = (
            CoordinationDriver.heartbeat != self.__class__.heartbeat
        )
        self.heart = Heart(self)

    def _has_hooks_for_group(self, group_id):
        return (len(self._hooks_join_group[group_id]) +
                len(self._hooks_leave_group[group_id]))

    @staticmethod
    def run_watchers(timeout=None):
        """Run the watchers callback.

        This may also activate :py:meth:`.run_elect_coordinator` (depending
        on driver implementation).
        """
        raise tooz.NotImplemented

    @staticmethod
    def run_elect_coordinator():
        """Try to leader elect this coordinator & activate hooks on success."""
        raise tooz.NotImplemented

    @abc.abstractmethod
    def watch_join_group(self, group_id, callback):
        """Call a function when group_id sees a new member joined.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member joins this group

        """
        self._hooks_join_group[group_id].append(callback)

    @abc.abstractmethod
    def unwatch_join_group(self, group_id, callback):
        """Stop executing a function when a group_id sees a new member joined.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member joined
                         this group
        """
        try:
            self._hooks_join_group[group_id].remove(callback)
        except ValueError:
            raise WatchCallbackNotFound(group_id, callback)

        if (not self._has_hooks_for_group(group_id) and
           group_id in self._group_members):
            del self._group_members[group_id]

    @abc.abstractmethod
    def watch_leave_group(self, group_id, callback):
        """Call a function when group_id sees a new member leaving.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member leaves this
                         group

        """
        self._hooks_leave_group[group_id].append(callback)

    @abc.abstractmethod
    def unwatch_leave_group(self, group_id, callback):
        """Stop executing a function when a group_id sees a new member leaving.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member left
                         this group
        """
        try:
            self._hooks_leave_group[group_id].remove(callback)
        except ValueError:
            raise WatchCallbackNotFound(group_id, callback)

        if (not self._has_hooks_for_group(group_id) and
           group_id in self._group_members):
            del self._group_members[group_id]

    @abc.abstractmethod
    def watch_elected_as_leader(self, group_id, callback):
        """Call a function when member gets elected as leader.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member leaves this
                         group

        """
        self._hooks_elected_leader[group_id].append(callback)

    @abc.abstractmethod
    def unwatch_elected_as_leader(self, group_id, callback):
        """Call a function when member gets elected as leader.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member leaves this
                         group

        """
        try:
            self._hooks_elected_leader[group_id].remove(callback)
        except ValueError:
            raise WatchCallbackNotFound(group_id, callback)

        if not self._hooks_elected_leader[group_id]:
            del self._hooks_elected_leader[group_id]

    @staticmethod
    def stand_down_group_leader(group_id):
        """Stand down as the group leader if we are.

        :param group_id: The group where we don't want to be a leader anymore
        """
        raise tooz.NotImplemented

    @property
    def is_started(self):
        return self._started

    def start(self, start_heart=False):
        """Start the service engine.

        If needed, the establishment of a connection to the servers
        is initiated.
        """
        if self._started:
            raise ToozError(
                "Can not start a driver which has not been stopped")
        self._start()
        if self.requires_beating and start_heart:
            self.heart.start()
        self._started = True

    def _start(self):
        pass

    def stop(self):
        """Stop the service engine.

        If needed, the connection to servers is closed and the client will
        disappear from all joined groups.
        """
        if not self._started:
            raise ToozError("Can not stop a driver which has not been started")
        if self.heart.is_alive():
            self.heart.stop()
            self.heart.wait()
        self._stop()
        self._started = False

    def _stop(self):
        pass

    @staticmethod
    def create_group(group_id):
        """Request the creation of a group asynchronously.

        :param group_id: the id of the group to create
        :type group_id: str
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_groups():
        """Return the list composed by all groups ids asynchronously.

        :returns: the list of all created group ids
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def join_group(group_id, capabilities=b""):
        """Join a group and establish group membership asynchronously.

        :param group_id: the id of the group to join
        :type group_id: str
        :param capabilities: the capabilities of the joined member
        :type capabilities: object (typically str)
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def leave_group(group_id):
        """Leave a group asynchronously.

        :param group_id: the id of the group to leave
        :type group_id: str
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def delete_group(group_id):
        """Delete a group asynchronously.

        :param group_id: the id of the group to leave
        :type group_id: str
        :returns: Result
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_members(group_id):
        """Return the list of all members ids of the specified group.

        :returns: list of all created group ids
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_member_capabilities(group_id, member_id):
        """Return the capabilities of a member asynchronously.

        :param group_id: the id of the group of the member
        :type group_id: str
        :param member_id: the id of the member
        :type member_id: str
        :returns: capabilities of a member
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_member_info(group_id, member_id):
        """Return the statistics and capabilities of a member asynchronously.

        :param group_id: the id of the group of the member
        :type group_id: str
        :param member_id: the id of the member
        :type member_id: str
        :returns: capabilities and statistics of a member
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def update_capabilities(group_id, capabilities):
        """Update member capabilities in the specified group.

        :param group_id: the id of the group of the current member
        :type group_id: str
        :param capabilities: the capabilities of the updated member
        :type capabilities: object (typically str)
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_leader(group_id):
        """Return the leader for a group.

        :param group_id: the id of the group:
        :returns: the leader
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_lock(name):
        """Return a distributed lock.

        This is a exclusive lock, a second call to acquire() will block or
        return False.

        :param name: The lock name that is used to identify it across all
                     nodes.

        """
        raise tooz.NotImplemented

    @staticmethod
    def heartbeat():
        """Update member status to indicate it is still alive.

        Method to run once in a while to be sure that the member is not dead
        and is still an active member of a group.

        :return: The number of seconds to wait before sending a new heartbeat.
        """
        pass


@six.add_metaclass(abc.ABCMeta)
class CoordAsyncResult(object):
    """Representation of an asynchronous task.

    Every call API returns an CoordAsyncResult object on which the result or
    the status of the task can be requested.

    """

    @abc.abstractmethod
    def get(self, timeout=10):
        """Retrieve the result of the corresponding asynchronous call.

        :param timeout: block until the timeout expire.
        :type timeout: float
        """

    @abc.abstractmethod
    def done(self):
        """Returns True if the task is done, False otherwise."""


class _RunWatchersMixin(object):
    """Mixin to share the *mostly* common ``run_watchers`` implementation."""

    def run_watchers(self, timeout=None):
        with timeutils.StopWatch(duration=timeout) as w:
            known_groups = self.get_groups().get(
                timeout=w.leftover(return_none=True))
            result = []
            for group_id in known_groups:
                try:
                    group_members_fut = self.get_members(group_id)
                    group_members = group_members_fut.get(
                        timeout=w.leftover(return_none=True))
                except GroupNotCreated:
                    group_members = set()
                else:
                    group_members = set(group_members)
                if (group_id in self._joined_groups and
                        self._member_id not in group_members):
                    self._joined_groups.discard(group_id)
                old_group_members = self._group_members.get(group_id, set())
                for member_id in (group_members - old_group_members):
                    result.extend(
                        self._hooks_join_group[group_id].run(
                            MemberJoinedGroup(group_id, member_id)))
                for member_id in (old_group_members - group_members):
                    result.extend(
                        self._hooks_leave_group[group_id].run(
                            MemberLeftGroup(group_id, member_id)))
                self._group_members[group_id] = group_members
            return result


def get_coordinator(backend_url, member_id,
                    characteristics=frozenset(), **kwargs):
    """Initialize and load the backend.

    :param backend_url: the backend URL to use
    :type backend: str
    :param member_id: the id of the member
    :type member_id: str
    :param characteristics: set
    :type characteristics: set of :py:class:`.Characteristics` that will
                           be matched to the requested driver (this **will**
                           become a **required** parameter in a future tooz
                           version)
    :param kwargs: additional coordinator options (these take precedence over
                   options of the **same** name found in the ``backend_url``
                   arguments query string)
    """
    parsed_url = netutils.urlsplit(backend_url)
    parsed_qs = six.moves.urllib.parse.parse_qs(parsed_url.query)
    if kwargs:
        options = {}
        for (k, v) in six.iteritems(kwargs):
            options[k] = [v]
        for (k, v) in six.iteritems(parsed_qs):
            if k not in options:
                options[k] = v
    else:
        options = parsed_qs
    d = driver.DriverManager(
        namespace=TOOZ_BACKENDS_NAMESPACE,
        name=parsed_url.scheme,
        invoke_on_load=True,
        invoke_args=(member_id, parsed_url, options)).driver
    characteristics = set(characteristics)
    driver_characteristics = set(getattr(d, 'CHARACTERISTICS', set()))
    missing_characteristics = characteristics - driver_characteristics
    if missing_characteristics:
        raise ToozDriverChosenPoorly("Desired characteristics %s"
                                     " is not a strict subset of driver"
                                     " characteristics %s, %s"
                                     " characteristics were not found"
                                     % (characteristics,
                                        driver_characteristics,
                                        missing_characteristics))
    return d


class ToozError(Exception):
    """Exception raised when an internal error occurs.

    Raised for instance in case of server internal error.

    :ivar cause: the cause of the exception being raised, when not none this
                 will itself be an exception instance, this is useful for
                 creating a chain of exceptions for versions of python where
                 this is not yet implemented/supported natively.

    """

    def __init__(self, message, cause=None):
        super(ToozError, self).__init__(message)
        self.cause = cause


class ToozDriverChosenPoorly(ToozError):
    """Raised when a driver does not match desired characteristics."""


class ToozConnectionError(ToozError):
    """Exception raised when the client cannot connect to the server."""


class OperationTimedOut(ToozError):
    """Exception raised when an operation times out."""


class LockAcquireFailed(ToozError):
    """Exception raised when a lock acquire fails in a context manager."""


class GroupNotCreated(ToozError):
    """Exception raised when the caller request an nonexistent group."""
    def __init__(self, group_id):
        self.group_id = group_id
        super(GroupNotCreated, self).__init__(
            "Group %s does not exist" % group_id)


class GroupAlreadyExist(ToozError):
    """Exception raised trying to create an already existing group."""
    def __init__(self, group_id):
        self.group_id = group_id
        super(GroupAlreadyExist, self).__init__(
            "Group %s already exists" % group_id)


class MemberAlreadyExist(ToozError):
    """Exception raised trying to join a group already joined."""
    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id
        super(MemberAlreadyExist, self).__init__(
            "Member %s has already joined %s" %
            (member_id, group_id))


class MemberNotJoined(ToozError):
    """Exception raised trying to access a member not in a group."""
    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id
        super(MemberNotJoined, self).__init__("Member %s has not joined %s" %
                                              (member_id, group_id))


class GroupNotEmpty(ToozError):
    "Exception raised when the caller try to delete a group with members."
    def __init__(self, group_id):
        self.group_id = group_id
        super(GroupNotEmpty, self).__init__("Group %s is not empty" % group_id)


class WatchCallbackNotFound(ToozError):
    """Exception raised when unwatching a group.

    Raised when the caller tries to unwatch a group with a callback that
    does not exist.

    """
    def __init__(self, group_id, callback):
        self.group_id = group_id
        self.callback = callback
        super(WatchCallbackNotFound, self).__init__(
            'Callback %s is not registered on group %s' %
            (callback.__name__, group_id))


class SerializationError(ToozError):
    "Exception raised when serialization or deserialization breaks."


def raise_with_cause(exc_cls, message, *args, **kwargs):
    """Helper to raise + chain exceptions (when able) and associate a *cause*.

    **For internal usage only.**

    NOTE(harlowja): Since in py3.x exceptions can be chained (due to
    :pep:`3134`) we should try to raise the desired exception with the given
    *cause*.

    :param exc_cls: the :py:class:`~tooz.coordination.ToozError` class
                    to raise.
    :param message: the text/str message that will be passed to
                    the exceptions constructor as its first positional
                    argument.
    :param args: any additional positional arguments to pass to the
                 exceptions constructor.
    :param kwargs: any additional keyword arguments to pass to the
                   exceptions constructor.
    """
    if not issubclass(exc_cls, ToozError):
        raise ValueError("Subclass of tooz error is required")
    excutils.raise_with_cause(exc_cls, message, *args, **kwargs)
