1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
import time
from slixmpp import Message
import unittest
from slixmpp.test import SlixTest
class TestFilters(SlixTest):
"""
Test using incoming and outgoing filters.
"""
def setUp(self):
self.stream_start()
def tearDown(self):
self.stream_close()
def testIncoming(self):
data = []
def in_filter(stanza):
if isinstance(stanza, Message):
if stanza['body'] == 'testing':
stanza['subject'] = stanza['body'] + ' filter'
print('>>> %s' % stanza['subject'])
return stanza
def on_message(msg):
print('<<< %s' % msg['subject'])
data.append(msg['subject'])
self.xmpp.add_filter('in', in_filter)
self.xmpp.add_event_handler('message', on_message)
self.recv("""
<message>
<body>no filter</body>
</message>
""")
self.recv("""
<message>
<body>testing</body>
</message>
""")
self.assertEqual(data, ['', 'testing filter'],
'Incoming filter did not apply %s' % data)
def testOutgoing(self):
def out_filter(stanza):
if isinstance(stanza, Message):
if stanza['body'] == 'testing':
stanza['body'] = 'changed!'
return stanza
self.xmpp.add_filter('out', out_filter)
m1 = self.Message()
m1['body'] = 'testing'
m1.send()
m2 = self.Message()
m2['body'] = 'blah'
m2.send()
self.send("""
<message>
<body>changed!</body>
</message>
""")
self.send("""
<message>
<body>blah</body>
</message>
""")
suite = unittest.TestLoader().loadTestsFromTestCase(TestFilters)
|