File: read_hybrid_observables_dump.py

package info (click to toggle)
gnss-sdr 0.0.20-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 27,564 kB
  • sloc: cpp: 218,512; ansic: 36,754; python: 2,423; xml: 1,479; sh: 459; makefile: 8
file content (112 lines) | stat: -rw-r--r-- 3,987 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
 read_hybrid_observables_dump.py

   This function plots the tracking results for the given channel list.

 Irene Pérez Riega, 2023. iperrie@inta.es

 read_hybrid_observables_dump(channels, filename)

   Args:
       channels        - list of channels to be processed
       filename        - path to the observables file

 -----------------------------------------------------------------------------

 GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
 This file is part of GNSS-SDR.

 Copyright (C) 2022  (see AUTHORS file for a list of contributors)
 SPDX-License-Identifier: GPL-3.0-or-later

 -----------------------------------------------------------------------------
"""

import struct

def read_hybrid_observables_dump(channels, filename):

    double_size_bytes = 8
    bytes_shift = 0

    RX_time = [[] for _ in range(channels+1)]
    d_TOW_at_current_symbol = [[] for _ in range(channels+1)]
    Carrier_Doppler_hz = [[] for _ in range(channels+1)]
    Carrier_phase_hz = [[] for _ in range(channels+1)]
    Pseudorange_m = [[] for _ in range(channels+1)]
    PRN = [[] for _ in range(channels+1)]
    valid = [[] for _ in range(channels+1)]

    f = open(filename, 'rb')
    if f is None:
        return None
    else:
        while True:
            try:
                # There is an empty channel at the end (Channel-6)
                for N in range(0, channels+1):
                    f.seek(bytes_shift, 0)

                    RX_time[N].append(struct.unpack(
                        'd',f.read(double_size_bytes))[0])
                    bytes_shift += double_size_bytes
                    f.seek(bytes_shift, 0)

                    d_TOW_at_current_symbol[N].append(struct.unpack(
                        'd', f.read(double_size_bytes))[0])
                    bytes_shift += double_size_bytes
                    f.seek(bytes_shift, 0)

                    Carrier_Doppler_hz[N].append(struct.unpack(
                        'd', f.read(double_size_bytes))[0])
                    bytes_shift += double_size_bytes
                    f.seek(bytes_shift, 0)

                    Carrier_phase_hz[N].append(struct.unpack(
                        'd', f.read(double_size_bytes))[0])
                    bytes_shift += double_size_bytes
                    f.seek(bytes_shift, 0)

                    Pseudorange_m[N].append(struct.unpack(
                        'd', f.read(double_size_bytes))[0])
                    bytes_shift += double_size_bytes
                    f.seek(bytes_shift, 0)

                    PRN[N].append(struct.unpack(
                        'd', f.read(double_size_bytes))[0])
                    bytes_shift += double_size_bytes
                    f.seek(bytes_shift, 0)

                    valid[N].append(struct.unpack(
                        'd', f.read(double_size_bytes))[0])
                    bytes_shift += double_size_bytes
                    f.seek(bytes_shift, 0)

            except:
                break

        # Delete last Channel:
        RX_time = [row for i, row in enumerate(RX_time) if i != 5]
        d_TOW_at_current_symbol = [row for i, row in enumerate(
            d_TOW_at_current_symbol)if i != 5]
        Carrier_Doppler_hz = [row for i, row in enumerate(
            Carrier_Doppler_hz) if i != 5]
        Carrier_phase_hz = [row for i, row in enumerate(
            Carrier_phase_hz) if i != 5]
        Pseudorange_m = [row for i, row in enumerate(Pseudorange_m) if i != 5]
        PRN = [row for i, row in enumerate(PRN) if i != 5]
        valid = [row for i, row in enumerate(valid) if i != 5]

        observables = {
            'RX_time': RX_time,
            'd_TOW_at_current_symbol': d_TOW_at_current_symbol,
            'Carrier_Doppler_hz': Carrier_Doppler_hz,
            'Carrier_phase_hz': Carrier_phase_hz,
            'Pseudorange_m': Pseudorange_m,
            'PRN': PRN,
            'valid':valid
        }

    f.close()

    return observables