1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
import struct
from ..utils import nbytes
# Default upper bound, in bytes, for a single shard produced when splitting
# a large frame (2 ** 26 bytes == 64 MiB).
BIG_BYTES_SHARD_SIZE = 2 ** 26

# Keyword options for msgpack unpacking: lift every per-type length limit to
# 2 ** 31 - 1, allow non-string map keys, and decode raw strings to ``str``.
msgpack_opts = {
    f"max_{kind}_len": 2 ** 31 - 1 for kind in ("str", "bin", "array", "map", "ext")
}
msgpack_opts.update(strict_map_key=False, raw=False)
def frame_split_size(frame, n=BIG_BYTES_SHARD_SIZE) -> list:
    """
    Split a frame into a list of frames of maximum size

    This helps us to avoid passing around very large bytestrings.

    Parameters
    ----------
    frame : bytes-like
        A single object supporting the buffer protocol (it is wrapped in a
        ``memoryview``).
    n : int
        Target maximum number of bytes per shard.  A shard never contains
        less than one whole item, so when ``n`` is smaller than the item
        size of ``frame`` the shards are one item each (and therefore
        larger than ``n``).

    Returns
    -------
    list of memoryview
        Slices of ``frame`` in order.  If ``frame`` already fits in ``n``
        bytes, a one-element list holding the whole view is returned.

    Examples
    --------
    The shards are memoryviews; they are shown as bytes for readability.

    >>> frame_split_size(b'12345678', n=3)  # doctest: +SKIP
    [b'123', b'456', b'78']
    """
    frame = memoryview(frame)

    if frame.nbytes <= n:
        return [frame]

    nitems = frame.nbytes // frame.itemsize
    # Clamp to one item per shard: ``n // itemsize`` is 0 when ``n`` is
    # smaller than a single item (range() would raise on a zero step) and
    # negative when ``n`` is negative (range() would yield nothing and the
    # data would be silently dropped).
    items_per_shard = max(1, n // frame.itemsize)

    return [frame[i : i + items_per_shard] for i in range(0, nitems, items_per_shard)]
def merge_frames(header, frames):
    """Merge frames into original lengths

    Parameters
    ----------
    header : dict
        Must contain ``"lengths"`` (the byte length of each output frame)
        and ``"writeable"`` (one boolean per output frame saying whether
        that frame must be writeable).
    frames : list
        Bytes-like frames whose total size equals ``sum(header["lengths"])``.
        They may be split or merged differently from ``lengths``.

    Returns
    -------
    list
        One bytes-like object per entry of ``header["lengths"]``.  A frame
        that already has the right length and mutability is passed through
        (possibly as a ``memoryview`` slice) without copying; otherwise a
        new ``bytes`` or ``bytearray`` is built.

    Examples
    --------
    Split results are memoryviews; they are shown as bytes for readability.

    >>> merge_frames({'lengths': [3, 3], 'writeable': [False, False]},
    ...              [b'123456'])  # doctest: +SKIP
    [b'123', b'456']
    >>> merge_frames({'lengths': [6], 'writeable': [False]}, [b'123', b'456'])
    [b'123456']
    """
    lengths = list(header["lengths"])
    writeables = list(header["writeable"])

    # Sanity checks on the header; note that asserts are stripped under -O.
    assert len(lengths) == len(writeables)
    assert sum(lengths) == sum(nbytes(f) for f in frames)

    # Fast path: the frames already line up one-to-one with the requested
    # lengths.  The explicit count check matters because zip() truncates:
    # without it, trailing zero-length entries of ``lengths`` that have no
    # matching frame would be silently dropped from the result.
    if len(frames) == len(lengths) and all(
        len(f) == l for f, l in zip(frames, lengths)
    ):
        # ``w == readonly`` means the frame's mutability is the opposite of
        # what was requested, so it has to be copied into the right type.
        return [
            (bytearray(f) if w else bytes(f)) if w == memoryview(f).readonly else f
            for w, f in zip(writeables, frames)
        ]

    # Slow path: consume the input frames in order, slicing and joining them
    # so each output has exactly the requested length.  ``pending`` is a
    # stack with the next frame to consume at the end.
    pending = list(frames)
    pending.reverse()

    out = []
    for remaining, writeable in zip(lengths, writeables):
        pieces = []
        while remaining:
            frame = pending.pop()
            size = nbytes(frame)
            if size <= remaining:
                pieces.append(frame)
                remaining -= size
            else:
                # Take only what is needed and push the rest back for the
                # next output frame.
                # NOTE(review): slicing is by item, not by byte -- this
                # presumably assumes byte-sized items; confirm for callers
                # passing memoryviews with a larger itemsize.
                frame = memoryview(frame)
                pieces.append(frame[:remaining])
                pending.append(frame[remaining:])
                remaining = 0

        if len(pieces) == 1 and writeable != memoryview(pieces[0]).readonly:
            # Single piece that already has the requested mutability:
            # no work necessary.
            out.extend(pieces)
        elif writeable:
            out.append(bytearray().join(pieces))
        else:
            out.append(bytes().join(pieces))
    return out
def pack_frames_prelude(frames):
    """Build the binary prelude describing ``frames``

    The prelude is the number of frames followed by the size of each frame
    (as reported by ``nbytes``), every value packed with the ``struct``
    format code ``Q``.

    See Also
    --------
    pack_frames
    unpack_frames
    """
    count = len(frames)
    sizes = [nbytes(f) for f in frames]
    return struct.pack(f"Q{count}Q", count, *sizes)
def pack_frames(frames):
    """Concatenate frames into one bytes object, led by a length prelude

    The prelude produced by ``pack_frames_prelude`` is placed in front of the
    frame contents so that the individual frames can be recovered later.

    See Also
    --------
    unpack_frames
    """
    parts = [pack_frames_prelude(frames)]
    parts.extend(frames)
    return b"".join(parts)
def unpack_frames(b):
    """Recover the sequence of frames from a packed bytes-like object

    The input is expected to start with the length prelude written by
    pack_frames: the frame count, then one size per frame, each packed with
    the ``struct`` format code ``Q``.  The frames are returned as
    ``memoryview`` slices of the input, in order.

    See Also
    --------
    pack_frames
    """
    view = memoryview(b)

    word = struct.calcsize("Q")
    (count,) = struct.unpack_from("Q", view)
    sizes = struct.unpack_from(f"{count}Q", view, word)

    # Frame data begins right after the count word and the ``count`` sizes.
    offset = word * (count + 1)
    result = []
    for size in sizes:
        result.append(view[offset : offset + size])
        offset += size
    return result
|