1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
|
#!/usr/bin/env python3
#
# This file is Copyright 2010 by the GPSD project
# SPDX-License-Identifier: BSD-2-clause
"""
cycle_analyzer - perform cycle analysis on GPS log files
This tool analyzes one or more NMEA or JSON files to determine the
cycle sequence of sentences. JSON files must be reports from a gpsd
driver which is without the CYCLE_END_RELIABLE capability and
therefore ships every sentence containing a fix; otherwise the results
will be meaningless because only the end-of-cycle sentences will show
in the JSON.
If a filename argument ends with '.log', and the sentence type in it
is not recognizable, this tool adds '.chk' to the name and tries again
assuming the latter is a JSON dump. Thus, invoking it against *.log in
a directory full of check files will do the right thing.
One purpose of this tool is to determine the end-of-cycle sentence
that a binary-protocol device emits, so the result can be patched into
the driver as a CYCLE_END_RELIABLE capability. To get this, apply the
tool to the JSON output from the driver using the -j switch. It will
ignore everything but tag and timestamp fields, and will also ignore
any NMEA in the file.
Another purpose is to sanity-check the assumptions of the NMEA
end-of-cycle detector. For this purpose, run without -j; if a device
has a regular reporting cycle with a constant end-of-cycle sentence,
this tool will confirm that. Otherwise, it will perform various
checks attempting to find an end-of-cycle marker and report on what it
finds.
When cycle_analyzer reports a split- or variable-cycle device, some arguments
to the -d switch can dump various analysis stages so you can get a better
idea what is going on. These are:
sequence - the entire sequence of dump tag/timestamp pairs from the log
events - show how those reduce to event sequences
bursts - show how sentences are grouped into bursts
trim - show the burst list after the end bursts have been removed
In an event sequence, a '<' is a normal start of cycle where the
timestamp increments. A '>' is where the timestamp actually
*decreases* between a sentence and the one that follows. The NMEA
cycle-end detector will ignore the '>' event; it sometimes occurs when
GPZDA starts a cycle, but has no effect on where the actual end of
fix reporting is.
If you see a message saying 'cycle-enders ... also occur in mid-cycle', the
device will confuse the NMEA cycle detector, leading to more reports per cycle
than the ideal.
"""
# This code runs compatibly under Python 2 and 3.x for x >= 2.
# Preserve this property!
from __future__ import absolute_import, print_function, division
import getopt
import gps
import json
import os
import sys
# Verbosity level; each -v on the command line increments it.
verbose = 0
# Set by -s: suppress reports about logs with no problems.
suppress_regular = False
# Set by -j: interpret the input as JSON rather than NMEA.
parse_json = False
class analyze_error(Exception):
    """Raised when a log file cannot be analyzed.

    filename is the name of the offending input and msg a short
    description of the problem; both are kept as attributes.  str() and
    repr() both render as 'filename: msg'.
    """

    def __init__(self, filename, msg):
        # Derive from Exception (not BaseException, which is reserved for
        # interpreter-exit signals) and initialise it so .args is sane.
        super(analyze_error, self).__init__(filename, msg)
        self.filename = filename
        self.msg = msg

    def __str__(self):
        return '%s: %s' % (self.filename, self.msg)

    __repr__ = __str__
class event(object):
    """One entry in the analysis stream: a tag plus an optional timestamp.

    tag is a sentence name, or one of the '<' / '>' markers inserted
    during cycle detection.  time defaults to 0, meaning "no timestamp";
    such an event prints as its bare tag, otherwise as 'tag:time'.
    """

    def __init__(self, tag, time=0):
        self.tag = tag
        self.time = time

    def __str__(self):
        if self.time == 0:
            return self.tag
        # %-formatting rather than '+' so a numeric timestamp does not
        # raise TypeError.
        return "%s:%s" % (self.tag, self.time)

    __repr__ = __str__
def tags(lst):
    "Collect the .tag attribute of each element of lst, preserving order"
    names = []
    for item in lst:
        names.append(item.tag)
    return names
def extract_from_nmea(filename, lineno, line):
    """Extend sequence of tag/timestamp tuples from an NMEA sentence.

    Returns a one-element list holding an event for the sentence when its
    type is known to carry an hhmmss field, and an empty list otherwise
    (including when the sentence is too short to contain that field).
    filename and lineno are accepted for symmetry with the JSON extractor
    and are not used.
    """
    # Index of the hhmmss field for each sentence type that has one.  A
    # dict value means the index depends on the sentence's second field.
    hhmmss = {
        "RMC": 1,
        "GLL": 5,
        "GGA": 1,
        "GBS": 1,
        "PASHR": {"POS": 4},
    }
    fields = line.split(",")
    tag = fields[0]
    if tag.startswith(("$GP", "$IN")):
        tag = tag[3:]
    elif tag.startswith("$"):
        tag = tag[1:]
    field = hhmmss.get(tag)
    if isinstance(field, dict):
        # Sub-typed sentence: needs a second field to select the index.
        field = field.get(fields[1]) if len(fields) > 1 else None
    # Unknown sentence type, or a truncated sentence lacking the field.
    if not field or field >= len(fields):
        return []
    return [event(tag, fields[field])]
