File: framework.py

package info (click to toggle)
miro 1.2.3-2
  • links: PTS
  • area: main
  • in suites: lenny
  • size: 60,356 kB
  • ctags: 15,099
  • sloc: cpp: 58,491; python: 40,363; ansic: 796; xml: 265; sh: 197; makefile: 167
file content (158 lines) | stat: -rw-r--r-- 5,295 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import unittest
import threading

from miro import database
from miro import eventloop
from miro import app
from miro import downloader
from miro import views
from miro import util
from miro import databaseupgrade
from miro import signals
from miro import storedatabase
from miro import subscription
from miro import selection
from time import sleep

util.setupLogging()

# Generally, all test cases should extend DemocracyTestCase or
# EventLoopTest.  DemocracyTestCase cleans up any database changes you
# might have made, and EventLoopTest provides an API for accessing the
# eventloop in addition to managing the thread pool and cleaning up
# any events you may have scheduled.
# 
# Our general strategy here is to "revirginize" the environment after
# each test, rather than trying to reset applicable pieces of the
# environment before each test. This way, when writing new tests you
# don't have to anticipate what another test may have changed, you
# just have to make sure you clean up what you changed. Usually, that
# is handled transparently through one of these test cases

class HadToStopEventLoop(Exception):
    pass

class DummyMainFrame:
    def __init__(self):
        self.displays = {}
        self.mainDisplay = "mainDisplay"
        self.channelsDisplay = "channelsDisplay"
        self.collectionDisplay = "collectionDisplay"
        self.videoInfoDisplay = "videoInfoDisplay"

    def selectDisplay(self, display, area):
        self.displays[area] = display

    def getDisplay(self, area):
        return self.displays.get(area)

    def onSelectedTabChange(self, tabType, multiple, guideURL, videoFilename):
        pass

class DummyVideoDisplay:
    def fileDuration (self, filename, callback):
        pass

    def fillMovieData (self, filename, movie_data, callback):
        pass

class DummyController:
    def __init__(self):
        self.frame = DummyMainFrame()
        self.videoDisplay = DummyVideoDisplay()

class DemocracyTestCase(unittest.TestCase):
    def setUp(self):
        app.db = database.defaultDatabase
        database.set_thread(threading.currentThread())
        views.initialize()
        # reset the event loop
        util.chatter = False
        self.sawError = False
        self.errorSignalOkay = False
        signals.system.connect('error', self.handle_error)
        app.controller = DummyController()
        app.selection = selection.SelectionHandler()

    def tearDown(self):
        signals.system.disconnect_all()
        util.chatter = True

        # Remove any leftover database
        database.resetDefaultDatabase()

        # Remove anything that may have been accidentally queued up
        eventloop._eventLoop = eventloop.EventLoop()

    def handle_error(self, obj, report):
        if self.errorSignalOkay:
            self.sawError = True
        else:
            raise Exception("error signal")

class EventLoopTest(DemocracyTestCase):
    def setUp(self):
        DemocracyTestCase.setUp(self)
        self.hadToStopEventLoop = False

    def stopEventLoop(self, abnormal = True):
        self.hadToStopEventLoop = abnormal
        eventloop.quit()

    def runPendingIdles(self):
        idleQueue = eventloop._eventLoop.idleQueue
        urgentQueue = eventloop._eventLoop.urgentQueue
        while idleQueue.hasPendingIdle() or urgentQueue.hasPendingIdle():
            urgentQueue.processIdles()
            idleQueue.processNextIdle()

    def runEventLoop(self, timeout=10, timeoutNormal=False):
        eventloop.threadPoolInit()
        try:
            self.hadToStopEventLoop = False
            timeout = eventloop.addTimeout(timeout, self.stopEventLoop, 
                                           "Stop test event loop")
            eventloop._eventLoop.quitFlag = False
            eventloop._eventLoop.loop()
            if self.hadToStopEventLoop and not timeoutNormal:
                raise HadToStopEventLoop()
            else:
                timeout.cancel()
        finally:
            eventloop.threadPoolQuit()

    def addTimeout(self,delay, function, name, args=None, kwargs=None):
        eventloop.addTimeout(delay, function, name, args, kwargs)

    def addWriteCallback(self, socket, callback):
        eventloop.addWriteCallback(socket, callback)

    def removeWriteCallback(self, socket):
        eventloop.removeWriteCallback(socket)

    def addIdle(self, function, name, args=None, kwargs=None):
        eventloop.addIdle(function, name, args=None, kwargs=None)

    def hasIdles(self):
        return not (eventloop._eventLoop.idleQueue.queue.empty() and
                    eventloop._eventLoop.urgentQueue.queue.empty())

    def processThreads(self):
        eventloop._eventLoop.threadPool.initThreads()
        while not eventloop._eventLoop.threadPool.queue.empty():
            sleep(0.05)
        eventloop._eventLoop.threadPool.closeThreads()

    def processIdles(self):
        eventloop._eventLoop.idleQueue.processIdles()
        eventloop._eventLoop.urgentQueue.processIdles()

class DownloaderTestCase(EventLoopTest):
    def setUp(self):
        EventLoopTest.setUp(self)
        downloader.startupDownloader()

    def tearDown(self):
        downloader.shutdownDownloader(eventloop.quit)
        self.runEventLoop()
        EventLoopTest.tearDown(self)