File: packet_decoding.py

package info (click to toggle)
bpack 1.3.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 644 kB
  • sloc: python: 6,088; makefile: 75
file content (76 lines) | stat: -rw-r--r-- 1,863 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import dataclasses
from typing import Tuple

import numpy as np

from bpack.np import unpackbits


@dataclasses.dataclass(frozen=True)
class PacketDescriptor:
    """Immutable description of the layout of a single packet.

    A packet is described by the size of its header, the size of each
    sample and the number of samples that it contains.
    """

    # size of the packet header
    header_size: int
    # size of each one of the samples stored in the packet
    bits_per_sample: int
    # number of samples stored in the packet
    nsamples: int

    @property
    def packet_size(self):
        """Total size of the packet: the header plus all the samples."""
        payload_size = self.bits_per_sample * self.nsamples
        return self.header_size + payload_size


def decode_packet(
    data: bytes, descr: PacketDescriptor
) -> tuple[np.ndarray, np.ndarray]:
    """Decode the headers and the samples of the packets stored in `data`.

    Both the headers and the samples are extracted with
    :func:`bpack.np.unpackbits`, using ``descr.packet_size`` as stride
    between consecutive blocks (packets).

    Parameters
    ----------
    data:
        Binary buffer containing the packets to decode.
    descr:
        Layout of a packet: header size (also used as bit offset of the
        first sample), bits per sample and number of samples.

    Returns
    -------
    samples, headers:
        The decoded samples followed by the decoded headers (note the
        order: samples come first).
        If ``descr.header_size`` is not positive there is no header to
        decode and ``headers`` is a zero-length array of type ``u1``.
    """
    if descr.header_size > 0:
        # One header per packet: a single value at the start of each block.
        headers = unpackbits(
            data,
            descr.header_size,
            samples_per_block=1,
            blockstride=descr.packet_size,
        )
    else:
        # No header: return a really empty (zero-length) array.
        # NOTE: ``shape=()`` would instead create a 0-d array holding one
        # uninitialized element, i.e. a non-deterministic value.
        headers = np.empty(shape=(0,), dtype="u1")

    # The samples of each packet start right after the packet header.
    samples = unpackbits(
        data,
        descr.bits_per_sample,
        samples_per_block=descr.nsamples,
        bit_offset=descr.header_size,
        blockstride=descr.packet_size,
    )

    return samples, headers


def test():
    """Decode two synthetic packets and compare with the expected values.

    The test data are generated with a helper of the bpack test suite,
    which returns the encoded buffer together with the flat sequence of
    the encoded values (for each block: one header value followed by the
    sample values).
    """
    from bpack.tests.test_packbits import (
        _make_sample_data_block as mk_sample_data,
    )

    descriptor = PacketDescriptor(
        header_size=13, bits_per_sample=5, nsamples=64
    )
    print("descr", descriptor)

    nsamples = descriptor.nsamples
    data, values = mk_sample_data(
        descriptor.header_size,
        descriptor.bits_per_sample,
        nsamples,
        nblocks=2,
    )

    # Each block contributes one header value plus `nsamples` sample values.
    stride = 1 + nsamples

    # The header is the first value of each block ...
    expected_headers = np.asarray(values[::stride])

    # ... and the samples are the `nsamples` values that follow it.
    first_sample_indices = range(1, len(values), stride)
    samples_by_block = [
        values[first : first + nsamples] for first in first_sample_indices
    ]
    expected_samples = np.asarray(samples_by_block).ravel()

    print("header_values:", expected_headers)
    print("sample_values:", expected_samples)

    samples, headers = decode_packet(data, descriptor)
    print("headers:", headers)
    print("samples:", samples)

    assert (headers == expected_headers).all()
    assert (samples == expected_samples).all()


# Run the self-test only when the module is executed as a script
# (not when it is imported).
if __name__ == "__main__":
    test()