File: space_packet_time_stamp_adder.py

package info (click to toggle)
gr-satellites 5.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,836 kB
  • sloc: python: 29,546; cpp: 5,448; ansic: 1,247; sh: 118; makefile: 24
file content (217 lines) | stat: -rw-r--r-- 12,166 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2019 Athanasios Theocharis <athatheoc@gmail.com>
# This was made under ESA Summer of Code in Space 2019
# by Athanasios Theocharis, mentored by Daniel Estevez
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

import numpy
from gnuradio import gr
from datetime import datetime
from datetime import timedelta
import construct
import pmt
import array
from . import space_packet

class space_packet_time_stamp_adder(gr.basic_block):
    """
    Time Stamp Adder (CCSDS 301.0-B-4)

    Message block: every u8vector PDU received on the 'in' port is published
    on the 'out' port with a time code prepended to its payload. The time
    code format is selected by ``time_format``:
    0 = CUC, 1 = CDS, 2 = CCS, 3 = ASCII A, 4 = ASCII B.

    --- The user should study the time code formats book and fill only the necessary fields.

    --- The user, in general, should check the code of the preferred time format and confirm that the behavior is
    the wanted one. Great care should be taken on the variability of the size. (E.g. in ASCII A time format,
    the user should define the size of the decimal fraction of the second subfield.)

    --- Automated time only prints a timestamp down to microseconds

    --- When defining an epoch, the user should check that it fits into the bytes that were assigned to hold the difference.
        E.g. If the epoch "0001 January 1" is defined in CDS, then it will not be able to fit today's date into a 16-bit
        DAYS part.
    """
    def __init__(self, input_manual_automatic, time_format, pfield, pfield_extension, time_code_identification_cuc,
                 epoch_year_cuc, epoch_month_cuc, epoch_day_cuc, pfield_extension_extended,rsvd_cuc,
                 time_code_identification_cds, epoch_identification_cds, epoch_year_cds, epoch_month_cds, epoch_day_cds,
                 time_code_identification_ccs, year, month, day, hour, minute, second, microsecond, picosecond, id_time):
        """
        Store the configuration and register the 'in'/'out' message ports.

        Args:
            input_manual_automatic: 1 stamps packets with the current UTC
                system time; any other value uses the manual
                year/month/day/hour/minute/second/microsecond fields.
            time_format: 0 CUC, 1 CDS, 2 CCS, 3 ASCII A, 4 ASCII B.
            pfield: 1 to emit the P-field (binary formats only).
            pfield_extension: P-field extension flag; for CUC a value of 1
                also emits the P-field extension octet.
            time_code_identification_cuc: CUC time code id; 1 selects the
                1958-01-01 epoch, anything else the custom CUC epoch.
            epoch_year_cuc, epoch_month_cuc, epoch_day_cuc: custom CUC epoch.
            pfield_extension_extended: extension flag written into the CUC
                P-field extension octet.
            rsvd_cuc: mission-reserved bits of the CUC P-field extension.
            time_code_identification_cds: CDS time code id.
            epoch_identification_cds: 0 selects the 1958-01-01 epoch,
                anything else the custom CDS epoch.
            epoch_year_cds, epoch_month_cds, epoch_day_cds: custom CDS epoch.
            time_code_identification_ccs: CCS time code id.
            year, month, day, hour, minute, second, microsecond: manual time.
            picosecond: manual sub-microsecond value used by the CDS
                (32-bit submillisecond segment) and CCS (resolution >= 4)
                formats.
            id_time: object exposing the remaining settings as attributes
                (basic_time_num_octets_cuc, fractional_time_num_octets_cuc,
                additional_basic_time_num_octets_cuc,
                additional_fractional_time_num_octets_cuc, len_of_day,
                len_of_submilsecs, calendar_variation, num_of_subsecs,
                add_z, ascii_dec).
        """
        gr.basic_block.__init__(self,
            name="space_packet_time_stamp_adder",
            in_sig=[],
            out_sig=[])

        ##################################################
        # Parameters
        ##################################################
        self.input_manual_automatic = input_manual_automatic
        self.time_format = time_format
        self.pfield = pfield #Checks if the P-Field will be used
        self.pfield_extension = pfield_extension #Checks if the PField will be extended
        self.time_code_identification_cuc = time_code_identification_cuc
        self.epoch_year_cuc = epoch_year_cuc
        self.epoch_month_cuc = epoch_month_cuc
        self.epoch_day_cuc = epoch_day_cuc
        self.basic_time_num_octets_cuc = id_time.basic_time_num_octets_cuc
        self.fractional_time_num_octets_cuc = id_time.fractional_time_num_octets_cuc
        self.pfield_extension_extended = pfield_extension_extended #Checks if the PField Extension will be extended
        self.additional_octets_basic_time_cuc = id_time.additional_basic_time_num_octets_cuc
        self.additional_octets_fractional_time_cuc = id_time.additional_fractional_time_num_octets_cuc
        self.rsvd_cuc = rsvd_cuc
        self.time_code_identification_cds = time_code_identification_cds
        self.epoch_identification_cds = epoch_identification_cds
        self.epoch_year_cds = epoch_year_cds
        self.epoch_month_cds = epoch_month_cds
        self.epoch_day_cds = epoch_day_cds
        self.length_of_day_cds = id_time.len_of_day
        self.length_of_submillisecond_cds = id_time.len_of_submilsecs
        self.time_code_identification_ccs = time_code_identification_ccs
        self.calendar_variation_ccs = id_time.calendar_variation
        self.number_of_subsecond_ccs = id_time.num_of_subsecs
        self.add_z_terminator = id_time.add_z
        self.ascii_dec_num = id_time.ascii_dec
        self.year = year
        self.month = month
        self.day = day
        self.hour = hour
        self.minute = minute
        self.second = second
        self.microsecond = microsecond
        self.picosecond = picosecond
        self.id_time = id_time

        # Reference epochs used to compute the CUC / CDS counters.
        if self.time_code_identification_cuc == 1:
            self.epoch_cuc = datetime(1958, 1, 1)
        else:
            self.epoch_cuc = datetime(self.epoch_year_cuc, self.epoch_month_cuc, self.epoch_day_cuc)

        if self.epoch_identification_cds == 0:
            self.epoch_cds = datetime(1958, 1, 1)
        else:
            self.epoch_cds = datetime(self.epoch_year_cds, self.epoch_month_cds, self.epoch_day_cds)

        ##################################################
        # Blocks
        ##################################################
        self.message_port_register_in(pmt.intern('in'))
        self.set_msg_handler(pmt.intern('in'), self.handle_msg)
        self.message_port_register_out(pmt.intern('out'))

    def handle_msg(self, msg_pmt):
        """
        Prepend the configured time code to an incoming PDU and publish it.

        Args:
            msg_pmt: PMT pair whose cdr is the u8vector payload. Messages
                with any other payload type are reported and dropped.

        The outgoing PDU carries PMT_NIL as metadata. If ``time_format`` is
        not one of the known values, a notice is printed and the payload is
        forwarded without a time code.
        """
        msg = pmt.cdr(msg_pmt)
        if not pmt.is_u8vector(msg):
            print("[ERROR] Received invalid message type. Expected u8vector")
            return
        packet = pmt.u8vector_elements(msg)

        if self.input_manual_automatic == 1:
            # NOTE: datetime.utcnow() is deprecated since Python 3.12; it is
            # kept because the epochs above are naive datetimes as well.
            self.now = datetime.utcnow()
            # The system clock only resolves microseconds, so the
            # picosecond-based resolutions are reduced to the finest
            # microsecond-based one.
            if self.length_of_submillisecond_cds >= 2:
                self.length_of_submillisecond_cds = 1
            if self.number_of_subsecond_ccs >= 4:
                self.number_of_subsecond_ccs = 3
        else:
            self.now = datetime(self.year, self.month, self.day, self.hour, self.minute, self.second, self.microsecond)

        if self.time_format == 0: #CUC
            header = self._cuc_time_code()
        elif self.time_format == 1: #CDS
            header = self._cds_time_code()
        elif self.time_format == 2: #CCS
            header = self._ccs_time_code()
        elif self.time_format == 3 or self.time_format == 4: #ASCII A / ASCII B
            header = self._ascii_time_code()
        else:
            print("Time Format Unknown")
            header = []

        # Plain integer lists keep every element a valid u8 value, even when
        # the header is empty.
        stamped = list(header) + list(packet)
        pdu = pmt.cons(pmt.PMT_NIL, pmt.init_u8vector(len(stamped), stamped))
        self.message_port_pub(pmt.intern('out'), pdu)

    def _cuc_time_code(self):
        """Return the CUC time code (optional P-field + T-field) as a list of octets."""
        header = []
        basic_time = 1 + self.basic_time_num_octets_cuc
        fractional_time = self.fractional_time_num_octets_cuc

        if self.pfield == 1: #If it exists
            header.extend(space_packet.PFieldCUC.build(
                dict(pfield_extension=self.pfield_extension,
                     time_code_identification=self.time_code_identification_cuc,
                     number_of_basic_time_unit_octets=self.basic_time_num_octets_cuc,
                     number_of_fractional_time_unit_octets=self.fractional_time_num_octets_cuc)))

            if self.pfield_extension == 1: #If it is extended
                basic_time += self.additional_octets_basic_time_cuc
                fractional_time += self.additional_octets_fractional_time_cuc
                header.extend(space_packet.PFieldCUCExtension.build(
                    dict(pfieldextension=self.pfield_extension_extended,
                         number_of_additional_basic_time_unit_octets=self.additional_octets_basic_time_cuc,
                         number_of_additional_fractional_time_unit_octets=self.additional_octets_fractional_time_cuc,
                         reserved_for_mission_definition=self.rsvd_cuc)))

        elapsed = self.now - self.epoch_cuc
        # Integer arithmetic avoids the rounding error of float seconds,
        # which is larger than the resolution of a 3-octet fraction.
        total_basic = elapsed.days * 86400 + elapsed.seconds
        header.extend(construct.BytesInteger(basic_time).build(total_basic))
        if fractional_time > 0:
            # Skipped entirely when no fractional octets are configured.
            total_frac = elapsed.microseconds * 256 ** fractional_time // 10 ** 6
            header.extend(construct.BytesInteger(fractional_time).build(total_frac))
        return header

    def _cds_time_code(self):
        """Return the CDS time code (optional P-field + T-field) as a list of octets."""
        header = []
        if self.pfield == 1:
            header.extend(space_packet.PFieldCDS.build(
                dict(pfield_extension=self.pfield_extension,
                     time_code_identification=self.time_code_identification_cds,
                     epoch_identification=self.epoch_identification_cds,
                     length_of_day_segment=self.length_of_day_cds,
                     length_of_submillisecond_segment=self.length_of_submillisecond_cds)))

        days_len = 2 if self.length_of_day_cds == 0 else 3
        header.extend(construct.BytesInteger(days_len).build((self.now - self.epoch_cds).days))

        # The 32-bit segment is the number of milliseconds elapsed since the
        # start of the day, so hours, minutes and seconds must be included.
        ms_of_day = (((self.now.hour * 60 + self.now.minute) * 60 + self.now.second) * 1000
                     + self.now.microsecond // 1000)
        header.extend(construct.Int32ub.build(ms_of_day))

        if self.length_of_submillisecond_cds == 1:
            # Microseconds within the current millisecond.
            header.extend(construct.Int16ub.build(self.now.microsecond % 1000))
        elif self.length_of_submillisecond_cds == 2:
            header.extend(construct.Int32ub.build(self.picosecond))
        return header

    def _ccs_time_code(self):
        """Return the CCS time code (optional P-field + T-field) as a list of octets."""
        header = []
        if self.pfield == 1:
            header.extend(space_packet.PFieldCCS.build(
                dict(pfield_extension=self.pfield_extension,
                     time_code_identification=self.time_code_identification_ccs,
                     calendar_variation_flag=self.calendar_variation_ccs,
                     resolution=self.number_of_subsecond_ccs)))

        header.extend(construct.Int16ub.build(self.now.year))
        if self.calendar_variation_ccs == 0:
            # Month of year / day of month variation.
            header.extend(construct.Int8ub.build(self.now.month))
            header.extend(construct.Int8ub.build(self.now.day))
        else:
            # Day of year variation.
            header.extend(construct.Int16ub.build(self.now.timetuple().tm_yday))

        header.extend(construct.Int8ub.build(self.now.hour))
        header.extend(construct.Int8ub.build(self.now.minute))
        header.extend(construct.Int8ub.build(self.now.second))

        # Each subsecond octet holds two further decimal digits of the
        # second. Floor division keeps the values integral, as the integer
        # fields require.
        resolution = self.number_of_subsecond_ccs
        micro = self.now.microsecond
        octets = [micro // 10 ** 4, (micro // 10 ** 2) % 10 ** 2, micro % 10 ** 2]
        if resolution >= 4:
            # Only touch the picosecond setting when it is actually needed.
            pico = self.picosecond
            octets.extend([(pico % 10 ** 6) // 10 ** 4,
                           (pico % 10 ** 4) // 10 ** 2,
                           pico % 10 ** 2])
        for value in octets[:max(resolution, 0)]:
            header.extend(construct.Int8ub.build(value))
        return header

    def _ascii_time_code(self):
        """Return the ASCII A (time_format 3) or ASCII B (time_format 4) time code as a list of octets."""
        if(self.ascii_dec_num < 0 or self.ascii_dec_num > 6):
            print("Decimals of ASCII in Time Stamp Adder block should be between 0 and 6. The number was automatically set to 1.")
            self.ascii_dec_num = 1

        if self.time_format == 3: # ASCII A: YYYY-MM-DDThh:mm:ss
            text = self.now.replace(microsecond=0).isoformat()
        else: # ASCII B: YYYY-DDDThh:mm:ss
            text = self.now.strftime("%Y-%jT%H:%M:%S")

        # Build the fraction explicitly: isoformat() drops it when the
        # microsecond is zero, and slicing with a zero offset would yield an
        # empty string for 6 decimals.
        if self.ascii_dec_num > 0:
            text += "." + "{:06d}".format(self.now.microsecond)[:self.ascii_dec_num]

        if (self.add_z_terminator == 1):
            text += 'Z'

        return list(text.encode('ascii'))