# Shared base test cases and dummy application objects for the test suite.
import unittest
import threading
from miro import database
from miro import eventloop
from miro import app
from miro import downloader
from miro import views
from miro import util
from miro import databaseupgrade
from miro import signals
from miro import storedatabase
from miro import subscription
from miro import selection
from time import sleep
# Executed at import time, so util's logging setup runs before any test case
# defined below is instantiated.
util.setupLogging()
# Generally, all test cases should extend DemocracyTestCase or
# EventLoopTest. DemocracyTestCase cleans up any database changes you
# might have made, and EventLoopTest provides an API for accessing the
# eventloop in addition to managing the thread pool and cleaning up
# any events you may have scheduled.
#
# Our general strategy here is to restore the environment to a pristine
# state after each test, rather than trying to reset applicable pieces of
# the environment before each test. This way, when writing new tests you
# don't have to anticipate what another test may have changed; you just
# have to make sure you clean up what you changed. Usually, that is
# handled transparently by one of these test cases.
class HadToStopEventLoop(Exception):
    """Signals that a test's event loop was stopped abnormally.

    EventLoopTest.runEventLoop raises this when the loop was ended through
    stopEventLoop with ``abnormal`` set and the caller did not declare the
    timeout as normal.
    """
class DummyMainFrame:
    """Stand-in main frame that records which display was chosen per area.

    The four ``*Display`` attributes are placeholder strings, each equal to
    its own attribute name.
    """

    def __init__(self):
        # Maps an area to the display most recently selected for it.
        self.displays = {}
        for area_name in ("mainDisplay", "channelsDisplay",
                          "collectionDisplay", "videoInfoDisplay"):
            setattr(self, area_name, area_name)

    def selectDisplay(self, display, area):
        """Remember ``display`` as the current display for ``area``."""
        self.displays[area] = display

    def getDisplay(self, area):
        """Return the display recorded for ``area``, or None if there is none."""
        selected = self.displays.get(area, None)
        return selected

    def onSelectedTabChange(self, tabType, multiple, guideURL, videoFilename):
        """Accept a tab-change notification and ignore it."""
        return None
class DummyVideoDisplay:
    """Stand-in video display whose media queries do nothing."""

    def fileDuration(self, filename, callback):
        """Ignore the request; ``callback`` is never invoked."""
        return None

    def fillMovieData(self, filename, movie_data, callback):
        """Ignore the request; ``movie_data`` is left untouched and
        ``callback`` is never invoked."""
        return None
class DummyController:
    """Stand-in application controller exposing a dummy frame and a dummy
    video display."""

    def __init__(self):
        # Both collaborators are fresh dummy objects for every controller.
        self.frame, self.videoDisplay = DummyMainFrame(), DummyVideoDisplay()
class DemocracyTestCase(unittest.TestCase):
    """Base test case that installs dummy application globals before each
    test and resets the database and event loop afterwards.

    Attributes set in setUp:
        sawError: becomes True once an allowed 'error' signal is received.
        errorSignalOkay: tests set this to True when an 'error' signal is
            expected; otherwise receiving one raises.
    """

    def setUp(self):
        """Install the default database, dummy controller and selection
        handler, and start listening for 'error' signals."""
        app.db = database.defaultDatabase
        # Register the thread running this test with the database module.
        database.set_thread(threading.currentThread())
        views.initialize()
        # Turned off while the test runs; tearDown turns it back on.
        util.chatter = False
        self.sawError, self.errorSignalOkay = False, False
        signals.system.connect('error', self.handle_error)
        app.controller = DummyController()
        app.selection = selection.SelectionHandler()

    def tearDown(self):
        """Undo what setUp and the test itself may have changed."""
        signals.system.disconnect_all()
        util.chatter = True
        # Discard whatever database state the test left behind.
        database.resetDefaultDatabase()
        # A fresh loop drops anything that was accidentally left queued.
        eventloop._eventLoop = eventloop.EventLoop()

    def handle_error(self, obj, report):
        """Handle the 'error' signal.

        Records the error in ``sawError`` when ``errorSignalOkay`` is set;
        otherwise raises Exception("error signal").
        """
        if not self.errorSignalOkay:
            raise Exception("error signal")
        self.sawError = True
