1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
#
# Copyright 2021 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
X4XX GPS Manager
Handles GPS-related tasks
"""
import re
from usrp_mpm.gpsd_iface import GPSDIfaceExtension
class X4xxGPSMgr:
    """
    Manager class for GPS-related actions for the X4XX.

    This also "disables" the sensors when the GPS is not enabled: every
    sensor getter first checks is_gps_enabled() and returns a placeholder
    value instead of querying the hardware/GPSd when it is not.
    """

    # Naming convention of sensor getters; group 1 is the sensor name that
    # is used as key in the dictionary returned by extend().
    _SENSOR_METHOD_RE = re.compile(r"get_(.*)_sensor")

    def __init__(self, clk_aux_board, log):
        """
        Store the clocking aux board, create a 'GPS' child logger and the
        GPSd interface, and attach the GPSd-backed sensor getters
        (get_gps_tpv_sensor, get_gps_sky_sensor, get_gps_gpgga_sensor) to
        this instance.

        clk_aux_board -- clocking aux board object; must be truthy and report
                         is_gps_supported()
        log -- parent logger; a child named 'GPS' is derived from it
        """
        assert clk_aux_board and clk_aux_board.is_gps_supported()
        self._clocking_auxbrd = clk_aux_board
        self.log = log.getChild('GPS')
        self.log.trace("Initializing GPSd interface")
        self._gpsd = GPSDIfaceExtension()
        # To disable sensors, we simply return an empty value if GPS is
        # disabled. For TPV, SKY, and GPGGA, we can do this in the same
        # fashion (they are very similar). gps_time is different (it returns
        # an int) so for sake of simplicity it's defined separately below.
        for sensor_name in ('gps_tpv', 'gps_sky', 'gps_gpgga'):
            setattr(
                self,
                f'get_{sensor_name}_sensor',
                self._make_gpsd_sensor_getter(sensor_name),
            )

    def _make_gpsd_sensor_getter(self, sensor_name):
        """
        Return a callable that yields the sensor dict for 'sensor_name'.

        The callable returns an 'n/a' STRING sensor dict while the GPS is
        disabled, and otherwise forwards to the method of the same name
        (get_<sensor_name>_sensor) on the GPSd interface.
        """
        # The default argument keeps the call signature of the getter
        # identical to what it has always been (callable without arguments).
        def _get_sensor(sensor_name=sensor_name):
            if not self.is_gps_enabled():
                return {
                    'name': sensor_name, 'type': 'STRING',
                    'unit': '', 'value': 'n/a'}
            return getattr(self._gpsd, f'get_{sensor_name}_sensor')()
        return _get_sensor

    @staticmethod
    def _bool_sensor_dict(name, state, true_unit, false_unit):
        """
        Build a BOOLEAN sensor dict called 'name' from 'state'.

        'unit' is true_unit if state is truthy, else false_unit; 'value' is
        the lower-cased string representation of state.
        """
        return {
            'name': name,
            'type': 'BOOLEAN',
            'unit': true_unit if state else false_unit,
            'value': str(state).lower(),
        }

    def extend(self, context):
        """
        Extend 'context' with the sensor methods of this class
        (get_gps_*_sensor). If 'context' already has such a method, it is
        skipped (i.e., not overwritten), but it is still listed in the return
        value.

        Only public callables whose full name follows the get_<name>_sensor
        convention are considered; anything else is ignored.

        Returns a dictionary compatible to mboard_sensor_callback_map, mapping
        the sensor name to the name of the method.
        """
        new_methods = {}
        for method_name in dir(self):
            if method_name.startswith('_'):
                continue
            match = self._SENSOR_METHOD_RE.fullmatch(method_name)
            # A name that merely ends in 'sensor' is not a sensor getter; skip
            # it instead of failing on a non-matching regex.
            if match is None or not callable(getattr(self, method_name)):
                continue
            new_methods[match.group(1)] = method_name
        for method_name in new_methods.values():
            if hasattr(context, method_name):
                continue
            new_method = getattr(self, method_name)
            self.log.trace("%s: Adding %s method", context, method_name)
            setattr(context, method_name, new_method)
        return new_methods

    def is_gps_enabled(self):
        """
        Return True if the GPS is enabled/active.
        """
        return self._clocking_auxbrd.is_gps_enabled()

    def get_gps_enabled_sensor(self):
        """
        Get enabled status of GPS as a sensor dict
        """
        return self._bool_sensor_dict(
            'gps_enabled', self.is_gps_enabled(), 'enabled', 'disabled')

    def get_gps_locked_sensor(self):
        """
        Get lock status of GPS as a sensor dict

        The aux board is only queried if the GPS is enabled.
        """
        gps_locked = self.is_gps_enabled() and \
            bool(self._clocking_auxbrd.get_gps_locked())
        return self._bool_sensor_dict(
            'gps_locked', gps_locked, 'locked', 'unlocked')

    def get_gps_alarm_sensor(self):
        """
        Get alarm status of GPS as a sensor dict

        The aux board is only queried if the GPS is enabled.
        """
        gps_alarm = self.is_gps_enabled() and \
            bool(self._clocking_auxbrd.get_gps_alarm())
        return self._bool_sensor_dict(
            'gps_alarm', gps_alarm, 'active', 'not active')

    def get_gps_warmup_sensor(self):
        """
        Get warmup status of GPS as a sensor dict

        The aux board is only queried if the GPS is enabled.
        """
        gps_warmup = self.is_gps_enabled() and \
            bool(self._clocking_auxbrd.get_gps_warmup())
        return self._bool_sensor_dict(
            'gps_warmup', gps_warmup, 'warming up', 'warmup done')

    def get_gps_survey_sensor(self):
        """
        Get survey status of GPS as a sensor dict

        The aux board is only queried if the GPS is enabled.
        """
        gps_survey = self.is_gps_enabled() and \
            bool(self._clocking_auxbrd.get_gps_survey())
        return self._bool_sensor_dict(
            'gps_survey', gps_survey, 'survey active', 'survey not active')

    def get_gps_phase_lock_sensor(self):
        """
        Get phase_lock status of GPS as a sensor dict

        The aux board is only queried if the GPS is enabled.
        """
        gps_phase_lock = self.is_gps_enabled() and \
            bool(self._clocking_auxbrd.get_gps_phase_lock())
        return self._bool_sensor_dict(
            'gps_phase_lock', gps_phase_lock, 'phase locked', 'no phase lock')

    def get_gps_time_sensor(self):
        """
        Get GPS time in integer seconds as a sensor dict

        If the GPS is disabled, the value is reported as -1 without querying
        GPSd.
        """
        if not self.is_gps_enabled():
            return {
                'name': 'gps_time',
                'type': 'INTEGER',
                'unit': 'seconds',
                'value': str(-1),
            }
        return self._gpsd.get_gps_time_sensor()
|