File: test_client.py

package info (click to toggle)
deejayd 0.9.0-4
  • links: PTS, VCS
  • area: main
  • in suites: squeeze, wheezy
  • size: 2,596 kB
  • ctags: 2,514
  • sloc: python: 11,904; xml: 594; sh: 338; makefile: 42
file content (191 lines) | stat: -rw-r--r-- 6,698 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Deejayd, a media player daemon
# Copyright (C) 2007-2009 Mickael Royer <mickael.royer@gmail.com>
#                         Alexandre Rossi <alexandre.rossi@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Deejayd Client library testing"""
import threading

from testdeejayd import TestCaseWithServer

from testdeejayd.server import TestServer
from testdeejayd.coreinterface import InterfaceTests, InterfaceSubscribeTests
from deejayd.net.client import DeejayDaemonSync, DeejayDaemonAsync, \
                               DeejayDaemonHTTP, DeejaydError, \
                               DeejaydWebradioList


class TestHTTPClient(TestCaseWithServer):
    """Test the http client library"""

    def setUp(self):
        TestCaseWithServer.setUp(self)

        # Instanciate the server object of the client library
        self.deejayd = DeejayDaemonHTTP('localhost', self.webServerPort)

    def testPing(self):
        """Ping server with a http connection"""
        self.failUnless(self.deejayd.ping().get_contents())


class TestSyncClient(TestCaseWithServer, InterfaceTests):
    """Test the DeejaydClient library in synchroneous mode."""

    def setUp(self):
        TestCaseWithServer.setUp(self)
        # Instanciate the server object of the client library
        self.deejayd = DeejayDaemonSync()
        self.deejayd.connect('localhost', self.serverPort)

    def tearDown(self):
        self.deejayd.disconnect()
        TestCaseWithServer.tearDown(self)

    def testPing(self):
        """Ping server"""
        self.failUnless(self.deejayd.ping().get_contents())


class TestAsyncClient(TestCaseWithServer, InterfaceSubscribeTests):
    """Test the DeejaydClient library in asynchroenous mode."""

    def setUp(self):
        TestCaseWithServer.setUp(self)

        # Instanciate the server object of the client library
        self.deejayd = DeejayDaemonAsync()
        self.deejayd.connect('localhost', self.serverPort)

        # Prepare in case we need other clients
        self.clients = [self.deejayd]

    def tearDown(self):
        for client in self.clients:
            client.disconnect()

        TestCaseWithServer.tearDown(self)

    def get_another_client(self):
        client = DeejayDaemonAsync()
        self.clients.append(client)
        return client

    def test_ping(self):
        """Ping server asynchroneously"""
        ans = self.deejayd.ping()
        self.failUnless(ans.get_contents(),
                        'Server did not respond well to ping.')

    def test_answer_callback(self):
        """Ping server asynchroneously and check for the callback to be triggered"""
        cb_called = threading.Event()
        def tcb(answer):
            cb_called.set()

        ans = self.deejayd.ping()
        ans.add_callback(tcb)
        # some seconds should be enough for the callback to be called
        cb_called.wait(4)
        self.failUnless(cb_called.isSet(), 'Answer callback was not triggered.')

    def testPlaylistSaveRetrieve(self):
        """Test playlist commands asynchroneously and callback"""

        # Get current playlist and add callback
        self.pl = None
        cb_called = threading.Event()
        def tcb(answer):
            cb_called.set()
            self.pl = answer.get_medias()

        djpl = self.deejayd.get_playlist()
        djpl.get().add_callback(tcb)

        cb_called.wait(4)
        self.failUnless(cb_called.isSet(), 'Answer callback was not triggered.')
        self.assertEqual(self.pl, [])

        # Add songs to playlist and get status
        cb_called = threading.Event()
        self.should_stop = False
        self.status = None

        def tcb_status(answer):
            cb_called.set()
            self.status = answer.get_contents()
            self.should_stop = True

        def cb_update_status(answer):
            self.deejayd.get_status().add_callback(tcb_status)

        djpl.add_path(self.test_audiodata.getRandomSongPaths(1)[0]).\
                add_callback(cb_update_status)

        while not self.should_stop:
            cb_called.wait(4)

        self.failUnless(cb_called.isSet(), 'Answer callback was not triggered.')
        self.assertEqual(self.status['playlistlength'], 1)

    def testCallbackProcess(self):
        """ Send three commands asynchroneously at the same time and check callback """

        firstcb_called = threading.Event()
        secondcb_called = threading.Event()
        thirdcb_called = threading.Event()
        def first_cb(answer):
            firstcb_called.set()

        def second_cb(answer):
            secondcb_called.set()

        def third_cb(answer):
            thirdcb_called.set()

        self.deejayd.get_audio_dir("").add_callback(first_cb)
        self.deejayd.get_playlist_list().add_callback(second_cb)
        self.deejayd.ping().add_callback(third_cb)

        firstcb_called.wait(2)
        self.failUnless(firstcb_called.isSet(), \
            '1st Answer callback was not triggered.')
        secondcb_called.wait(2)
        self.failUnless(secondcb_called.isSet(), \
            '2nd Answer callback was not triggered.')
        thirdcb_called.wait(2)
        self.failUnless(thirdcb_called.isSet(), \
            '3rd Answer callback was not triggered.')

    def test_two_clients(self):
        """Checks that it is possible to instanciate two clients in the same process."""
        client2 = self.get_another_client()
        client2.connect('localhost', self.serverPort)

        self.failUnless(self.deejayd.ping().get_contents())
        self.failUnless(client2.ping().get_contents())

    def test_subscription_another_client(self):
        """Checks that a subscription is broadcasted to another client who did not orignate the event trigger."""

        # Instanciate a second client that connects to the same server
        client2 = self.get_another_client()
        client2.connect('localhost', self.serverPort)

        self.generic_sub_bcast_test('mode', client2.set_mode, ('video', ))


# vim: ts=4 sw=4 expandtab