class EventLoopTest(DemocracyTestCase):
    """Test case base that exposes helpers for driving the event loop.

    Attributes:
        hadToStopEventLoop: set by stopEventLoop; True when the loop was
            stopped abnormally (for example by the runEventLoop timeout).
    """

    def setUp(self):
        """Run the base setUp and clear the abnormal-stop flag."""
        DemocracyTestCase.setUp(self)
        self.hadToStopEventLoop = False

    def stopEventLoop(self, abnormal=True):
        """Ask the event loop to quit.

        ``abnormal`` is stored in ``hadToStopEventLoop``; runEventLoop uses
        it to decide whether to raise HadToStopEventLoop.
        """
        self.hadToStopEventLoop = abnormal
        eventloop.quit()

    def runPendingIdles(self):
        """Process idles until neither the idle queue nor the urgent queue
        reports a pending idle."""
        idleQueue = eventloop._eventLoop.idleQueue
        urgentQueue = eventloop._eventLoop.urgentQueue
        while idleQueue.hasPendingIdle() or urgentQueue.hasPendingIdle():
            # Urgent idles are drained before each single normal idle.
            urgentQueue.processIdles()
            idleQueue.processNextIdle()

    def runEventLoop(self, timeout=10, timeoutNormal=False):
        """Run the event loop until something quits it.

        A timeout calling stopEventLoop is scheduled ``timeout`` seconds
        ahead as a safety net.

        Raises:
            HadToStopEventLoop: if the loop was stopped abnormally and
                ``timeoutNormal`` is false.
        """
        eventloop.threadPoolInit()
        try:
            self.hadToStopEventLoop = False
            # Kept under its own name so the ``timeout`` argument is not
            # shadowed by the scheduled-call handle.
            stopper = eventloop.addTimeout(timeout, self.stopEventLoop,
                                           "Stop test event loop")
            eventloop._eventLoop.quitFlag = False
            eventloop._eventLoop.loop()
            if self.hadToStopEventLoop and not timeoutNormal:
                raise HadToStopEventLoop()
            else:
                stopper.cancel()
        finally:
            # The thread pool is shut down even when the loop raised.
            eventloop.threadPoolQuit()

    def addTimeout(self, delay, function, name, args=None, kwargs=None):
        """Schedule ``function`` on the event loop after ``delay``."""
        eventloop.addTimeout(delay, function, name, args, kwargs)

    def addWriteCallback(self, socket, callback):
        """Register ``callback`` as the write callback for ``socket``."""
        eventloop.addWriteCallback(socket, callback)

    def removeWriteCallback(self, socket):
        """Remove the write callback registered for ``socket``."""
        eventloop.removeWriteCallback(socket)

    def addIdle(self, function, name, args=None, kwargs=None):
        """Queue ``function`` as an idle on the event loop.

        ``args`` and ``kwargs`` are forwarded to eventloop.addIdle; they
        were previously replaced by None, so the idle always ran without
        the caller's arguments.
        """
        eventloop.addIdle(function, name, args=args, kwargs=kwargs)

    def hasIdles(self):
        """Return True if the idle queue or the urgent queue is non-empty."""
        return not (eventloop._eventLoop.idleQueue.queue.empty() and
                    eventloop._eventLoop.urgentQueue.queue.empty())

    def processThreads(self):
        """Start the thread pool, wait until its queue is empty, then close
        the pool's threads."""
        eventloop._eventLoop.threadPool.initThreads()
        while not eventloop._eventLoop.threadPool.queue.empty():
            # Poll rather than block; the queue is emptied by pool threads.
            sleep(0.05)
        eventloop._eventLoop.threadPool.closeThreads()

    def processIdles(self):
        """Process the idle queue, then the urgent queue, once each."""
        eventloop._eventLoop.idleQueue.processIdles()
        eventloop._eventLoop.urgentQueue.processIdles()
class DownloaderTestCase(EventLoopTest):
    """Event-loop test case that starts the downloader before each test and
    shuts it down afterwards."""

    def setUp(self):
        """Run the base setUp, then start the downloader."""
        EventLoopTest.setUp(self)
        downloader.startupDownloader()

    def tearDown(self):
        """Shut the downloader down, then run the base tearDown.

        eventloop.quit is passed to shutdownDownloader, and the event loop
        is run so the shutdown can proceed. The base tearDown is in a
        ``finally`` so the database and event loop are still reset when the
        shutdown raises (for example HadToStopEventLoop on timeout); the
        exception propagates afterwards.
        """
        try:
            downloader.shutdownDownloader(eventloop.quit)
            self.runEventLoop()
        finally:
            EventLoopTest.tearDown(self)
|