#!/usr/bin/env python
# encoding: utf-8

"""Things directly related to all of twisted."""

__docformat__ = "restructuredtext en"

#-------------------------------------------------------------------------------
#  Copyright (C) 2008  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import os, sys
import threading, Queue, atexit

import twisted
from twisted.internet import defer, reactor
from twisted.python import log, failure

from IPython.kernel.error import FileTimeoutError

#-------------------------------------------------------------------------------
# Classes related to twisted and threads
#-------------------------------------------------------------------------------


class ReactorInThread(threading.Thread):
    """A thread whose only job is to run the twisted reactor.

    Typical usage, which lets the process exit cleanly::

        rit = ReactorInThread()
        rit.setDaemon(True)
        rit.start()

    """

    def run(self):
        """Thread body: run the reactor until it is stopped.

        ``installSignalHandlers=0`` is passed so the reactor is asked not to
        install signal handlers (presumably because this is not the main
        thread -- confirm against the Twisted documentation).
        """
        reactor.run(installSignalHandlers=0)

    def stop(self):
        """Stop the reactor from the calling thread, then join this thread.

        ``reactor.stop`` is executed inside the reactor through
        ``blockingCallFromThread``; afterwards the caller waits for this
        thread to finish.
        """
        # NOTE (from the original author): "I don't think this does anything
        # useful."
        blockingCallFromThread(reactor.stop)
        self.join()

if(twisted.version.major >= 8):
    import twisted.internet.threads
    def blockingCallFromThread(f, *a, **kw):
        """
        Run a function in the reactor from a thread, and wait for the result
        synchronously, i.e. until the callback chain returned by the function get a
        result.
        
        Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
        passing twisted.internet.reactor for the first argument.
        
        @param f: the callable to run in the reactor thread
        @type f: any callable.
        @param a: the arguments to pass to C{f}.
        @param kw: the keyword arguments to pass to C{f}.

        @return: the result of the callback chain.
        @raise: any error raised during the callback chain.
        """
        return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
    
else:   
    def blockingCallFromThread(f, *a, **kw):
        """
        Run a function in the reactor from a thread, and wait for the result
        synchronously, i.e. until the callback chain returned by the function get a
        result.

        @param f: the callable to run in the reactor thread
        @type f: any callable.
        @param a: the arguments to pass to C{f}.
        @param kw: the keyword arguments to pass to C{f}.

        @return: the result of the callback chain.
        @raise: any error raised during the callback chain.
        """
        from twisted.internet import reactor
        queue = Queue.Queue()
        def _callFromThread():
            result = defer.maybeDeferred(f, *a, **kw)
            result.addBoth(queue.put)
        
        reactor.callFromThread(_callFromThread)
        result = queue.get()
        if isinstance(result, failure.Failure):
            # This makes it easier for the debugger to get access to the instance
            try:
                result.raiseException()
            except Exception, e:
                raise e
        return result
    


#-------------------------------------------------------------------------------
# Things for managing deferreds
#-------------------------------------------------------------------------------


def parseResults(results):
    """Strip the success flags from the output of a DeferredList.

    `results` is a sequence of (success, value) entries; the returned list
    holds only the second item of each entry (a result or a Failure), in the
    same order.
    """
    return [entry[1] for entry in results]

def gatherBoth(dlist, fireOnOneCallback=0,
                      fireOnOneErrback=0,
                      consumeErrors=0,
                      logErrors=0):
    """Combine a list of deferreds into a single DeferredList.

    All arguments are passed straight through to this module's DeferredList.
    Unless `fireOnOneCallback` is set, a `parseResults` callback is added so
    that the combined deferred fires with a plain list of results/Failures
    rather than a list of (success, result) tuples.

    Returns the DeferredList instance.
    """
    combined = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
                            consumeErrors, logErrors)
    if fireOnOneCallback:
        # In this mode the DeferredList fires with a single (result, index)
        # tuple, so there is no list of tuples to unpack.
        return combined
    combined.addCallback(parseResults)
    return combined

# Status flags used by DeferredList: they are passed as the `succeeded`
# argument of its per-deferred callback and stored as the first item of each
# (success, result) tuple.
SUCCESS = True
FAILURE = False

