File: autopolarization.py

package info (click to toggle)
gr-satellites 5.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,836 kB
  • sloc: python: 29,546; cpp: 5,448; ansic: 1,247; sh: 118; makefile: 24
file content (85 lines) | stat: -rw-r--r-- 3,282 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2020-2023 Daniel Estevez <daniel@destevez.net>
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

import numpy as np
from gnuradio import gr


class autopolarization(gr.sync_block):
    def __init__(self, fft_size=2048, fft_avg=1, iir_weight=0.1):
        gr.sync_block.__init__(
            self,
            name='autopolarization',
            in_sig=[np.complex64]*2,
            out_sig=[np.complex64]*2
        )
        self.fft_size = fft_size
        self.fft_avg = fft_avg
        self.set_output_multiple(fft_size * fft_avg)
        self.iir_weight = iir_weight
        self.noise_ampl = np.ones(2)
        self.sig_ampl = np.ones(2)
        self.phase = 0

    def work(self, input_items, output_items):
        x = np.array([inp.reshape((-1, self.fft_avg, self.fft_size))
                      for inp in input_items])
        # axes for x are:
        # (channel: 2, block, fft_num: fft_avg, fft_bin: fft_size)

        fx = np.fft.fft(x)
        fx_sq_avg = np.average(np.abs(fx)**2, axis=2)
        noise_ampl = np.sqrt(np.median(fx_sq_avg, axis=2))

        fx_sq_avg_ch_max = np.max(fx_sq_avg/noise_ampl[..., np.newaxis]**2,
                                  axis=0)
        peak_idx = np.argmax(fx_sq_avg_ch_max, axis=1)

        f_cross = np.average(fx[0] * np.conjugate(fx[1]), axis=1)
        phase = np.angle(f_cross[np.arange(f_cross.shape[0]),
                                 peak_idx])
        # technically this should have - noise_ampl
        sig_ampl = fx_sq_avg[:, np.arange(fx_sq_avg.shape[1]), peak_idx]
        sig_ampl = np.sqrt(np.clip(sig_ampl, 0, np.inf))

        # update IIR filters
        for j in range(x.shape[1]):
            noise_ampl[:, j] = (1-self.iir_weight)*self.noise_ampl \
                + self.iir_weight*noise_ampl[:, j]
            self.noise_ampl = noise_ampl[:, j]
            sig_ampl[:, j] = (1-self.iir_weight)*self.sig_ampl \
                + self.iir_weight*sig_ampl[:, j]
            self.sig_ampl = sig_ampl[:, j]
            phase_diff = phase[j] - self.phase
            phase_diff = (phase_diff + np.pi) % (2*np.pi) - np.pi
            phase[j] = self.phase + self.iir_weight*phase_diff
            self.phase = (phase[j] + np.pi) % (2*np.pi) - np.pi

        tau = np.log10(sig_ampl[0]/noise_ampl[0]+1e-6) \
            - np.log10(sig_ampl[1]/noise_ampl[1]+1e-6)
        tau = 20*tau/6*0.5 + 0.5
        tau = np.clip(tau, 0, 1)
        # note tau = 1 if sig[0] is 6dB stronger than sig[1] and
        # tau = 0 if sig[0] is 6dB weaker than sig[1]
        a_phase = (tau - 1) * phase
        b_phase = tau * phase
        # note -a_phase + b_phase = phase

        alpha = (np.exp(1j*a_phase)
                 * sig_ampl[0] / noise_ampl[0])[:, np.newaxis]
        beta = (np.exp(1j*b_phase)
                * sig_ampl[1] / noise_ampl[1])[:, np.newaxis]

        y = x.reshape((x.shape[0], x.shape[1], self.fft_avg * self.fft_size))
        y = y / noise_ampl[..., np.newaxis] * np.sqrt(self.fft_size)
        output_items[0][:] = (alpha * y[0] + beta * y[1]).ravel()
        output_items[1][:] = (
            np.conjugate(beta) * y[0] - np.conjugate(alpha) * y[1]).ravel()
        return len(output_items[0])