1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
"""
read_hybrid_observables_dump.py
This function plots the tracking results for the given channel list.
Irene Pérez Riega, 2023. iperrie@inta.es
read_hybrid_observables_dump(channels, filename)
Args:
channels - list of channels to be processed
filename - path to the observables file
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
def read_hybrid_observables_dump(channels, filename):
double_size_bytes = 8
bytes_shift = 0
RX_time = [[] for _ in range(channels+1)]
d_TOW_at_current_symbol = [[] for _ in range(channels+1)]
Carrier_Doppler_hz = [[] for _ in range(channels+1)]
Carrier_phase_hz = [[] for _ in range(channels+1)]
Pseudorange_m = [[] for _ in range(channels+1)]
PRN = [[] for _ in range(channels+1)]
valid = [[] for _ in range(channels+1)]
f = open(filename, 'rb')
if f is None:
return None
else:
while True:
try:
# There is an empty channel at the end (Channel-6)
for N in range(0, channels+1):
f.seek(bytes_shift, 0)
RX_time[N].append(struct.unpack(
'd',f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
d_TOW_at_current_symbol[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
Carrier_Doppler_hz[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
Carrier_phase_hz[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
Pseudorange_m[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
PRN[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
valid[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
except:
break
# Delete last Channel:
RX_time = [row for i, row in enumerate(RX_time) if i != 5]
d_TOW_at_current_symbol = [row for i, row in enumerate(
d_TOW_at_current_symbol)if i != 5]
Carrier_Doppler_hz = [row for i, row in enumerate(
Carrier_Doppler_hz) if i != 5]
Carrier_phase_hz = [row for i, row in enumerate(
Carrier_phase_hz) if i != 5]
Pseudorange_m = [row for i, row in enumerate(Pseudorange_m) if i != 5]
PRN = [row for i, row in enumerate(PRN) if i != 5]
valid = [row for i, row in enumerate(valid) if i != 5]
observables = {
'RX_time': RX_time,
'd_TOW_at_current_symbol': d_TOW_at_current_symbol,
'Carrier_Doppler_hz': Carrier_Doppler_hz,
'Carrier_phase_hz': Carrier_phase_hz,
'Pseudorange_m': Pseudorange_m,
'PRN': PRN,
'valid':valid
}
f.close()
return observables
|