File: diy1_deframer.py

package info (click to toggle)
gr-satellites 5.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,836 kB
  • sloc: python: 29,546; cpp: 5,448; ansic: 1,247; sh: 118; makefile: 24
file content (142 lines) | stat: -rw-r--r-- 4,493 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2021-2022 Daniel Estevez <daniel@destevez.net>
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

import struct

from gnuradio import gr, digital
import numpy as np
import pmt

from ...crcs import crc16_ccitt_zero
from ...hier.sync_to_pdu_packed import sync_to_pdu_packed
from ...utils.options_block import options_block


# Syncword is 0x2dd4, written out as a string of 16 '0'/'1' characters
# with the most significant bit first. It is passed as the sync argument
# of sync_to_pdu_packed when diy1_deframer builds its flowgraph.
_syncword = '0010110111010100'


class uart_decode(gr.basic_block):
    """
    Helper block that strips UART-like framing from PDUs

    Input PDUs carry unpacked bits (one bit per item) grouped into
    10-bit words: a start bit, 8 data bits in MSB-first order and a
    stop bit. The start and stop bits are nominally 0 and 1, but their
    values are never checked; they are simply thrown away. Output PDUs
    carry the data bits packed 8 per byte.

    PDUs whose payload is not a u8vector, or whose length is not a
    multiple of 10 bits, are dropped after printing an error. The
    metadata of the input PDU is not propagated: the output PDU always
    has PMT_NIL as its metadata.
    """
    def __init__(self):
        gr.basic_block.__init__(
            self, name='uart_decode', in_sig=[], out_sig=[])
        in_port = pmt.intern('in')
        self.message_port_register_in(in_port)
        self.set_msg_handler(in_port, self.handle_msg)
        self.message_port_register_out(pmt.intern('out'))

    def handle_msg(self, msg_pmt):
        """
        Handles one input PDU and publishes the decoded bytes on 'out'
        """
        payload = pmt.cdr(msg_pmt)
        if not pmt.is_u8vector(payload):
            print('[ERROR] Received invalid message type. Expected u8vector')
            return
        bits = np.array(pmt.u8vector_elements(payload), dtype='uint8')
        if bits.size % 10:
            print('[ERROR] Packet size is not a multiple of 10 bits')
            return
        # One row per UART word. Column 0 is the start bit and column 9
        # the stop bit, so columns 1..8 hold the data bits.
        words = bits.reshape((-1, 10))
        data = np.packbits(words[:, 1:9])
        out_pdu = pmt.cons(pmt.PMT_NIL, pmt.init_u8vector(len(data), data))
        self.message_port_pub(pmt.intern('out'), out_pdu)


class crop(gr.basic_block):
    """
    Helper block that trims DIY-1 PDUs to the length they declare

    A DIY-1 packet is laid out as:
    - Header: 4 bytes
    - Length field: 1 byte (contains length of data field)
    - Data
    - CRC-16: 2 bytes

    The block reads the length field of the input PDU and keeps only
    the header, length field, data and CRC-16, discarding whatever
    follows the CRC-16. PDUs shorter than 7 bytes (the size of a packet
    with an empty data field) are dropped silently. The output PDU
    always has PMT_NIL as its metadata.

    Args:
        verbose: stored as self.verbose; not otherwise used by this block
    """
    def __init__(self, verbose=False):
        gr.basic_block.__init__(
            self, name='crop', in_sig=[], out_sig=[])
        self.verbose = verbose
        in_port = pmt.intern('in')
        self.message_port_register_in(in_port)
        self.set_msg_handler(in_port, self.handle_msg)
        self.message_port_register_out(pmt.intern('out'))

    def handle_msg(self, msg_pmt):
        """
        Handles one input PDU and publishes the trimmed packet on 'out'
        """
        payload = pmt.cdr(msg_pmt)
        if not pmt.is_u8vector(payload):
            print('[ERROR] Received invalid message type. Expected u8vector')
            return
        frame = pmt.u8vector_elements(payload)
        if len(frame) < 7:
            # Not even room for header, length field and CRC-16
            return
        data_len = frame[4]
        # 7 = 4 (header) + 1 (length field) + 2 (CRC-16)
        trimmed = frame[:data_len + 7]
        out_pdu = pmt.cons(
            pmt.PMT_NIL, pmt.init_u8vector(len(trimmed), trimmed))
        self.message_port_pub(pmt.intern('out'), out_pdu)


class diy1_deframer(gr.hier_block2, options_block):
    """
    Hierarchical block that deframes DIY-1 RFM22 frames

    Takes a float stream of soft symbols as input. PDUs with DIY-1
    frames are emitted on the 'out' message port, which is fed from the
    'ok' port of the CRC-16 block.

    Args:
        options: Options from argparse
    """
    # Default for the --syncword_threshold command line option
    _default_sync_threshold = 0

    def __init__(self, options=None):
        gr.hier_block2.__init__(
            self,
            'diy1_deframer',
            gr.io_signature(1, 1, gr.sizeof_float),
            gr.io_signature(0, 0, 0))
        options_block.__init__(self, options)

        self.message_port_register_hier_out('out')

        threshold = self.options.syncword_threshold

        # Processing chain: slicer -> syncword search -> crop -> CRC-16
        self.slicer = digital.binary_slicer_fb()
        # NOTE(review): 262 = 4 + 1 + 255 + 2, presumably the largest
        # possible frame (255 data bytes); crop then trims each PDU to
        # the length it declares.
        self.deframer = sync_to_pdu_packed(
            packlen=262, sync=_syncword, threshold=threshold)
        self.crop = crop()
        self.crc = crc16_ccitt_zero()

        self.connect(self, self.slicer, self.deframer)
        message_links = (
            ((self.deframer, 'out'), (self.crop, 'in')),
            ((self.crop, 'out'), (self.crc, 'in')),
            ((self.crc, 'ok'), (self, 'out')),
        )
        for source, sink in message_links:
            self.msg_connect(source, sink)

    @classmethod
    def add_options(cls, parser):
        """
        Registers the DIY-1 deframer specific options with the argparse
        parser
        """
        parser.add_argument(
            '--syncword_threshold',
            type=int,
            default=cls._default_sync_threshold,
            help='Syncword bit errors [default=%(default)r]')