1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2017, 2018, 2019, 2020 Daniel Estevez <daniel@destevez.net>
# Copyright 2021 The Regents of the University of Colorado
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
import copy
from datetime import datetime
from construct import Adapter, BitsInteger, BitStruct, Container, Enum, \
Flag, GreedyBytes, If, Int8ub, Int16ub, Int32ub, \
Padding, RawCopy, Struct, Switch
from .ax25 import Header
from .cute_bct_fsw import cute_bct_fsw
from .cute_bct_soh import cute_bct_soh
from .cute_pld import cute_pld_sw_stat
PrimaryHeader = BitStruct(
'ccsds_version' / BitsInteger(3),
'packet_type' / Flag,
'secondary_header_flag' / Flag,
'is_stored_data' / Flag,
'APID' / BitsInteger(10),
'grouping_flag' / Enum(BitsInteger(2), GRP_MIDDLE=0, GRP_BEGIN=1,
GRP_END=2, GRP_FIRST_AND_LAST=3),
'sequence_count' / BitsInteger(14),
'packet_length' / BitsInteger(16)
)
class TimeAdapter(Adapter):
def _encode(self, obj, context, path=None):
return Container()
def _decode(self, obj, context, path=None):
offset = datetime(2000, 1, 1, 12) - datetime(1970, 1, 1)
return (datetime.utcfromtimestamp(obj.time_stamp_seconds) + offset)
SecondaryHeaderRaw = Struct(
'time_stamp_seconds' / Int32ub,
'sub_seconds' / Int8ub,
Padding(1)
)
SecondaryHeader = TimeAdapter(
SecondaryHeaderRaw
)
cute_ax25_packet_header = RawCopy(Struct(
'ax25_header' / Header,
'primary_header' / PrimaryHeader,
'secondary_header' / If(
lambda c: c.primary_header.secondary_header_flag,
SecondaryHeader
)
))
cute_ax25_packet_fragment = Struct(
'header' / cute_ax25_packet_header,
'packet' / GreedyBytes
)
cute_ax25_packet_complete = Struct(
'ax25_header' / Header,
'primary_header' / PrimaryHeader,
'secondary_header' / If(
lambda c: c.primary_header.secondary_header_flag,
SecondaryHeader
),
'packet' / Switch(
lambda c: (c.primary_header.APID),
{
(0x55): cute_bct_fsw,
(0x56): cute_bct_soh,
(0x1FF): cute_pld_sw_stat
}
)
)
class FswParserState(Enum):
SEARCH_START = 0
SEARCH_MIDDLE = 1
class CUTE:
"""Telemetry parser for the Colorado Ultraviolet Transit Experiment (CUTE)
This is a stateful parser that reassembles flight software packets which
span multiple AX.25 frames.
"""
def __init__(self):
self.frames = []
self.state = FswParserState.SEARCH_START
# Parses a reassembled frame
def process(self):
reassembled_packet = b''.join([self.frames[0].header.data] +
[frame.packet for frame in self.frames])
self.frames = []
return cute_ax25_packet_complete.parse(reassembled_packet)
def parse(self, packet):
data = cute_ax25_packet_fragment.parse(packet)
if data is None:
return None
primary_header = data.header.value.primary_header
self.frames.append(data)
if self.state == FswParserState.SEARCH_START:
if primary_header.grouping_flag == 'GRP_MIDDLE':
# This means we dropped/missed a frame
self.frames = []
return None
elif primary_header.grouping_flag == 'GRP_BEGIN':
self.state = FswParserState.SEARCH_MIDDLE
return None
elif primary_header.grouping_flag == 'GRP_END':
# This means we dropped/missed a frame
self.frames = []
return None
elif primary_header.grouping_flag == 'GRP_FIRST_AND_LAST':
return self.process()
elif self.state == FswParserState.SEARCH_MIDDLE:
if primary_header.grouping_flag == 'GRP_BEGIN' or \
primary_header.grouping_flag == 'GRP_FIRST_AND_LAST':
# This means we dropped/missed a frame
self.state = FswParserState.SEARCH_START
self.frames = []
return None
prev_header = self.frames[-2].header.value.primary_header
if primary_header.sequence_count != \
(prev_header.sequence_count + 1) % (1 << 14):
# A frame was dropped.
self.state = FswParserState.SEARCH_START
self.frames = []
return None
if primary_header.grouping_flag == 'GRP_MIDDLE':
return None
elif primary_header.grouping_flag == 'GRP_END':
self.state = FswParserState.SEARCH_START
return self.process()
cute_70cm = CUTE()
|