"""Local server and cgitb support."""

import cgitb
#cgitb.enable(format="text")

import glob
import logging
import os
import subprocess
import sys
import time
import traceback
from unittest import defaultTestLoader, TextTestRunner, TestSuite, TestCase, \
     _TextTestResult


class ServerStartupError(Exception):
    """Raised when the server process exits while its startup is awaited."""


class ServerProcess:
    """Run a Python script as a child server process.

    The script is launched with the current interpreter (sys.executable).
    Callers must set `port` before calling start(); start() blocks until a
    TCP connection to 127.0.0.1:port succeeds.  `report_hook` is called
    with a short progress message at each stage and defaults to a no-op.
    """

    def __init__(self, filename, name=None):
        """Remember which script to run.

        filename -- path of the Python script to execute
        name -- label for progress reports; defaults to filename.  Only
            its basename is kept, in self.name.

        Raises ValueError if filename is None.
        """
        if filename is None:
            raise ValueError('filename arg must be a string')
        if name is None:
            name = filename
        self.name = os.path.basename(name)
        self.port = None
        self.report_hook = lambda msg: None
        self._filename = filename
        self._args = None
        self._process = None

    def _get_args(self):
        """Return list of command line arguments.

        Override me.
        """
        return []

    def start(self):
        """Launch the server and block until it accepts connections.

        If waiting for startup fails for any reason while the child is
        still alive, the child is killed before the exception propagates,
        so a failed start() does not leave a stray server running.
        """
        self._args = [sys.executable, self._filename]+self._get_args()
        self.report_hook("starting (%s)" % (self._args,))
        self._process = subprocess.Popen(self._args)
        self.report_hook("waiting for startup")
        started = False
        try:
            self._wait_for_startup()
            started = True
        finally:
            # poll() is None only while the child is still running; a
            # child that already exited has nothing left to kill.
            if not started and self._process.poll() is None:
                self.stop()
        self.report_hook("running")

    def _wait_for_startup(self):
        """Retry connecting to 127.0.0.1:self.port until it succeeds.

        Raises ServerStartupError if the child process exits in the
        meantime; socket errors are retried by backoff() until it gives up.
        """
        import socket
        def connect():
            self._process.poll()
            if self._process.returncode is not None:
                message = ("server exited on startup with status %d: %r" %
                           (self._process.returncode, self._args))
                raise ServerStartupError(message)
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.settimeout(1.0)
            try:
                sock.connect(('127.0.0.1', self.port))
            finally:
                sock.close()
        backoff(connect, (socket.error,))

    def stop(self):
        """Kill process (forcefully if necessary).

        Does nothing if the process was never started.
        """
        if self._process is None:
            return
        if os.name == 'nt':
            # TerminateProcess() operates on a process *handle*, not a
            # pid, so hand over the handle that subprocess keeps open.
            # NOTE(review): _handle is a private Popen attribute -- confirm
            # on the Windows Python versions this needs to support.
            kill_windows(self._process._handle, self.report_hook)
        else:
            kill_posix(self._process.pid, self.report_hook)

def backoff(func, errors,
            initial_timeout=1., hard_timeout=60., factor=1.2):
    """Call func() repeatedly until it succeeds or time runs out.

    func -- callable taking no arguments
    errors -- exception class (or tuple of classes) that mean "try again";
        any other exception propagates immediately
    initial_timeout -- seconds to sleep after the first failure
    hard_timeout -- overall time budget in seconds
    factor -- multiplier applied to the sleep time after each failure

    func is always attempted at least once.  When a retryable failure
    occurs with (almost) no budget left, that exception is re-raised with
    its original traceback.  Returns None on success.
    """
    starttime = time.time()
    timeout = initial_timeout
    while True:
        try:
            func()
        except errors:
            remaining = hard_timeout - (time.time() - starttime)
            if remaining <= 0.01:
                # Re-raise from inside the handler: a bare raise here has
                # an active exception to re-raise on every Python version.
                raise
            # Never sleep past the overall deadline.
            time.sleep(min(timeout, remaining))
            timeout *= factor
        else:
            return

def kill_windows(handle, report_hook):
    """Terminate the process identified by `handle` with exit code -1.

    win32api (pywin32) is used when it can be imported; otherwise the
    call goes to kernel32 through ctypes.  `report_hook` is accepted but
    not used.
    """
    try:
        import win32api
    except ImportError:
        # No pywin32 available: reach TerminateProcess via ctypes instead.
        import ctypes
        terminate = ctypes.windll.kernel32.TerminateProcess
    else:
        terminate = win32api.TerminateProcess
    terminate(int(handle), -1)

