1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2018,2020 Daniel Estevez <daniel@destevez.net>
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
from gnuradio import gr
import numpy as np
import pmt
from .bch15 import decode_bch15
from .telemetry.snet import LTUFrameHeader
class snet_deframer(gr.basic_block):
"""docstring for block snet_deframer"""
def __init__(self, verbose=False, buggy_crc=True):
gr.basic_block.__init__(
self,
name='snet_deframer',
in_sig=[],
out_sig=[])
self.verbose = verbose
self.buggy_crc = buggy_crc
self.message_port_register_in(pmt.intern('in'))
self.set_msg_handler(pmt.intern('in'), self.handle_msg)
self.message_port_register_out(pmt.intern('out'))
def handle_msg(self, msg_pmt):
msg = pmt.cdr(msg_pmt)
if not pmt.is_u8vector(msg):
print('[ERROR] Received invalid message type. Expected u8vector')
return
bits = np.array(pmt.u8vector_elements(msg))
ltu = bits[:210].reshape((15, 14)).transpose()
# Decode BCH(15,5,7)
if not all((decode_bch15(ltu[j, :]) for j in range(14))):
# Decode failure
if self.verbose:
print('BCH decode failure')
return
ltu = np.fliplr(ltu[:, -5:]).ravel()
hdr = LTUFrameHeader.parse(np.packbits(ltu))
ltu_crc = np.concatenate((ltu[:-5], np.array([1, 0, 1, 1, 0, 1, 1])))
ltu_crc = ltu_crc.reshape((9, 8))
if self.buggy_crc:
# Reverse byte ordering for CRC5 calculation
ltu_crc = np.flipud(ltu_crc)
if self.buggy_crc:
# Force CRC5 bugs
ltu_crc[4, :] = ltu_crc[3, :]
# CRC5 calculation
crc = 0x1F
for bit in ltu_crc.ravel():
# Check most significant bit in the CRC buffer and save
# in a variable.
c = crc & 0x10
# Shift variable to make the compare op. possible (see beneath).
c >>= 4
# Shift CRC to the left and write 0 into the least significant bit.
crc <<= 1
if c != bit:
crc ^= 0x15 # CRC polynomial
crc &= 0x1F
if crc != hdr.CRC5:
if self.verbose:
print('CRC5 fail')
return
if self.verbose:
print(hdr)
if hdr.PduLength == 0:
return
codewords_per_block = 16
uncoded = False
if hdr.AiTypeSrc == 0:
uncoded = True
elif hdr.AiTypeSrc == 1:
data_bits_per_codeword = 11 # BCH(15,11,3)
bch_d = 3
elif hdr.AiTypeSrc == 2:
data_bits_per_codeword = 7 # BCH(15,7,5)
bch_d = 5
elif hdr.AiTypeSrc == 3:
data_bits_per_codeword = 5 # BCH(15,5,7)
bch_d = 7
else:
if self.verbose:
print('Invalid AiTypeSrc')
return
if uncoded:
pdu_bytes = bits[210:210+hdr.PduLength*8]
pdu_bytes = pdu_bytes.reshape((hdr.PduLength, 8))
pdu_bytes = np.fliplr(pdu_bytes)
else:
data_bytes_per_block = (codewords_per_block
* data_bits_per_codeword // 8)
num_blocks = int(np.ceil(float(
hdr.PduLength) / data_bytes_per_block))
blocks = list()
for k in range(num_blocks):
block = bits[210+k*16*15:210+(k+1)*16*15].reshape((15, 16))
if bch_d:
block = block.transpose()
if not bch_d:
print(block)
# Decode BCH
if (bch_d
and not all((decode_bch15(block[j, :], d=bch_d)
for j in range(16)))):
# Decode failure
if self.verbose:
print('BCH decode failure')
return
if bch_d:
blocks.append(block[:, -data_bits_per_codeword:].ravel())
else:
blocks.append(block.ravel())
pdu_bytes = np.concatenate(blocks)
pdu_bytes = pdu_bytes.reshape((data_bytes_per_block
* num_blocks, 8))
pdu_bytes = np.fliplr(pdu_bytes)
# Drop 0xDB padding bytes at the end
pdu_bytes = pdu_bytes[:hdr.PduLength]
if not bch_d:
print(pdu_bytes)
# CRC13
crc = 0x1FFF
pdu_crc = np.flipud(pdu_bytes) if self.buggy_crc else pdu_bytes
for bit in pdu_crc.ravel():
# Check most significant bit in the CRC buffer and save it
# in a variable.
c = crc & 0x1000
# Shift variable to make the compare op. possible (see beneath).
c >>= 12
# Shift CRC to the left and write 0 into the least significant bit.
crc <<= 1
if (c or bit if self.buggy_crc else c != bit):
crc ^= 0x1CF5 # CRC polynomial
crc &= 0x1FFF
if crc != hdr.CRC13:
if self.verbose:
print('CRC13 fail')
return
pdu = np.packbits(pdu_bytes)
pdu_tags = pmt.make_dict()
pdu_tags = pmt.dict_add(
pdu_tags, pmt.intern('SNET SrcId'), pmt.from_long(hdr.SrcId))
self.message_port_pub(
pmt.intern('out'),
pmt.cons(pdu_tags, pmt.init_u8vector(len(pdu), pdu)))
|