1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
|
import dataclasses
from typing import Tuple
import numpy as np
from bpack.np import unpackbits
@dataclasses.dataclass(frozen=True)
class PacketDescriptor:
    """Immutable description of the layout of a single packet.

    All sizes are expressed in the same unit; the decoding code in this
    file passes them to the bit-unpacking routine as bit counts/offsets.
    """

    # Size of the header that precedes the samples of each packet.
    header_size: int
    # Size of each individual sample.
    bits_per_sample: int
    # Number of samples stored in each packet.
    nsamples: int

    @property
    def packet_size(self):
        """Total packet size: the header plus all of the samples."""
        payload_size = self.nsamples * self.bits_per_sample
        return payload_size + self.header_size
def decode_packet(
    data: bytes, descr: PacketDescriptor
) -> tuple[np.ndarray, np.ndarray]:
    """Decode the headers and the samples stored in a sequence of packets.

    Each packet is ``descr.packet_size`` bits long: a header of
    ``descr.header_size`` bits followed by ``descr.nsamples`` samples of
    ``descr.bits_per_sample`` bits each.  The actual bit extraction is
    delegated to :func:`bpack.np.unpackbits`.

    Parameters
    ----------
    data
        Raw bytes holding the packets to decode.
    descr
        Layout of a single packet.

    Returns
    -------
    samples, headers
        The decoded sample values followed by the decoded header values
        (note the order).  When ``descr.header_size`` is not positive
        there is nothing to decode for the headers and ``headers`` is an
        empty one-dimensional ``uint8`` array.
    """
    if descr.header_size > 0:
        # One header per packet: a single value at the start of each block.
        headers = unpackbits(
            data,
            descr.header_size,
            samples_per_block=1,
            blockstride=descr.packet_size,
        )
    else:
        # No header bits: return a genuinely empty array.  A shape of ``()``
        # would instead create a 0-d array holding one uninitialized value.
        headers = np.empty(shape=(0,), dtype="u1")

    # The samples start right after the header of each packet.
    samples = unpackbits(
        data,
        descr.bits_per_sample,
        samples_per_block=descr.nsamples,
        bit_offset=descr.header_size,
        blockstride=descr.packet_size,
    )
    return samples, headers
def test():
    """Check :func:`decode_packet` against generated sample data.

    Sample data for two blocks (13-bit header, 64 samples of 5 bits each)
    is generated with the helper from the ``bpack`` test-suite, decoded,
    and compared with the values used to generate it.
    """
    from bpack.tests.test_packbits import _make_sample_data_block

    descr = PacketDescriptor(header_size=13, bits_per_sample=5, nsamples=64)
    print("descr", descr)

    data, values = _make_sample_data_block(
        descr.header_size, descr.bits_per_sample, descr.nsamples, nblocks=2
    )

    # ``values`` is treated as a flat sequence where each block contributes
    # one header value followed by ``nsamples`` sample values.
    stride = descr.nsamples + 1
    expected_headers = np.asarray(values[::stride])
    block_starts = range(1, len(values), stride)
    expected_samples = np.asarray(
        [values[first : first + descr.nsamples] for first in block_starts]
    ).ravel()
    print("header_values:", expected_headers)
    print("sample_values:", expected_samples)

    samples, headers = decode_packet(data, descr)
    print("headers:", headers)
    print("samples:", samples)

    assert (headers == expected_headers).all()
    assert (samples == expected_samples).all()
# Run the self-check only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    test()
|