def extract_from_json(filename, lineno, line):
    """Extend sequence of tag/timestamp tuples from a JSON dump of a sentence.

    Returns a one-element list holding an event whose tag is the object's
    "class" and whose time is its "time" rendered with two decimals.
    Returns an empty list for non-JSON lines, for objects lacking either
    key, and for lines that fail to parse (which are echoed to stderr).
    filename and lineno are not used.
    """
    if not line.startswith("{"):
        return []
    try:
        sentence = json.loads(line)
        # Without both keys there is no tag/timestamp pair to report; a
        # missing "class" previously escaped as an uncaught KeyError.
        if "time" not in sentence or "class" not in sentence:
            return []
        # NOTE(review): gps.isotime presumably converts an ISO 8601 string
        # to seconds; its failure mode is assumed to be ValueError.
        return [event(gps.polystr(sentence["class"]),
                      "%.2f" % gps.isotime(sentence["time"]))]
    except ValueError as e:
        print(line.rstrip(), file=sys.stderr)
        print(repr(e), file=sys.stderr)
        return []
def extract_timestamped_sentences(fp, json_parse=None):
    """Do the basic work of extracting tags and timestamps.

    fp is an open file-like object read line by line.  json_parse selects
    the JSON extractor (true) or the NMEA extractor (false); when left as
    None the current value of the module-level parse_json is used.
    Returns the list of events found.  Raises analyze_error if a line
    that is not a '#' comment starts with anything but '$', '!' or '{'.
    """
    # Resolve the default at call time.  A default of 'parse_json' in the
    # signature is frozen when the function is defined, so a later -j on
    # the command line would never be seen.
    if json_parse is None:
        json_parse = parse_json
    # Not every file-like object carries a name.
    source = getattr(fp, "name", "<input>")
    sequence = []
    lineno = 0
    while True:
        line = gps.polystr(fp.readline())
        if not line:
            break
        lineno += 1
        if line.startswith("#"):
            continue
        if line[0] not in ("$", "!", "{"):
            raise analyze_error(source, "unknown sentence type.")
        if not json_parse and line.startswith("$"):
            sequence += extract_from_nmea(source, lineno, line)
        elif json_parse and line.startswith("{"):
            sequence += extract_from_json(source, lineno, line)
    return sequence
def _bursts_all_alike(bursts):
    "Tell whether every burst carries the same tag sequence as the next"
    signatures = [tags(burst) for burst in bursts]
    return all(a == b for a, b in zip(signatures, signatures[1:]))