class DeferredList(defer.Deferred):
    """A single Deferred that tracks a whole group of L{Deferred}s.

    Once every tracked L{Deferred} has fired, I fire my own callback with a
    list of (success, result) tuples, where 'success' is a boolean.

    A L{Deferred} remains usable after it has been handed to a DeferredList.
    For instance, 'Unhandled error in Deferred' messages can be suppressed by
    adding errbacks to the Deferreds *after* placing them in the
    DeferredList, since a DeferredList does not swallow errors by default.
    (Setting the consumeErrors flag is the more convenient way to get the
    same effect.)

    Note: This is a modified version of the twisted.internet.defer.DeferredList
    """

    fireOnOneCallback = 0
    fireOnOneErrback = 0

    def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
                 consumeErrors=0, logErrors=0):
        """Initialize a DeferredList.

        @type deferredList:  C{list} of L{Deferred}s
        @param deferredList: The list of deferreds to track.
        @param fireOnOneCallback: (keyword param) if set, my callback is
                             fired as soon as a single tracked deferred
                             succeeds
        @param fireOnOneErrback: (keyword param) if set, my errback is fired
                            as soon as a single tracked deferred fails
        @param consumeErrors: (keyword param) if set, errors raised in the
                            original deferreds are consumed by this
                            DeferredList.  This is useful to prevent spurious
                            warnings being logged.
        @param logErrors: (keyword param) if set, every failure of a tracked
                            deferred is passed to C{log.err}.
        """
        self.resultList = [None] * len(deferredList)
        defer.Deferred.__init__(self)
        # With nothing to wait for, fire immediately with the empty list
        # (unless the caller asked for fire-on-first-callback behaviour).
        if not self.resultList and not fireOnOneCallback:
            self.callback(self.resultList)

        # The flags below must be assigned *before* any callbacks are
        # attached: the callbacks read them, and they run synchronously for
        # deferreds that have already fired.
        self.fireOnOneCallback = fireOnOneCallback
        self.fireOnOneErrback = fireOnOneErrback
        self.consumeErrors = consumeErrors
        self.logErrors = logErrors
        self.finishedCount = 0

        for index, deferred in enumerate(deferredList):
            deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
                                  callbackArgs=(index, SUCCESS),
                                  errbackArgs=(index, FAILURE))

    def _cbDeferred(self, result, index, succeeded):
        """(internal) Record the outcome of one tracked deferred.

        Stores (succeeded, result) at position C{index}, fires my own
        callback/errback when the configured condition is met, and returns
        the value the tracked deferred's chain continues with.
        """
        self.resultList[index] = (succeeded, result)
        self.finishedCount += 1

        failed = (succeeded == FAILURE)

        if not self.called:
            if self.fireOnOneCallback and succeeded == SUCCESS:
                self.callback((result, index))
            elif self.fireOnOneErrback and failed:
                # Modified relative to twisted: the errback chain is fired
                # with the actual Failure instance that originally occurred,
                # rather than twisted's FirstError which wraps the failure.
                self.errback(result)
            elif self.finishedCount == len(self.resultList):
                self.callback(self.resultList)

        if failed:
            if self.logErrors:
                log.err(result)
            if self.consumeErrors:
                # Swallow the failure so the tracked deferred continues with
                # None instead of the error.
                return None

        return result


def wait_for_file(filename, delay=0.1, max_tries=10):
    """Poll until a file exists.

    Returns a Deferred that fires once `os.path.isfile(filename)` is true.
    The check is repeated every `delay` seconds through `reactor.callLater`.
    After `max_tries` unsuccessful checks the Deferred errbacks with a
    `FileTimeoutError`.

    Parameters
    ----------
    filename : str
        The name of the file to wait for.
    delay : float
        The time to wait between polls.
    max_tries : int
        The max number of attempts before failing with `FileTimeoutError`

    Returns
    -------
    d : Deferred
        A Deferred instance that fires with True when the file exists.
    """

    done = defer.Deferred()

    def _poll(path, tries_so_far=0):
        # Out of attempts: report the timeout through the errback chain.
        if tries_so_far >= max_tries:
            done.errback(FileTimeoutError(
                'timeout waiting for file to be created: %s' % path
            ))
            return
        if os.path.isfile(path):
            done.callback(True)
            return
        # Not there yet: schedule the next check.
        reactor.callLater(delay, _poll, path, tries_so_far + 1)

    # The first check happens synchronously, before the Deferred is returned.
    _poll(filename)
    return done
