1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
from pathod import language
from pathod.language import websockets
import netlib.websockets
from . import tutils
def parse_request(s):
    """Parse the pathoc spec string ``s`` and return the first item produced."""
    parsed = language.parse_pathoc(s)
    return next(parsed)
class TestWebsocketFrame:
    """Tests for parsing and rendering ``wf`` websocket frame specs."""

    def _test_messages(self, specs, message_klass):
        """Check every spec parses to ``message_klass`` and its spec round-trips."""
        for text in specs:
            message = parse_request(text)
            assert isinstance(message, message_klass)
            assert message

            values = message.values(language.Settings())
            assert values
            resolved = message.resolve(language.Settings())
            assert resolved

            # Re-parsing the generated spec must reproduce the same spec.
            canonical = message.spec()
            reparsed = parse_request(canonical)
            assert reparsed.spec() == canonical

    def test_server_values(self):
        """Each listed spec must parse as a ``WebsocketFrame``."""
        self._test_messages(
            [
                "wf",
                "wf:dr",
                "wf:b'foo'",
                "wf:mask:r'foo'",
                "wf:l1024:b'foo'",
                "wf:cbinary",
                "wf:c1",
                "wf:mask:knone",
                "wf:fin",
                "wf:fin:rsv1:rsv2:rsv3:mask",
                "wf:-fin:-rsv1:-rsv2:-rsv3:-mask",
                "wf:k@4",
                "wf:x10",
            ],
            websockets.WebsocketFrame,
        )

    def test_parse_websocket_frames(self):
        """``wf:x10`` yields ten items; ``wf:x`` is a parse error."""
        frames = list(language.parse_websocket_frame("wf:x10"))
        assert len(frames) == 10

        tutils.raises(
            language.ParseException,
            language.parse_websocket_frame,
            "wf:x",
        )

    def test_client_values(self):
        """A spec with a nested frame must parse as a ``WebsocketClientFrame``."""
        self._test_messages(["wf:f'wf'"], websockets.WebsocketClientFrame)

    def test_nested_frame(self):
        """The parsed ``wf:f'wf'`` message exposes a truthy ``nested_frame``."""
        message = parse_request("wf:f'wf'")
        assert message.nested_frame

    def test_flags(self):
        """Header flags read back set, then unset, after rendering."""
        flag_names = ("fin", "mask", "rsv1", "rsv2", "rsv3")

        flags_on = parse_request("wf:fin:mask:rsv1:rsv2:rsv3")
        frame = netlib.websockets.Frame.from_bytes(tutils.render(flags_on))
        for flag in flag_names:
            assert getattr(frame.header, flag)

        flags_off = parse_request("wf:-fin:-mask:-rsv1:-rsv2:-rsv3")
        frame = netlib.websockets.Frame.from_bytes(tutils.render(flags_off))
        for flag in flag_names:
            assert not getattr(frame.header, flag)

    def fr(self, spec, **kwargs):
        """Render ``spec`` with settings built from ``kwargs`` and read it back.

        Returns the ``netlib.websockets.Frame`` decoded from the rendered bytes.
        """
        settings = language.base.Settings(**kwargs)
        rendered = tutils.render(parse_request(spec), settings)
        return netlib.websockets.Frame.from_bytes(rendered)

    def test_construction(self):
        """Numeric and named opcode specs end up in the header opcode."""
        assert self.fr("wf:c1").header.opcode == 1
        assert self.fr("wf:c0").header.opcode == 0
        assert (
            self.fr("wf:cbinary").header.opcode ==
            netlib.websockets.OPCODE.BINARY
        )
        assert (
            self.fr("wf:ctext").header.opcode ==
            netlib.websockets.OPCODE.TEXT
        )

    def test_rawbody(self):
        """A raw body keeps its length under ``mask`` but differs from ``foo``."""
        masked = self.fr("wf:mask:r'foo'")
        assert len(masked.payload) == 3
        assert masked.payload != b"foo"

        unmasked = self.fr("wf:r'foo'")
        assert unmasked.payload == b"foo"

    def test_construction_2(self):
        """Mask and masking key for server/client frames and explicit overrides."""
        # Simple server frame
        frame = self.fr("wf:b'foo'")
        assert not frame.header.mask
        assert not frame.header.masking_key

        # Simple client frame
        frame = self.fr("wf:b'foo'", is_client=True)
        assert frame.header.mask
        assert frame.header.masking_key

        frame = self.fr("wf:b'foo':k'abcd'", is_client=True)
        assert frame.header.mask
        assert frame.header.masking_key == b'abcd'

        # Server frame, mask explicitly set
        frame = self.fr("wf:b'foo':mask")
        assert frame.header.mask
        assert frame.header.masking_key

        frame = self.fr("wf:b'foo':k'abcd'")
        assert frame.header.mask
        assert frame.header.masking_key == b'abcd'

        # Client frame, mask explicitly unset
        frame = self.fr("wf:b'foo':-mask", is_client=True)
        assert not frame.header.mask
        assert not frame.header.masking_key

        frame = self.fr("wf:b'foo':-mask:k'abcd'", is_client=True)
        assert not frame.header.mask
        # We're reading back a corrupted frame - the first 3 characters of the
        # mask are mis-interpreted as the payload
        assert frame.payload == b"abc"

    def test_knone(self):
        """``mask`` combined with ``knone`` must fail with "expected 4 bytes"."""
        with tutils.raises("expected 4 bytes"):
            self.fr("wf:b'foo':mask:knone")

    def test_length(self):
        """An explicit length sets ``payload_length``; too long a length fails."""
        exact = self.fr("wf:l3:b'foo'")
        assert exact.header.payload_length == 3

        # A length shorter than the body truncates the payload read back.
        short = self.fr("wf:l2:b'foo'")
        assert short.header.payload_length == 2
        assert short.payload == b"fo"

        tutils.raises("expected 1024 bytes", self.fr, "wf:l1024:b'foo'")
|