def analyze(sequence, name):
    """Analyze the cycle sequence of a device from its output logs.

    sequence is a list of event objects (tag plus timestamp) in log
    order; name identifies the log in every message.  Findings are
    printed to standard output and complaints about logs with too few
    cycles to standard error.  Nothing is returned.
    """
    if not sequence:
        return
    # The -d stage list is a global assigned only by the command-line
    # driver; treat it as empty when this is called from elsewhere.
    dump_stages = globals().get("stages", "")
    regular = False
    if "sequence" in dump_stages:
        print("Raw tag/timestamp sequence")
        for e in sequence:
            print(e)
    # Then, do cycle detection.  Sentences with an empty or zero
    # timestamp are dropped first so that each remaining sentence is
    # compared with the next one that actually has a time; comparing
    # against an empty timestamp would raise ValueError.
    timed = [e for e in sequence
             if e.time != "" and float(e.time) != 0]
    stamps = [float(e.time) for e in timed]
    events = []
    out_of_order = False
    for i, this in enumerate(timed):
        events.append(this)
        if i + 1 < len(timed):
            if stamps[i] < stamps[i + 1]:
                events.append(event("<"))
            elif stamps[i] > stamps[i + 1]:
                events.append(event(">"))
                out_of_order = True
    if out_of_order and verbose:
        sys.stderr.write("%s: has some timestamps out of order.\n" % name)
    if "events" in dump_stages:
        print("Event list:")
        for e in events:
            print(e)
    # Now group events into bursts; the extra '<' flushes the last one.
    bursts = []
    current = []
    for e in events + [event('<')]:
        if e.tag == '<':
            bursts.append(tuple(current))
            current = []
        else:
            current.append(e)
    if "bursts" in dump_stages:
        print("Burst list:")
        for burst in bursts:
            print(burst)
    # We need 4 cycles because the first and last might be incomplete.
    cycle_count = tags(events).count("<")
    if cycle_count < 4:
        sys.stderr.write("%s: has fewer than 4 cycles.\n" % name)
        return
    # First try at detecting a regular cycle
    if _bursts_all_alike(bursts):
        # All bursts looked the same
        regular = True
    else:
        # Trim off first and last bursts, which are likely incomplete.
        bursts = bursts[1:-1]
        if "trim" in dump_stages:
            print("After trimming:")
            for burst in bursts:
                print(burst)
        # Now the actual clique analysis
        if _bursts_all_alike(bursts):
            regular = True
    # Should know now if cycle is regular
    if regular:
        if not suppress_regular:
            print("%s: has a regular cycle %s." %
                  (name, " ".join(tags(bursts[0]))))
    else:
        # If it was not the case that all cycles matched, then we need
        # a minimum of 6 cycles because the first and last might be
        # incomplete, and we need at least 4 cycles in the middle to
        # have two full ones on split-cycle devices like old Garmins.
        if cycle_count < 6:
            sys.stderr.write("%s: variable-cycle log has has fewer "
                             "than 6 cycles.\n" % name)
            return
        if verbose > 0:
            print("%s: has a split or variable cycle." % name)
        # Distinct final tags of the bursts, in order of first appearance.
        cycle_enders = []
        for burst in bursts:
            if burst[-1].tag not in cycle_enders:
                cycle_enders.append(burst[-1].tag)
        if len(cycle_enders) == 1:
            if not suppress_regular:
                print("%s: has a fixed end-of-cycle sentence %s." %
                      (name, cycle_enders[0]))
        else:
            print("%s: has multiple cycle-enders %s." %
                  (name, " ".join(cycle_enders)))
        # Sanity check: an ender that also shows up in a burst it does
        # not terminate.
        pathological = []
        for ender in cycle_enders:
            for burst in bursts:
                if ((ender in tags(burst) and
                     not ender == burst[-1].tag and
                     ender not in pathological)):
                    pathological.append(ender)
        if pathological:
            print("%s: cycle-enders %s also occur in mid-cycle!" %
                  (name, " ".join(pathological)))
if __name__ == "__main__":
    # Command-line driver.  Switches: -d STAGES (dump analysis stages),
    # -j (interpret JSON, not NMEA), -s (suppress logs with no problems),
    # -v (verbose, repeatable).  Remaining arguments are log files; with
    # none, standard input is analyzed.
    #
    # 'stages' is a module global read by analyze().
    stages = ""
    try:
        (options, arguments) = getopt.getopt(sys.argv[1:], "d:jsv")
    except getopt.GetoptError as msg:
        # Diagnostics go to stderr like every other error report here.
        print("cycle_analyzer: " + str(msg), file=sys.stderr)
        raise SystemExit(1)
    for (switch, val) in options:
        if switch == '-d':      # Debug
            stages = val
        elif switch == '-j':    # Interpret JSON, not NMEA
            parse_json = True
        elif switch == '-v':    # Verbose
            verbose += 1
        elif switch == '-s':    # Suppress logs with no problems
            suppress_regular = True

    try:
        if arguments:
            for filename in arguments:
                # 'with' closes the file even on an unexpected exception.
                with open(filename, 'rb') as fp:
                    try:
                        # Pass -j explicitly: a default argument bound at
                        # definition time would not see the switch.
                        sequence = extract_timestamped_sentences(
                            fp, json_parse=parse_json)
                        analyze(sequence, filename)
                    except analyze_error as e:
                        # Unrecognizable .log: retry with the .chk file
                        # beside it, treated as a JSON dump.
                        checkfile = filename + ".chk"
                        if (filename.endswith(".log") and
                                os.path.exists(checkfile)):
                            with open(checkfile, "rb") as fp2:
                                sequence = extract_timestamped_sentences(
                                    fp2,
                                    json_parse=True)
                                analyze(sequence, checkfile)
                        else:
                            print(repr(e), file=sys.stderr)
        else:
            sequence = extract_timestamped_sentences(
                sys.stdin, json_parse=parse_json)
            analyze(sequence, "standard input")
    except analyze_error as e:
        # Reached from the stdin path or from a failing .chk retry.
        print(repr(e), file=sys.stderr)
        raise SystemExit(1)
# vim: set expandtab shiftwidth=4
|