1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
import logging
import struct
log = logging.getLogger(__name__)
class WsFrame:
CONT = 0
TEXT = 1
BINARY = 2
RSVD3 = 3
RSVD4 = 4
RSVD5 = 5
RSVD6 = 6
RSVD7 = 7
CLOSE = 8
PING = 9
PONG = 10
RSVD11 = 11
RSVD12 = 12
RSVD13 = 13
RSVD14 = 14
RSVD15 = 15
OP_NAMES = [
"CONT",
"TEXT",
"BINARY",
"RSVD3",
"RSVD4",
"RSVD5",
"RSVD6",
"RSVD7",
"CLOSE",
"PING",
"PONG",
"RSVD11",
"RSVD12",
"RSVD13",
"RSVD14",
"RSVD15",
]
def __init__(self, opcode: int, fin: bool, mask: bytes, data: bytes):
self.opcode = opcode
self.fin = fin
self.mask = mask
self.data = data
self.length = len(data)
def __repr__(self):
return f'WsFrame[{self.OP_NAMES[self.opcode]} fin={self.fin}, mask={self.mask}, len={len(self.data)}]'
@property
def data_len(self) -> int:
return len(self.data) if self.data else 0
def to_network(self) -> bytes:
nd = bytearray()
h1 = self.opcode
if self.fin:
h1 |= 0x80
nd.extend(struct.pack("!B", h1))
mask_bit = 0x80 if self.mask is not None else 0x0
h2 = self.data_len
if h2 > 65535:
nd.extend(struct.pack("!BQ", 127|mask_bit, h2))
elif h2 > 126:
nd.extend(struct.pack("!BH", 126|mask_bit, h2))
else:
nd.extend(struct.pack("!B", h2|mask_bit))
if self.mask is not None:
nd.extend(self.mask)
if self.data is not None:
nd.extend(self.data)
return nd
@classmethod
def client_ping(cls, data: bytes, mask: bytes = None) -> 'WsFrame':
if mask is None:
mask = bytes.fromhex('00 00 00 00')
return WsFrame(opcode=WsFrame.PING, fin=True, mask=mask, data=data)
@classmethod
def client_close(cls, code: int, reason: str = None,
mask: bytes = None) -> 'WsFrame':
data = bytearray(struct.pack("!H", code))
if reason is not None:
data.extend(reason.encode())
if mask is None:
mask = bytes.fromhex('00 00 00 00')
return WsFrame(opcode=WsFrame.CLOSE, fin=True, mask=mask, data=data)
class WsFrameReader:
def __init__(self, data: bytes):
self.data = data
def _read(self, n: int):
if len(self.data) < n:
raise EOFError(f'have {len(self.data)} bytes left, but {n} requested')
elif n == 0:
return b''
chunk = self.data[:n]
del self.data[:n]
return chunk
def next_frame(self):
data = self._read(2)
h1, h2 = struct.unpack("!BB", data)
log.debug(f'parsed h1={h1} h2={h2} from {data}')
fin = True if h1 & 0x80 else False
opcode = h1 & 0xf
has_mask = True if h2 & 0x80 else False
mask = None
dlen = h2 & 0x7f
if dlen == 126:
(dlen,) = struct.unpack("!H", self._read(2))
elif dlen == 127:
(dlen,) = struct.unpack("!Q", self._read(8))
if has_mask:
mask = self._read(4)
return WsFrame(opcode=opcode, fin=fin, mask=mask, data=self._read(dlen))
def eof(self):
return len(self.data) == 0
@classmethod
def parse(cls, data: bytes):
frames = []
reader = WsFrameReader(data=data)
while not reader.eof():
frames.append(reader.next_frame())
return frames
|