File: usp_deframer.py

package info (click to toggle)
gr-satellites 5.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,836 kB
  • sloc: python: 29,546; cpp: 5,448; ansic: 1,247; sh: 118; makefile: 24
file content (84 lines) | stat: -rw-r--r-- 2,907 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2021 Daniel Estevez <daniel@destevez.net>
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

from gnuradio import gr, digital, fec

from ... import decode_rs
from ...hier.ccsds_descrambler import ccsds_descrambler
from ...hier.sync_to_pdu_soft import sync_to_pdu_soft
from ...usp import usp_ax25_crop, usp_pls_crop
from ...utils.options_block import options_block


# Syncword handed to sync_to_pdu_soft, written as a string of 64 '0'/'1'
# characters. It is split here into 16-bit groups for readability
# (0x5072, 0xF64B, 0x2D90, 0xB1F5); the pieces concatenate into a single
# string.
_syncword = (
    '0101000001110010'
    '1111011001001011'
    '0010110110010000'
    '1011000111110101'
)


class usp_deframer(gr.hier_block2, options_block):
    """
    Hierarchical block that deframes the Unified SPUTNIX Protocol (USP)

    USP framing follows the CCSDS concatenated framing, adapted to
    variable frame sizes and with some ideas taken from DVB-S2.

    The protocol is described in
    https://sputnix.ru/tpl/docs/amateurs/
    USP%20protocol%20description%20v1.04.pdf

    Input: a float stream of soft symbols. Output: PDUs with frames,
    sent through the 'out' message port.

    Args:
        syncword_threshold: number of bit errors allowed in syncword
            (int). When None, the value is read from the options.
        options: Options from argparse
    """
    # Default value of the --syncword_threshold command line option
    _default_sync_threshold = 13

    def __init__(self, syncword_threshold=None, options=None):
        # One float stream input, no stream outputs: results leave the
        # block only through the 'out' message port registered below.
        stream_in = gr.io_signature(1, 1, gr.sizeof_float)
        no_stream_out = gr.io_signature(0, 0, 0)
        gr.hier_block2.__init__(
            self, 'usp_deframer', stream_in, no_stream_out)
        options_block.__init__(self, options)

        self.message_port_register_hier_out('out')

        # An explicit argument wins; the options are consulted only
        # when no threshold was passed in.
        threshold = (
            self.options.syncword_threshold
            if syncword_threshold is None
            else syncword_threshold)

        # Size given to the Viterbi decoder; the same figure divided
        # by 8 is the last argument of the async decoder wrapper.
        frame_size = 4080

        self.deframer = sync_to_pdu_soft(
            packlen=4144, sync=_syncword, threshold=threshold)
        self.pls = usp_pls_crop()
        self.viterbi = fec.cc_decoder.make(
            frame_size, 7, 2, [79, -109], 0, -1, fec.CC_TRUNCATED, False)
        self.viterbi_decoder = fec.async_decoder(
            self.viterbi, False, False, frame_size // 8)
        # The attribute is called 'scrambler' but holds the CCSDS
        # descrambler block.
        self.scrambler = ccsds_descrambler()
        self.rs = decode_rs(True, 1)
        self.crop = usp_ax25_crop()

        # The soft symbol stream goes into the syncword detector...
        self.connect(self, self.deframer)

        # ...and from there on everything is message passing: each
        # stage's 'out' port feeds the 'in' port of the next one.
        stages = (
            self.deframer,
            self.pls,
            self.viterbi_decoder,
            self.scrambler,
            self.rs,
            self.crop,
        )
        for producer, consumer in zip(stages, stages[1:]):
            self.msg_connect((producer, 'out'), (consumer, 'in'))

        # The last stage drives the output port of the hierarchical block.
        self.msg_connect((self.crop, 'out'), (self, 'out'))

    @classmethod
    def add_options(cls, parser):
        """
        Registers the USP deframer options in an argparse parser

        Args:
            parser: argparse parser (or argument group) that receives
                the --syncword_threshold option
        """
        parser.add_argument(
            '--syncword_threshold',
            default=cls._default_sync_threshold,
            type=int,
            help='Syncword bit errors [default=%(default)r]')