File: ideassat_deframer.py

package info (click to toggle)
gr-satellites 5.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,836 kB
  • sloc: python: 29,546; cpp: 5,448; ansic: 1,247; sh: 118; makefile: 24
file content (157 lines) | stat: -rw-r--r-- 5,528 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2021 Daniel Estevez <daniel@destevez.net>
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

import struct

from gnuradio import gr, digital
import numpy as np
import pmt

from ... import crc, nrzi_decode
from ...hier.sync_to_pdu import sync_to_pdu
from ...utils.options_block import options_block


# The 40 bit syncword is formed by 4130f000, which is the end of the
# AX.25 address of the first subpacket (including the subpacket counter),
# encoded as UART with 1 stop bit.
_syncword = '0010000011000110000101111000010000000001'


class uart_decode(gr.basic_block):
    """
    Helper block to remove UART-like encoding

    The input is unpacked bits having 10 bits per byte consisting of a
    start bit (which should be 0 but we don't look at), an 8-bit byte
    in MSB-first order, and a stop bit (which should be 1 but we don't
    look at). The output consists of the 8-bit bytes as packed bytes.
    """
    def __init__(self):
        gr.basic_block.__init__(
            self,
            name='uart_decode',
            in_sig=[],
            out_sig=[])
        self.message_port_register_in(pmt.intern('in'))
        self.set_msg_handler(pmt.intern('in'), self.handle_msg)
        self.message_port_register_out(pmt.intern('out'))

    def handle_msg(self, msg_pmt):
        msg = pmt.cdr(msg_pmt)
        if not pmt.is_u8vector(msg):
            print('[ERROR] Received invalid message type. Expected u8vector')
            return
        packet = np.array(pmt.u8vector_elements(msg), dtype='uint8')
        if packet.size % 10 != 0:
            print('[ERROR] Packet size is not a multiple of 10 bits')
            return
        packet = np.packbits(packet.reshape((-1, 10))[:, 1:-1])
        self.message_port_pub(
            pmt.intern('out'),
            pmt.cons(pmt.PMT_NIL, pmt.init_u8vector(len(packet), packet)))


class extract_payload(gr.basic_block):
    """
    Helper block to throw subpacket headers and extract CRC-protected payload

    This also checks the CRC
    """
    def __init__(self, verbose=False):
        gr.basic_block.__init__(
            self,
            name='extract_payload',
            in_sig=[],
            out_sig=[])
        self.crc_calc = crc(16, 0x1021, 0xFFFF, 0x0, False, False)
        self.verbose = verbose
        self.message_port_register_in(pmt.intern('in'))
        self.set_msg_handler(pmt.intern('in'), self.handle_msg)
        self.message_port_register_out(pmt.intern('out'))

    def handle_msg(self, msg_pmt):
        msg = pmt.cdr(msg_pmt)
        if not pmt.is_u8vector(msg):
            print('[ERROR] Received invalid message type. Expected u8vector')
            return
        packet = np.array(pmt.u8vector_elements(msg), dtype='uint8')
        # Put initial padding to account for the AX.25 header we've lost
        initial_padding = np.zeros(17, dtype='uint8')
        packet = np.concatenate((initial_padding, packet))
        if packet.size != 40 * 9:
            print('[ERROR] Invalid packet size')
            return
        packet = packet.reshape((9, 40))
        # Save AX.25 header. we take it from the 2nd message
        header = packet[1, 1:16]
        # Drop AX.25 headers and final 0x7e
        packet = packet[:, 17:-1].ravel()
        # Drop final padding
        packet = packet[:-11]
        # Check CRC (do not include first 4 bytes)
        crc_val = self.crc_calc.compute(packet[4:-2])
        if crc_val != struct.unpack('<H', packet[-2:])[0]:
            if self.verbose:
                print('CRC failed')
                return
        elif self.verbose:
            print('CRC OK')
        # Drop CRC
        packet = packet[:-2]
        # Put AX.25 header back
        packet = np.concatenate((header, packet))
        self.message_port_pub(
            pmt.intern('out'),
            pmt.cons(pmt.PMT_NIL, pmt.init_u8vector(len(packet), packet)))


class ideassat_deframer(gr.hier_block2, options_block):
    """
    Hierarchical block to deframe IDEASSat ad-hoc UART-like protocol

    The input is a float stream of soft symbols. The output are PDUs
    with IDEASSat frames.

    Args:
        options: Options from argparse
    """
    def __init__(self, options=None):
        gr.hier_block2.__init__(
            self,
            'ideassat_deframer',
            gr.io_signature(1, 1, gr.sizeof_float),
            gr.io_signature(0, 0, 0))
        options_block.__init__(self, options)

        self.message_port_register_hier_out('out')

        self.slicer = digital.binary_slicer_fb()
        self.nrzi = nrzi_decode()
        # A length of 40 bytes will give at the end the two 0x7e HDLC flags
        # we do not allow syncword errors because this protocol is very
        # brittle.
        self.deframer = sync_to_pdu(
            packlen=10 * (9*40 - 17), sync=_syncword, threshold=0)
        self.uart = uart_decode()
        self.payload = extract_payload(self.options.verbose_crc)

        self.connect(self, self.slicer, self.nrzi, self.deframer)
        self.msg_connect((self.deframer, 'out'), (self.uart, 'in'))
        self.msg_connect((self.uart, 'out'), (self.payload, 'in'))
        self.msg_connect((self.payload, 'out'), (self, 'out'))

    @classmethod
    def add_options(cls, parser):
        """
        Adds IDEASSat deframer specific options to the argparse parser
        """
        parser.add_argument(
            '--verbose_crc', action='store_true', help='Verbose CRC')