def kill_posix(pid, report_hook):
    """Terminate child process `pid`, escalating to SIGKILL if needed.

    Sends SIGTERM, then polls os.waitpid() for up to 10 seconds.  If the
    child has not exited by then, reports "forcefully killing" and sends
    SIGKILL.  A no-op SIGCHLD handler is installed while waiting and the
    previous handler is restored afterwards.

    pid -- process id of a child of the calling process
    report_hook -- callable taking one progress-message string

    Raises OSError if signalling or waiting fails, except that a process
    that has already gone away is tolerated when sending SIGKILL.
    """
    import errno
    import signal
    os.kill(pid, signal.SIGTERM)

    timeout = 10.
    poll_interval = 0.1
    starttime = time.time()
    report_hook("waiting for exit")
    def do_nothing(*args):
        pass
    old_handler = signal.signal(signal.SIGCHLD, do_nothing)
    try:
        while time.time() < starttime + timeout - 0.01:
            # Keep the result apart from `pid`: waitpid() returns 0 while
            # the child is still running, and pid 0 would make the kill
            # below target the caller's whole process group.
            exited_pid, sts = os.waitpid(pid, os.WNOHANG)
            if exited_pid != 0:
                # exited, or error
                break
            # Sleep in short slices, never negative and never past the
            # deadline, so this does not depend on SIGCHLD cutting the
            # sleep short.
            remaining = timeout - (time.time() - starttime)
            if remaining > 0:
                time.sleep(min(poll_interval, remaining))
        else:
            report_hook("forcefully killing")
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                # sys.exc_info() rather than "except ... as" keeps this
                # parseable by old Python 2 releases.
                exc = sys.exc_info()[1]
                if exc.errno not in (errno.ECHILD, errno.ESRCH):
                    raise
    finally:
        signal.signal(signal.SIGCHLD, old_handler)

class TwistedServerProcess(ServerProcess):
    """Server process that runs test-tools/twisted-localserver.py.

    The script is looked up relative to the directory containing the
    running program (sys.argv[0]); the port is passed to it as its only
    command line argument.
    """

    def __init__(self, name=None):
        program_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        server_script = os.path.join(
            program_dir, "test-tools/twisted-localserver.py")
        ServerProcess.__init__(self, server_script, name)

    def _get_args(self):
        """Return the port, as a string, as the sole argument."""
        port_arg = str(self.port)
        return [port_arg]


class CgitbTextResult(_TextTestResult):
    """Text test result that formats errors and failures with cgitb.text()."""

    def _exc_info_to_string(self, err, test):
        """Converts a sys.exc_info()-style tuple of values into a string.

        Leading traceback levels that belong to the test runner are
        dropped; the rest is rendered by cgitb.text().  Failures and
        errors are formatted the same way, so `test` is not consulted.
        """
        exctype, value, tb = err
        # Skip test runner traceback levels
        while tb and self._is_relevant_tb_level(tb):
            tb = tb.tb_next
        return cgitb.text((exctype, value, tb))

class CgitbTextTestRunner(TextTestRunner):
    """TextTestRunner whose result objects are CgitbTextResult instances."""

    def _makeResult(self):
        """Build the result object from this runner's stream and settings."""
        stream = self.stream
        descriptions = self.descriptions
        verbosity = self.verbosity
        return CgitbTextResult(stream, descriptions, verbosity)

def add_uri_attribute_to_test_cases(suite, uri):
    """Set a `uri` attribute on every TestCase found under `suite`.

    Walks suite._tests recursively.  Members that are not TestCase
    instances are treated as nested suites; an AttributeError raised
    while descending into one is ignored.
    """
    for member in suite._tests:
        if not isinstance(member, TestCase):
            try:
                add_uri_attribute_to_test_cases(member, uri)
            except AttributeError:
                pass
            continue
        member.uri = uri


