File: csp_zmq_sub.py

package info (click to toggle)
gr-satellites 5.8.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,836 kB
  • sloc: python: 29,546; cpp: 5,448; ansic: 1,247; sh: 118; makefile: 24
file content (51 lines) | stat: -rw-r--r-- 1,446 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright 2025 Daniel Estevez <daniel@destevez.net>
#
# This file is part of gr-satellites
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

import threading

import pmt
from gnuradio import gr
import zmq


class csp_zmq_sub(gr.sync_block):
    """CSP ZMQ SUB block

    Connects a ZMQ SUB socket to a given address, subscribes to a set of
    destinations, and publishes every packet received as a PDU in the
    'out' message port.

    Each ZMQ message is handled as one destination byte followed by a
    4-byte CSP header and the rest of the packet. The destination byte is
    removed and the CSP header is byte-swapped before the packet is
    published.

    Args:
        address: ZMQ endpoint passed to the connect() call of the socket
        destinations: iterable of integers in the range 0-255; each of them
            is used as a one-byte ZMQ subscription prefix
    """
    def __init__(self, address, destinations):
        gr.sync_block.__init__(
            self,
            name='csp_zmq_sub',
            in_sig=[],
            out_sig=[]
        )

        ctx = zmq.Context()
        self.socket = ctx.socket(zmq.SUB)
        self.socket.connect(address)
        for dest in destinations:
            # the subscription prefix is the single destination byte that
            # is at the beginning of each ZMQ message
            self.socket.setsockopt(zmq.SUBSCRIBE, bytes([dest]))

        self.message_port_register_out(pmt.intern('out'))

        # the receive loop blocks on the socket, so it runs in its own
        # thread; it is a daemon thread so that it does not prevent the
        # interpreter from exiting
        self.run_thread = threading.Thread(target=self.run, daemon=True)
        self.run_thread.start()

    def run(self):
        """Receive loop run by the background thread

        Blocks on the ZMQ socket forever. Messages shorter than 5 bytes
        (1 destination byte plus the 4-byte CSP header) are reported and
        dropped. All the other messages are converted and published as
        PDUs (a pair of PMT_NIL and a u8vector) in the 'out' port.
        """
        out_port = pmt.intern('out')
        while True:
            msg = self.socket.recv()
            if len(msg) < 5:
                print('[csp_zmq_sub] message too short; dropping')
                # really drop the message; otherwise a truncated packet
                # would be published downstream
                continue
            # strip out destination
            msg = msg[1:]
            # byte-swap CSP header, since CSP ZMQ uses CSP headers in opposite
            # endianness compared to over-the-air
            msg = msg[:4][::-1] + msg[4:]
            msg = pmt.cons(pmt.PMT_NIL,
                           pmt.init_u8vector(len(msg), list(msg)))
            self.message_port_pub(out_port, msg)