"""
Tests for L{eliot.logwriter}.
"""

import time
import threading

from io import BytesIO

try:
    from zope.interface.verify import verifyClass
    from twisted.internet import reactor
    from twisted.trial.unittest import TestCase
    from twisted.application.service import IService
    from twisted.python import threadable
except ImportError:
    # Make tests not run at all.
    TestCase = object
else:
    # Make sure we always import this if Twisted is available, so broken
    # logwriter.py causes a failure:
    from ..logwriter import ThreadedWriter

from .. import Logger, removeDestination, FileDestination


class BlockingFile(object):
    """
    A file-like whose writes can be blocked.

    Also, allow calling C{getvalue} after C{close}, unlike L{BytesIO}.
    """

    def __init__(self):
        self.file = BytesIO()
        self.lock = threading.Lock()
        self.data = b""

    def block(self):
        """
        Prevent writes until L{unblock} is called.
        """
        self.lock.acquire()

    def unblock(self):
        """
        Allow writes if L{block} was previous called.
        """
        self.lock.release()

    def getvalue(self):
        """
        Get written bytes.

        @return: Written bytes.
        """
        return self.data

    def write(self, data):
        with self.lock:
            self.file.write(data)

    def flush(self):
        self.data = self.file.getvalue()

    def close(self):
        self.file.close()


class ThreadedWriterTests(TestCase):
    """
    Tests for L{ThreadedWriter}.

    Many of these tests involve interactions across threads, so they
    arbitrarily wait for up to 5 seconds to reduce chances of slow thread
    switching causing the test to fail.
    """

    def test_interface(self):
        """
        L{ThreadedWriter} provides L{IService}.
        """
        verifyClass(IService, ThreadedWriter)

    def test_name(self):
        """
        L{ThreadedWriter} has a name.
        """
        self.assertEqual(ThreadedWriter.name, "Eliot Log Writer")

    def test_startServiceRunning(self):
        """
        L{ThreadedWriter.startService} starts the service as required by the
        L{IService} interface.
        """
        writer = ThreadedWriter(FileDestination(file=BytesIO()), reactor)
        self.assertFalse(writer.running)
        writer.startService()
        self.addCleanup(writer.stopService)
        self.assertTrue(writer.running)

    def test_stopServiceRunning(self):
        """
        L{ThreadedWriter.stopService} stops the service as required by the
        L{IService} interface.
        """
        writer = ThreadedWriter(FileDestination(file=BytesIO()), reactor)
        writer.startService()
        d = writer.stopService()
        d.addCallback(lambda _: self.assertFalse(writer.running))
        return d

    def test_startServiceStartsThread(self):
        """
        L{ThreadedWriter.startService} starts up a thread running
        L{ThreadedWriter._writer}.
        """
        previousThreads = threading.enumerate()
        result = []
        event = threading.Event()

        def _reader():
            current = threading.current_thread()
            if current not in previousThreads:
                result.append(current)
            event.set()

        writer = ThreadedWriter(FileDestination(file=BytesIO()), reactor)
        writer._reader = _reader
        writer.startService()
        event.wait()
        self.assertTrue(result)
        # Make sure thread is dead so it doesn't die half way through another
        # test:
        result[0].join(5)

    def test_stopServiceStopsThread(self):
        """
        L{ThreadedWriter.stopService} stops the writer thread.
        """
        previousThreads = set(threading.enumerate())
        writer = ThreadedWriter(FileDestination(file=BytesIO()), reactor)
        writer.startService()
        start = time.time()
        while set(threading.enumerate()) == previousThreads and (
            time.time() - start < 5
        ):
            time.sleep(0.0001)
        # If not true the next assertion might pass by mistake:
        self.assertNotEqual(set(threading.enumerate()), previousThreads)
        writer.stopService()
        while set(threading.enumerate()) != previousThreads and (
            time.time() - start < 5
        ):
            time.sleep(0.0001)
        self.assertEqual(set(threading.enumerate()), previousThreads)

    def test_stopServiceFinishesWriting(self):
        """
        L{ThreadedWriter.stopService} stops the writer thread, but only after
        all queued writes are written out.
        """
        f = BlockingFile()
        writer = ThreadedWriter(FileDestination(file=f), reactor)
        f.block()
        writer.startService()
        for i in range(100):
            writer({"write": 123})
        threads = threading.enumerate()
        writer.stopService()
        # Make sure writes didn't happen before the stopService, thus making the
        # test pointless:
        self.assertEqual(f.getvalue(), b"")
        f.unblock()
        start = time.time()
        while threading.enumerate() == threads and time.time() - start < 5:
            time.sleep(0.0001)
        self.assertEqual(f.getvalue(), b'{"write":123}\n' * 100)

    def test_stopServiceResult(self):
        """
        L{ThreadedWriter.stopService} returns a L{Deferred} that fires only
        after the thread has shut down.
        """
        f = BlockingFile()
        writer = ThreadedWriter(FileDestination(file=f), reactor)
        f.block()
        writer.startService()

        writer({"hello": 123})
        threads = threading.enumerate()
        d = writer.stopService()
        f.unblock()

        def done(_):
            self.assertEqual(f.getvalue(), b'{"hello":123}\n')
            self.assertNotEqual(threading.enumerate(), threads)

        d.addCallback(done)
        return d

    def test_noChangeToIOThread(self):
        """
        Running a L{ThreadedWriter} doesn't modify the Twisted registered IO
        thread.
        """
        writer = ThreadedWriter(FileDestination(file=BytesIO()), reactor)
        writer.startService()
        d = writer.stopService()
        # Either the current thread (the one running the tests) is the the I/O
        # thread or the I/O thread was never set. Either may happen depending on
        # how and whether the reactor has been started by the unittesting
        # framework.
        d.addCallback(
            lambda _: self.assertIn(
                threadable.ioThread, (None, threading.current_thread().ident)
            )
        )
        return d

    def test_startServiceRegistersDestination(self):
        """
        L{ThreadedWriter.startService} registers itself as an Eliot log
        destination.
        """
        f = BlockingFile()
        writer = ThreadedWriter(FileDestination(file=f), reactor)
        writer.startService()
        Logger().write({"x": "abc"})
        d = writer.stopService()
        d.addCallback(lambda _: self.assertIn(b"abc", f.getvalue()))
        return d

    def test_stopServiceUnregistersDestination(self):
        """
        L{ThreadedWriter.stopService} unregisters itself as an Eliot log
        destination.
        """
        writer = ThreadedWriter(FileDestination(file=BytesIO()), reactor)
        writer.startService()
        d = writer.stopService()
        d.addCallback(lambda _: removeDestination(writer))
        return self.assertFailure(d, ValueError)

    def test_call(self):
        """
        The message passed to L{ThreadedWriter.__call__} is passed to the
        underlying destination in the writer thread.
        """
        result = []

        def destination(message):
            result.append((message, threading.current_thread().ident))

        writer = ThreadedWriter(destination, reactor)
        writer.startService()
        thread_ident = writer._thread.ident
        msg = {"key": 123}
        writer(msg)
        d = writer.stopService()
        d.addCallback(lambda _: self.assertEqual(result, [(msg, thread_ident)]))
        return d