class TestProgram:
    """A command-line program that runs a set of tests; this is primarily
       for making test modules conveniently executable.

    Constructing an instance imports the named modules and parses argv;
    call runTests() to run the selected tests.  When -l/--run-local-server
    is given, the supplied server process is started before the tests and
    stopped afterwards.
    """
    USAGE = """\
Usage: %(progName)s [options] [test] [...]

Note not all the functional tests take note of the --uri argument yet --
some currently always access the internet regardless of the --uri and
--run-local-server options.

Options:
  -l, --run-local-server
                   Run a local Twisted HTTP server for the functional
                   tests.  You need Twisted installed for this to work.
                   The server is run on the port given in the --uri
                   option.  If --run-local-server is given but no --uri is
                   given, http://127.0.0.1:8000 is used as the base URI.
                   Also, if you're on Windows and don't have pywin32 or
                   ctypes installed, this option won't work, and you'll
                   have to start up test-tools/localserver.py manually.
  --uri=URL        Base URI for functional tests
                   (test.py does not access the network, unless you tell
                   it to run module functional_tests;
                   functional_tests.py does access the network)
                   e.g. --uri=http://127.0.0.1:8000/
  -h, --help       Show this message
  -v, --verbose    Verbose output
  -q, --quiet      Minimal output

The following options are only available through test.py (you can still run the
functional tests through test.py, just give 'functional_tests' as the module
name to run):

  -u               Skip plain (non-doctest) unittests
  -d               Skip doctests
  -c               Run coverage (requires coverage.py, seems buggy)
  -t               Display tracebacks using cgitb's text mode

"""
    USAGE_EXAMPLES = """
Examples:
  %(progName)s
                 - run all tests
  %(progName)s test_cookies
                 - run module 'test_cookies'
  %(progName)s test_cookies.CookieTests
                 - run all 'test*' test methods in test_cookies.CookieTests
  %(progName)s test_cookies.CookieTests.test_expires
                 - run test_cookies.CookieTests.test_expires

  %(progName)s functional_tests
                 - run the functional tests
  %(progName)s -l functional_tests
                 - start a local Twisted HTTP server and run the functional
                   tests against that, rather than against SourceForge
                   (quicker!)
"""
    def __init__(self, moduleNames, localServerProcess, defaultTest=None,
                 argv=None, testRunner=None, testLoader=defaultTestLoader,
                 defaultUri="http://wwwsearch.sourceforge.net/",
                 usageExamples=USAGE_EXAMPLES,
                 ):
        """Import the test modules and parse the command line.

        moduleNames -- (possibly dotted) names of modules to import; their
            tests are run when no test names are given
        localServerProcess -- server object that gets its `port` and
            `report_hook` set here when --run-local-server is given, and
            is started/stopped by runTests()
        defaultTest -- test name to run when none is given on the command
            line
        argv -- command line, defaults to sys.argv
        testRunner -- runner to use; runTests() falls back to a
            TextTestRunner
        testLoader -- loader used to build the tests
        defaultUri -- base URI used when neither --uri nor
            --run-local-server is given
        usageExamples -- text appended to USAGE in the help output

        Exits with status 2 (after printing usage) on bad options or a
        non-integer port in --uri.
        """
        self.modules = []
        for moduleName in moduleNames:
            # __import__ returns the top-level package; walk down to the
            # named submodule.
            module = __import__(moduleName)
            for part in moduleName.split('.')[1:]:
                module = getattr(module, part)
            self.modules.append(module)
        self.uri = None
        self._defaultUri = defaultUri
        if argv is None:
            argv = sys.argv
        self.verbosity = 1
        self.defaultTest = defaultTest
        self.testRunner = testRunner
        self.testLoader = testLoader
        self.progName = os.path.basename(argv[0])
        self.usageExamples = usageExamples
        self.runLocalServer = False
        self.parseArgs(argv)
        if self.runLocalServer:
            import urllib
            from mechanize._rfc3986 import urlsplit
            authority = urlsplit(self.uri)[1]
            host, port = urllib.splitport(authority)
            if port is None:
                port = "80"
            try:
                port = int(port)
            except ValueError:
                # Only a malformed port is a usage error; anything else
                # (e.g. KeyboardInterrupt) must not be swallowed here.
                self.usageExit("port in --uri value must be an integer "
                               "(try --uri=http://127.0.0.1:8000/)")
            self._serverProcess = localServerProcess
            def report(msg):
                print("%s: %s" % (localServerProcess.name, msg))
            localServerProcess.port = port
            localServerProcess.report_hook = report

    def usageExit(self, msg=None):
        """Print msg (if any) and the usage text, then exit with status 2."""
        if msg:
            print(msg)
        print((self.USAGE + self.usageExamples) % self.__dict__)
        sys.exit(2)

    def parseArgs(self, argv):
        """Parse argv, setting self.uri, self.verbosity, self.runLocalServer
        and self.test.

        With no test names on the command line and no defaultTest, all
        tests from self.modules are loaded; otherwise the named tests are.
        Every loaded test case gets a `uri` attribute set to self.uri.
        """
        import getopt
        try:
            options, args = getopt.getopt(
                argv[1:],
                'hHvql',
                ['help','verbose','quiet', 'uri=', 'run-local-server'],
                )
        except getopt.error:
            # sys.exc_info() rather than "except ... as" keeps this
            # parseable by old Python 2 releases.
            self.usageExit(sys.exc_info()[1])
        uri = None
        for opt, value in options:
            if opt in ('-h','-H','--help'):
                self.usageExit()
            if opt in ('--uri',):
                uri = value
            if opt in ('-q','--quiet'):
                self.verbosity = 0
            if opt in ('-v','--verbose'):
                self.verbosity = 2
            if opt in ('-l', '--run-local-server'):
                self.runLocalServer = True
        if uri is None:
            if self.runLocalServer:
                uri = "http://127.0.0.1:8000"
            else:
                uri = self._defaultUri
        self.uri = uri
        if len(args) == 0 and self.defaultTest is None:
            suite = TestSuite()
            for module in self.modules:
                test = self.testLoader.loadTestsFromModule(module)
                suite.addTest(test)
            self.test = suite
        else:
            if len(args) > 0:
                self.testNames = args
            else:
                self.testNames = (self.defaultTest,)
            self.createTests()
        add_uri_attribute_to_test_cases(self.test, self.uri)

    def createTests(self):
        """Load the tests named in self.testNames into self.test."""
        self.test = self.testLoader.loadTestsFromNames(self.testNames)

    def runTests(self):
        """Run self.test and return the runner's result.

        The local server, if requested, is started first and is stopped
        again even when the test run raises.
        """
        if self.testRunner is None:
            self.testRunner = TextTestRunner(verbosity=self.verbosity)

        if self.runLocalServer:
            self._serverProcess.start()
        try:
            result = self.testRunner.run(self.test)
        finally:
            if self.runLocalServer:
                self._serverProcess.stop()
        return result
