File: tag_block.py

package info (click to toggle)
python-libais 0.17%2Bgit.20190917.master.e464cf8-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 5,820 kB
  • sloc: cpp: 56,058; python: 11,979; makefile: 537; sh: 466
file content (225 lines) | stat: -rw-r--r-- 5,906 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
"""Manage a stream of NMEA TAG block messages.

The TAG (Transport, Annotate, and Group) Block messages are defined in NMEA 4.0
section 7.

Some providers violate the NMEA standard by using lower case letters in NMEA
checksums.  This module forces checksums to always be upper case.

TODO(schwehr): Add a queue method to drop stale group caches.
TODO(schwehr): Catch a wider variety of incomplete groups.
TODO(schwehr): Compute running stats in the queue.
"""

import hashlib
import logging
import re

import six
import six.moves.queue as Queue

import ais
from ais import nmea
from ais import nmea_messages
from ais import util
from ais import vdm

# Logger named 'libais'; the queue and decode helpers below report their
# diagnostics through it.
logger = logging.getLogger('libais')

# Pattern for one TAG block (the text between a pair of backslashes) followed
# by the sentence it annotates.  It is unanchored; Parse() applies it with
# search().
#
# Named groups:
#   metadata      - the comma separated "key:value" fields of the TAG block.
#   time, dest, group, line_num, quality, rel_time, rcvr, text, text_date
#                 - value of the c, d, g, n, q, r, s, t and T fields.
#   sentence_num, sentence_tot, group_id
#                 - the three dash separated parts of the g field.  The first
#                   two are matched as a single digit each.
#   tag_checksum  - two hex digits after "*".  The whole "*hh" suffix is
#                   optional, so this group is None when it is absent.
#   payload       - the rest of the line after the closing backslash.
#
# NOTE(review): as parenthesized, the `metadata` group closes after the
# optional "*hh" suffix, so the captured text appears to include the checksum
# when one is present.  Confirm that nmea.Checksum() expects that input.
#
# Added a decimal value to time beyond the normal TAG BLOCK spec: the c field
# accepts an optional fractional part after the 10 to 15 digits.
TAG_BLOCK_RE = re.compile(r"""
(\\
(?P<metadata>(
  (
    c:(?P<time>\d{10,15}(\.\d*)?) |  # Receiver Unix time in seconds or millisec
    d:(?P<dest>[^*,\\]{1,15}) |  # Destination
    g:(?P<group>(?P<sentence_num>\d)-(?P<sentence_tot>\d)-(?P<group_id>\d+)) |
    n:(?P<line_num>\d+) |  # Line count
    q:(?P<quality>\w) |  # Orbcomm specific character code
    r:(?P<rel_time>\d+) | # Relative time
    s:(?P<rcvr>[^$*,!\\]{1,15}) |  # Source / station
    t:(?P<text>[^$*,!\\]+) |  # Text string
    T:(?P<text_date>[^$*,!\\]+)  # Orbcomm human readable date
  )[,]?
)+([*](?P<tag_checksum>[0-9A-Fa-f]{2}))?)
\\)(?P<payload>.*)
""", re.VERBOSE)

# Keys of the TAG_BLOCK_RE group dict whose values Parse() passes through
# util.MaybeToNumber().
# NOTE(review): 'dest' and 'group' are matched by patterns that also allow
# non-digit characters; presumably MaybeToNumber leaves such values unchanged.
# Confirm against util.
NUMERIC_FIELDS = (
  'dest',
  'group',
  'group_id',
  'line_num',
  'rel_time',
  'sentence_num',
  'sentence_tot',
  'time'
)


def Parse(data):
  """Unpack a TAG Block line or return None.

  Makes sure that the line matches the regex and that the checksum carried in
  the TAG block agrees with the one computed by nmea.Checksum.

  Args:
    data: Line of text, or a dict as produced by
      TAG_BLOCK_RE.search(...).groupdict().  A dict that carries a checksum is
      updated in place with its numeric fields converted.

  Returns:
    A NMEA TAG Block dict, or None if the input is neither a str nor a dict,
    the line would not parse, the checksum is missing, or the checksum is
    invalid.
  """
  if isinstance(data, str):
    found = TAG_BLOCK_RE.search(data)
    if found is None:
      return None
    result = found.groupdict()
  elif isinstance(data, dict):
    result = data
  else:
    return None

  # The "*hh" suffix is optional in TAG_BLOCK_RE, so this group can be None.
  # Calling .upper() on it raised AttributeError and took down the caller;
  # with nothing to verify against, reject the line instead.
  expected = result.get('tag_checksum')
  if not expected:
    return None

  result.update({k: util.MaybeToNumber(v)
                 for k, v in six.iteritems(result) if k in NUMERIC_FIELDS})

  # Some providers send lower case hex digits; normalize before comparing.
  if nmea.Checksum(result['metadata']) != expected.upper():
    return None

  return result


