1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
|
#!/usr/bin/python -u
# -*- coding: UTF-8 -*-
import unittest
import libxml2
from pyxmpp import xmlextra
from pyxmpp.jid import JID,JIDError
from pyxmpp import xmppstringprep
def xml_elements_equal(a, b, ignore_level1_cdata = False):
if a.name!=b.name:
print "Name mismatch: %r, %r" % (a.name, b.name)
return False
try:
ns1 = a.ns()
except libxml2.treeError:
ns1 = None
try:
ns2 = b.ns()
except libxml2.treeError:
ns2 = None
if ns1 or ns2:
if None in (ns1,ns2):
print "Ns mismatch: %r, %r" % (ns1, ns2)
return False
if ns1.content != ns2.content:
print "Ns mismatch: %r, %r on %r, %r" % (ns1.content, ns2.content, a, b)
return False
ap = a.properties
bp = b.properties
while 1:
if (ap, bp) == (None, None):
break
if None in (ap, bp):
return False
if ap.name != bp.name:
return False
if ap.content != bp.content:
return False
ap = ap.next
bp = bp.next
ac = a.children
bc = b.children
while ac != None or bc != None:
if ignore_level1_cdata:
if ac and ac.type!='element':
ac=ac.next
continue
if bc and bc.type!='element':
bc=bc.next
continue
if None in (ac, bc):
return False
if ac.type != bc.type:
return False
if ac.type == 'element':
if not xml_elements_equal(ac, bc):
return False
elif ac.content != bc.content:
return False
ac = ac.next
bc = bc.next
return True
class EventTemplate:
def __init__(self, template):
self.event, offset, xml = template.split(None,2)
self.offset = int(offset)
self.xml = libxml2.parseDoc(eval(xml))
def __del__(self):
self.xml.freeDoc()
def match(self, event, node):
if self.event!=event:
return False
if event=="end":
return True
if node.type!='element':
return False
if not xml_elements_equal(self.xml.getRootElement(),node):
return False
return True
def __repr__(self):
return "<EventTemplate %r at %r: %r>" % (self.event, self.offset, self.xml.getRootElement().serialize())
class StreamHandler(xmlextra.StreamHandler):
def __init__(self, test_case):
self.test_case = test_case
def stream_start(self, doc):
self.test_case.event("start", doc.getRootElement())
def stream_end(self, doc):
self.test_case.event("end", None)
def stanza(self, doc, node):
self.test_case.event("node", node)
expected_events = []
whole_stream = None
def load_expected_events():
for l in file("data/stream_info.txt"):
if l.startswith("#"):
continue
l=l.strip()
expected_events.append(EventTemplate(l))
def load_whole_stream():
global whole_stream
whole_stream = libxml2.parseFile("data/stream.xml")
class TestStreamReader(unittest.TestCase):
def setUp(self):
self.expected_events = list(expected_events)
self.handler = StreamHandler(self)
self.reader = xmlextra.StreamReader(self.handler)
self.file = file("data/stream.xml")
self.chunk_start = 0
self.chunk_end = 0
self.whole_stream = libxml2.newDoc("1.0")
def tearDown(self):
del self.handler
del self.reader
self.whole_stream.freeDoc()
def test_1(self):
self.do_test(1)
def test_2(self):
self.do_test(2)
def test_10(self):
self.do_test(10)
def test_100(self):
self.do_test(100)
def test_1000(self):
self.do_test(1000)
def do_test(self, chunk_length):
while 1:
data=self.file.read(chunk_length)
if not data:
break
self.chunk_end += len(data)
r=self.reader.feed(data)
while r:
r=self.reader.feed()
if r is None:
self.event("end", None)
break
self.chunk_start = self.chunk_end
r1 = self.whole_stream.getRootElement()
r2 = whole_stream.getRootElement()
if not xml_elements_equal(r1, r2, True):
self.fail("Whole stream invalid. Got: %r, Expected: %r"
% (self.whole_stream.serialize(), whole_stream.serialize()))
def event(self, event, node):
expected = self.expected_events.pop(0)
self.failUnless(event==expected.event, "Got %r, expected %r" % (event, expected.event))
if expected.offset < self.chunk_start:
self.fail("Delayed event: %r. Expected at: %i, found at %i:%i"
% (event, expected.offset, self.chunk_start, self.chunk_end))
if expected.offset > self.chunk_end:
self.fail("Early event: %r. Expected at: %i, found at %i:%i"
% (event, expected.offset, self.chunk_start, self.chunk_end))
if not expected.match(event,node):
self.fail("Unmatched event. Expected: %r, got: %r;%r"
% (expected, event, node.serialize()))
if event == "start":
n = node.docCopyNode(self.whole_stream,1)
self.whole_stream.addChild(n)
self.whole_stream.setRootElement(n)
elif event == "node":
n = node.docCopyNode(self.whole_stream,1)
r = self.whole_stream.getRootElement()
r.addChild(n)
def suite():
load_expected_events()
load_whole_stream()
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestStreamReader))
return suite
if __name__ == '__main__':
unittest.TextTestRunner(verbosity=2).run(suite())
# vi: sts=4 et sw=4
|