File: telemetry_submit.py

package info (click to toggle)
gr-satellites 5.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,836 kB
  • sloc: python: 29,546; cpp: 5,448; ansic: 1,247; sh: 118; makefile: 24
file content (92 lines) | stat: -rw-r--r-- 3,401 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2019,2023 Daniel Estevez <daniel@destevez.net>
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

from gnuradio import gr, blocks

from ... import submit, funcube_submit
from ... import pwsat2_submitter, bme_submitter, bme_ws_submitter, pdu_to_kiss
from ...utils.options_block import options_block


class telemetry_submit(gr.hier_block2, options_block):
    """
    Hierarchical block for telemetry submission

    PDUs with frames arrive on the 'in' message port and are handed to a
    submitter block that is selected according to the server name

    Args:
        server: 'SatNOGS', 'SIDS', 'FUNcube', 'PWSat', 'BME', 'BMEWS'
            or 'HIT' (string)
        norad: NORAD ID (int) (used by SatNOGS, SIDS and BME)
        port: TCP port to connect to (used by HIT) (str)
        url: SIDS URL (used by SIDS) (str)
        config: configuration file from configparser
        options: options from argparse

    Raises:
        ValueError: if the server name is not one of the above
    """
    def __init__(self, server, norad=None, port=None, url=None,
                 config=None, options=None):
        # Both stream signatures are empty: the block only has message ports
        gr.hier_block2.__init__(
            self, 'telemetry_submit',
            gr.io_signature(0, 0, 0), gr.io_signature(0, 0, 0))
        options_block.__init__(self, options)
        self.message_port_register_hier_in('in')

        if server in ('SatNOGS', 'SIDS'):
            # SatNOGS uses the same submitter as SIDS, but with a fixed URL
            # that takes precedence over the url argument
            sids_url = (
                'https://db.satnogs.org/api/telemetry/'
                if server == 'SatNOGS' else url)
            # Falls back to an empty timestamp when the options object has
            # no start_time attribute
            start_time = getattr(self.options, 'start_time', '')
            station = config['Groundstation']
            self.submit = submit(
                sids_url, norad, station['callsign'],
                float(station['longitude']), float(station['latitude']),
                start_time)
        elif server == 'FUNcube':
            funcube = config['FUNcube']
            self.submit = funcube_submit(
                'http://data.amsat-uk.org',
                funcube['site_id'], funcube['auth_code'])
        elif server == 'PWSat':
            credentials = config['PW-Sat2']['credentials_file']
            self.submit = pwsat2_submitter(credentials, '')
        elif server == 'BME':
            # Satellite names passed to bme_submitter, keyed by NORAD ID.
            # A NORAD ID that is not listed here raises KeyError
            bme_names = {44830: 'atl1', 44832: 'smogp', 47964: 'smog1'}
            satellite = bme_names[norad]
            bme = config['BME']
            self.submit = bme_submitter(
                bme['user'], bme['password'], satellite)
        elif server == 'BMEWS':
            self.submit = bme_ws_submitter()
        elif server == 'HIT':
            # Frames are KISS-encoded and sent over TCP to 127.0.0.1
            try:
                self.tcp = blocks.socket_pdu(
                    'TCP_CLIENT', '127.0.0.1', port, 10000, False)
            except RuntimeError as err:
                # Best effort: the block is left with its input
                # unconnected instead of failing the construction
                print('Could not connect to telemetry proxy:', err)
                print('Disabling telemetry submission...')
                return
            else:
                self.submit = pdu_to_kiss(control_byte=False)
                self.msg_connect((self.submit, 'out'), (self.tcp, 'pdus'))
        else:
            raise ValueError('Unsupported telemetry server')

        # Route the hierarchical block input into the chosen submitter
        self.msg_connect((self, 'in'), (self.submit, 'in'))

    @classmethod
    def add_options(cls, parser):
        """
        Registers the telemetry submit options in the argparse parser

        The only option is --start_time, a string that defaults to ''
        """
        parser.add_argument(
            '--start_time',
            default='',
            type=str,
            help='Recording start timestamp')