class TagQueue(Queue.Queue):
  """Queue that assembles TAG Block lines into complete messages.

  Lines carrying a "g" (group) TAG are cached per group id until the final
  sentence of the group arrives and are then emitted as one entry.  Lines
  without a group, and lines that Parse() rejects, are emitted immediately.

  Every entry is a dict with 'line_nums' and 'lines' lists.  Entries built
  from parsed TAG blocks also carry 'matches' and 'times' lists and, when
  decoding succeeded, a 'decoded' value.
  """

  def __init__(self):
    # Explicit base call rather than super(): Queue.Queue is an old-style
    # class on Python 2.
    Queue.Queue.__init__(self)
    # Partially received groups, keyed by integer group id.
    self.groups = {}
    # Number of the most recent line handed to put().
    self.line_num = 0

  def put(self, line, line_num=None):
    """Feed one line of text into the queue.

    Args:
      line: str, One line of input.  Trailing whitespace is stripped.
      line_num: Optional number for this line.  When omitted, the previous
        line number plus one is used.
    """
    self.line_num = self.line_num + 1 if line_num is None else line_num

    line = line.rstrip()
    match = Parse(line)

    if not match:
      # Not a usable TAG block: hand the raw text straight through.
      Queue.Queue.put(self, {'line_nums': [self.line_num], 'lines': [line]})
      return

    timestamp = util.MaybeToNumber(match['time'])
    fresh = {
        'line_nums': [self.line_num],
        'lines': [line],
        'matches': [match],
        'times': [timestamp],
    }

    if not match['group']:
      # Stand-alone TAG block.  Try the AIS decoder first and fall back to
      # the generic NMEA sentence decoder.
      payload = DecodeTagSingle(fresh)
      if not payload:
        logger.info('Unable to decode. Passing without decoded block.')
        payload = nmea_messages.DecodeLine(match['payload'])
      if payload:
        fresh['decoded'] = payload
      else:
        logger.info('No NMEA match for line: %d, %s', self.line_num, line)
      Queue.Queue.put(self, fresh)
      return

    part = int(match['sentence_num'])
    total = int(match['sentence_tot'])
    gid = int(match['group_id'])

    if part == 1:
      # First sentence starts (or restarts) the cache for this group id.
      # NOTE(review): a group that declares a single sentence (part == total
      # == 1) is cached here and is not emitted by this branch.  Confirm that
      # is intended.
      self.groups[gid] = fresh
      return

    try:
      pending = self.groups[gid]
    except KeyError:
      # A later sentence arrived without its first sentence; it is dropped.
      logger.error('group_id not in groups: %d', gid)
      return

    for key, values in fresh.items():
      pending[key].extend(values)

    if part == total:
      # Final sentence of the group: decode, emit, and forget the cache.
      decoded = DecodeTagMultiple(pending)
      if decoded:
        pending['decoded'] = decoded
      else:
        logger.info('Unable to process: %s', pending)
      Queue.Queue.put(self, pending)
      del self.groups[gid]


def DecodeTagSingle(tag_block_message):
  """Decode the AIS payload carried by a single-line NMEA TAG block.

  Args:
    tag_block_message: dict, Must have a 'matches' list whose first entry has
      a 'payload' string.

  Returns:
    A message dictionary compatible with vdm.BareQueue, with an added 'md5'
    key holding the hex MD5 digest of the message body.  Returns None when
    vdm.Parse does not match the payload, the sentence total is not 1, or
    ais.decode raises ais.DecodeError.
  """
  first_match = tag_block_message['matches'][0]
  vdm_match = vdm.Parse(first_match['payload'])
  if not vdm_match:
    logger.info('Single line NMEA TAG block decode failed for: %s',
                tag_block_message)
    return

  # Only stand-alone sentences are handled here.
  if int(vdm_match['sen_tot']) != 1:
    logger.error('Multi-line message? %s', tag_block_message)
    return

  body = vdm_match['body']
  pad_bits = int(vdm_match['fill_bits'])
  try:
    result = ais.decode(body, pad_bits)
  except ais.DecodeError as error:
    logger.error('Unable to decode: %s', error)
    return

  digest = hashlib.md5(body.encode('utf-8'))
  result['md5'] = digest.hexdigest()
  return result


def DecodeTagMultiple(tag_block_message):
  """Decode a TAG block message that spans multiple lines.

  Args:
    tag_block_message: dict, Must have a 'matches' list.  The 'payload' of
      each entry is fed, in order, through vdm.VdmLines into a vdm.BareQueue.

  Returns:
    The 'decoded' value of the one message the queue produced, or None when
    the queue does not end up holding exactly one message.
  """
  payloads = []
  for match in tag_block_message['matches']:
    payloads.append(match['payload'])

  bare_queue = vdm.BareQueue()
  for vdm_line in vdm.VdmLines(payloads):
    bare_queue.put(vdm_line)

  if bare_queue.qsize() == 1:
    return bare_queue.get()['decoded']

  logger.info('Error: Should get just one message decoded from this: %s',
              tag_block_message)
  return