File: fsk_demodulator.py

package info (click to toggle)
gr-satellites 5.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,836 kB
  • sloc: python: 29,546; cpp: 5,448; ansic: 1,247; sh: 118; makefile: 24
file content (204 lines) | stat: -rw-r--r-- 8,041 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2019 Daniel Estevez <daniel@destevez.net>
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

from math import ceil, pi
import pathlib
import sys

from gnuradio import gr, blocks, analog, digital, filter
from gnuradio.filter import firdes
import numpy as np

from ...hier.rms_agc_f import rms_agc_f
from ...utils.options_block import options_block


class fsk_demodulator(gr.hier_block2, options_block):
    """
    Hierarchical block to demodulate FSK.

    The input can be either IQ or real. For IQ input, it is assumed
    that the data is FM modulated, so FM demodulation is performed.
    For real input, it is assumed that the data is already FM
    demodulated.

    Args:
        baudrate: Baudrate in symbols per second (float)
        sample_rate: Sample rate in samples per second (float)
        iq: Whether the input is IQ or real (bool)
        deviation: Deviation in Hz, negative inverts sidebands (float)
        subaudio: Use subaudio demodulation (bool)
        dc_block: Use DC-block (bool)
        dump_path: Path to dump internal signals to files (str)
        options: Options from argparse
    """
    def __init__(self, baudrate, samp_rate, iq, deviation=None,
                 subaudio=False, dc_block=True, dump_path=None,
                 options=None):
        gr.hier_block2.__init__(
            self,
            'fsk_demodulator',
            gr.io_signature(1, 1,
                            gr.sizeof_gr_complex if iq else gr.sizeof_float),
            gr.io_signature(1, 1, gr.sizeof_float))
        options_block.__init__(self, options)

        use_agc = self.options.use_agc or not iq
        if self.options.disable_dc_block:
            dc_block = False

        if dump_path is not None:
            dump_path = pathlib.Path(dump_path)

        if deviation is not None:
            _deviation = deviation
        else:
            _deviation = self.options.deviation

        # Prevent problems due to baudrate too high
        if baudrate >= samp_rate:
            print(f'Sample rate {samp_rate} sps insufficient for {baudrate} '
                  'baud FSK demodulation. Demodulator will not work.',
                  file=sys.stderr)
            baudrate = samp_rate / 2

        if iq:
            # Cut to Carson's bandwidth rule before quadrature demod.
            # Note that _deviation can be negative to encode that the
            # low tone corresponds to the symbol 1 and the high tone
            # corresponds to the symbol 0.
            carson_cutoff = abs(_deviation) + baudrate / 2
            self.demod = analog.quadrature_demod_cf(
                samp_rate/(2*pi*_deviation))
            if carson_cutoff >= samp_rate / 2:
                # Sample rate is already narrower than Carson's
                # bandwidth. Do not filter
                self.connect(self, self.demod)
            else:
                # Sample rate is wider than Carson's bandwidth.
                # Lowpass filter before demod.
                fir_taps = firdes.low_pass(
                    1, samp_rate, carson_cutoff, 0.1 * carson_cutoff)
                self.demod_filter = filter.fir_filter_ccf(1, fir_taps)
                self.connect(self, self.demod_filter, self.demod)
        else:
            self.demod = self

        sps = samp_rate / baudrate
        max_sps = 10
        if sps > max_sps:
            decimation = ceil(sps / max_sps)
        else:
            decimation = 1
        sps /= decimation

        if subaudio:
            # some not-so-bad filter parameters for subaudio processing
            subaudio_cutoff = 2.0/3.0 * baudrate
            subaudio_transition = subaudio_cutoff / 4.0
            subaudio_taps = firdes.low_pass(
                1, samp_rate, subaudio_cutoff, subaudio_transition)
            self.subaudio_lowpass = filter.fir_filter_fff(1, subaudio_taps)

        # square pulse filter
        sqfilter_len = int(samp_rate / baudrate)
        taps = np.ones(sqfilter_len)/sqfilter_len
        self.lowpass = filter.fir_filter_fff(decimation, taps)

        if dc_block:
            self.dcblock = filter.dc_blocker_ff(ceil(sps * 32), True)
        else:
            self.dcblock = self.lowpass  # to simplify connections below

        if use_agc:
            # This gives a time constant of 50 symbols
            agc_constant = 2e-2 / sps
            self.agc = rms_agc_f(agc_constant, 1)

        if dump_path is not None:
            self.waveform = blocks.file_sink(
                gr.sizeof_float, str(dump_path / 'waveform.f32'))

        # "Eempiric" formula for TED gain of Gardner detector
        # 1.47 symbol^{-1}
        ted_gain = 1.47
        damping = 1.0
        self.clock_recovery = digital.symbol_sync_ff(
            digital.TED_GARDNER, sps, self.options.clk_bw, damping, ted_gain,
            self.options.clk_limit * sps, 1,
            digital.constellation_bpsk().base(), digital.IR_PFB_NO_MF)

        if dump_path is not None:
            self.clock_recovery_out = blocks.file_sink(
                gr.sizeof_float,
                str(dump_path / 'clock_recovery_out.f32'), False)
            self.clock_recovery_err = blocks.file_sink(
                gr.sizeof_float,
                str(dump_path / 'clock_recovery_err.f32'), False)
            self.clock_recovery_T_inst = blocks.file_sink(
                gr.sizeof_float,
                str(dump_path / 'clock_recovery_T_inst.f32'), False)
            self.clock_recovery_T_avg = blocks.file_sink(
                gr.sizeof_float,
                str(dump_path / 'clock_recovery_T_avg.f32'), False)
            self.connect(self.clock_recovery, self.clock_recovery_out)
            self.connect((self.clock_recovery, 1), self.clock_recovery_err)
            self.connect((self.clock_recovery, 2), self.clock_recovery_T_inst)
            self.connect((self.clock_recovery, 3), self.clock_recovery_T_avg)

        conns = [self.demod]
        if subaudio:
            conns.append(self.subaudio_lowpass)
        conns.append(self.lowpass)
        if dc_block:
            conns.append(self.dcblock)
        self.connect(*conns)
        if use_agc:
            self.connect(self.dcblock, self.agc, self.clock_recovery)
            if dump_path is not None:
                self.connect(self.agc, self.waveform)
        else:
            self.connect(self.dcblock, self.clock_recovery)
            if dump_path is not None:
                self.connect(self.dcblock, self.waveform)
        if not iq and _deviation < 0:
            # when working with FM-demodulated input, if the deviation is
            # negative, the polarity of the signal needs to be inverted.
            self.invert_polarity = blocks.multiply_const_ff(-1, 1)
            self.connect(self.clock_recovery, self.invert_polarity, self)
        else:
            self.connect(self.clock_recovery, self)

    _default_clk_rel_bw = 0.06
    _default_clk_limit = 0.004
    _default_deviation_hz = 5000

    @classmethod
    def add_options(cls, parser):
        """
        Adds FSK demodulator specific options to the argparse parser
        """
        parser.add_argument(
            '--clk_bw', type=float, default=cls._default_clk_rel_bw,
            help=('Clock recovery bandwidth (relative to baudrate) '
                  '[default=%(default)r]'))
        parser.add_argument(
            '--clk_limit', type=float, default=cls._default_clk_limit,
            help=('Clock recovery limit (relative to baudrate) '
                  '[default=%(default)r]'))
        parser.add_argument(
            '--deviation', type=float, default=cls._default_deviation_hz,
            help='Deviation (Hz) [default=%(default)r]')
        parser.add_argument(
            '--use_agc', action='store_true',
            help='Use AGC (for IQ input. AGC always on for real input)')
        parser.add_argument(
            '--disable_dc_block', action='store_true',
            help='Disable DC block')