File: remote_common.py

package info (click to toggle)
quisk 4.2.32-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,064 kB
  • sloc: python: 22,219; ansic: 19,027; makefile: 38; sh: 2
file content (439 lines) | stat: -rwxr-xr-x 19,532 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
# THIS FILE IS FOR USE ON A REMOTELY CONTROLLED "REMOTE_RADIO" COMPUTER
#    RUNNING QUISK TO CONTROL ATTACHED ACTUAL/REAL RADIO HARDWARE.
# IT CONNECTS BY NETWORK TO A SEPARATE "CONTROL_HEAD" COMPUTER ALSO RUNNING QUISK.
#
# This software is Copyright (C) 2021-2022 by Ben Cahill and 2006-2022 by James C. Ahlstrom.,
# and is licensed for use under the GNU General Public License (GPL).
# See http://www.opensource.org.
# Note that there is NO WARRANTY AT ALL.  USE AT YOUR OWN RISK!!
#
# This file, remote_common.py, along with a radio-specific file, e.g.
# remote_softrock.py, remote_hermes.py, or similar, allows a radio-less (control_head) Quisk,
# running on a separate computer, to connect to this (remote_radio) instance of Quisk,
# which has an actual, real radio attached.
#
# The control_head Quisk must use control_common.py, along with a radio-specific file,
# e.g. control_softrock.py, control_hermes.py, or similar, to communicate with this
# remote_radio Quisk via a network connection.
#
# This remote_radio computer should be set up with a static IP address, so that you know
# where to point the control_head.  The control head computer may, however, use dynamic
# addressing; this remote_radio computer will read the control head address when the
# remote control connection is made.
#
# The main control interface between control_head and remote_radio is via a TCP port;
# this uses very low bandwidth.  All functional control, including CW keying, is done via this port.
#
# There are 2 additional ports, both UDP, using low to moderate bandwidth:
# -- Send graph/waterfall data to the control_head
# -- Send radio sound to the control_head, and receive mic sound
# These use sequential port numbers based on the TCP port number self.remote_ctl_base_port.
# If you need to change the default base port number, you can edit the line in this file
# that looks like (without the #):
#
#   self.remote_ctl_base_port = 4585		# Remote Control base TCP port
#   self.graph_data_port = self.remote_ctl_base_port + 1		# UDP port for graph data from the remote
#   self.remote_radio_sound_port = self.remote_ctl_base_port + 2	# UDP port for radio sound and mic samples
#
#
# Make sure to edit the corresponding line in control_common.py to match ports!!
#
# This remote_radio Quisk/computer is assumed to track the connected control_head Quisk/computer;
# no attempt is made by the control_head to verify the remote_radio Quisk's tuning frequency, mode, etc.
# Snap-to Rx tuning for CW works on the control_head Quisk by virtue of graph/waterfall data
# received from the remote_radio Quisk.
#
# To see detailed log output of CW key timing, set DEBUG_CW_JITTER = 1.
# To additionally see, when CW commands are pending, timestamps of when PollCwKey is called
# (e.g. to check thread scheduling behavior), set DEBUG_CW_JITTER = 2.
# To send "perfect" bursts of CW dits from the control_head, set DEBUG_CW_SEND_DITS = 1
# in the control_head's quisk_hardware_control_head.py.

DEBUG_CW_JITTER = 0
DEBUG = 0

from collections import deque	# for CW event queue
import socket, time, traceback, string, hmac, secrets, json
import _quisk as QS
from quisk_widgets import *

class Remot:	# Remote comtrol base class
  def __init__(self, app, conf):
    self.app = app			# Access Quisk class App (Python) functions
    self.conf = conf
    self.token = "abc"
    self.token_time = 0

    self.control_head_ip = None		# IP of control_head compter (read upon connection)
    self.remote_ctl_base_port = 4585	# Base of ports for remote connection (maybe edit this)
    self.remote_ctl_socket = None
    self.remote_ctl_connection = None
    self.remote_ctl_heartbeat_ts = None
    self.remote_ctl_heartbeat_timeout = 10.0	# Close our connection if we don't hear heartbeat from Control Head
    self.graph_data_port = self.remote_ctl_base_port + 1
    self.remote_radio_sound_port = self.remote_ctl_base_port + 2

    self.cw_delay_secs = 0.020	# time delay to absorb WiFi jitter, in secs
    self.cw_phrase_begin_ts = None	# timestamp of beginning of cw phrase
    self.cw_next_event_ts = None
    self.cw_next_keydown = None
    self.cw_event_queue = deque()
    self.cw_key_down = 0		# Tx-enable management
    self.cw_tx_enable = 0

    self.received = ''
    self.cmd_text = None	# cmd received from client (remote head)
    self.cmd = None		# cmd received from client (remote head)
    self.params = None		# params = the string following the command
    self.extended = None
    self.split_mode = 0

    print('Remote Overlay Initialized!')

  def open(self):
    self.token = "abc"
    self.remote_ctl_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.remote_ctl_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.remote_ctl_socket.bind(('', self.remote_ctl_base_port))	# '' == INADDR_ANY
    self.remote_ctl_socket.settimeout(0.0)
    self.remote_ctl_socket.listen(1)	# listen for TCP connection from just one client
    print('Remote Overlay Opened!')
    # Return an informative message for the config screen.
    # This method must return a string showing whether the open succeeded or failed.
    # Here, we are over-writing the string set by quisk_hardware_model.py
    t = f'Quisk Remote Controlled Radio {self.app.width}x{self.app.height} {self.app.graph_width} {self.app.data_width}'
    return t
    #BMC return ret

  def close(self):	# Close the listening socket, then the connection socket
    if self.remote_ctl_socket:
      self.remote_ctl_socket.close()
      self.remote_ctl_socket = None
    self.token = "abc"
    if self.remote_ctl_connection:
      print('Closing Remote Control connection: close')
    self.RemoteCtlClose(True)

  def RemoteCtlOpen(self):
    try:
      self.remote_ctl_connection, address = self.remote_ctl_socket.accept()
    except:
      return
    else:
      self.app.remote_control_slave = True
      QS.set_sparams(remote_control_slave=1)
      self.token = secrets.token_hex(32)
      self.remote_ctl_connection.settimeout(0.0)
      self.remote_ctl_heartbeat_ts = time.time()
      if DEBUG: print('Remote Control connection: ', self.remote_ctl_connection, ' address: ', address)
      self.control_head_ip = address[0]
      print ("Remote control connection from", self.control_head_ip)
      self.RemoteCtlSend("TOKEN;" + self.token + "\n")
      self.token_time = time.time()

  def RemoteCtlClose(self, send_quit):
    self.StopTransmit()
    if self.remote_ctl_connection:
      if send_quit:
        self.RemoteCtlSend('Q\n')
    if self.remote_ctl_connection:
      self.remote_ctl_connection.close()
    self.remote_ctl_connection = None
    QS.stop_remote_radio_remote_sound()
    self.app.remote_control_slave = False
    QS.set_sparams(remote_control_slave=0)

  def RemoteCtlSend(self, text):
    # Send text back to the client
    if not self.remote_ctl_connection:
      return
    if isinstance(text, str):
      text = text.encode('utf-8', errors='ignore')
    try:
      self.remote_ctl_connection.sendall(text)
    except socket.error:
      print('Closing Remote Control connection: sendall() failed.  Sent text:\n    '  + text.decode('utf-8'))
      # NOTE:  Cannot send 'Q' to Control Head here; sendall() isn't working!
      self.RemoteCtlClose(False)

  def ErrParam(self):		# Invalid parameter
    t = 'ERR_PARAM: ' + self.cmd_text + '\n'
    print(t)
    self.RemoteCtlSend(t)
  def ErrUnsupported(self):	# Command recognized but not supported (because of either H/W or configuration)
    t = 'ERR_UNSUPPORTED: ' + self.cmd_text + '\n'
    print(t)
    self.RemoteCtlSend(t)
  def ErrUnrecognized(self):	# Unrecognized command
    t = 'ERR_UNRECOGNIZED_CMD: ' + self.cmd_text + '\n'
    print(t)
    self.RemoteCtlSend(t)
  def ErrBadFormat(self):	# Something wrong with format of command
    t = 'ERR_BADFORMAT: ' + self.cmd_text + '\n'
    print(t)
    self.RemoteCtlSend(t)

  def HeartBeat(self):	# Called at about 10 Hz by the GUI thread
    if self.remote_ctl_connection:
      # Monitor the remote connection via periodic heartbeat from Control Head
      ts = time.time()
      if (ts - self.remote_ctl_heartbeat_ts) > self.remote_ctl_heartbeat_timeout:
        print('Closing Remote Control connection: Lost HEARTBEAT from Control Head')
        self.RemoteCtlClose(True)
    else:
      # Continually try to connect with Control Head
      self.RemoteCtlOpen()

  def FastHeartBeat(self):	# Called frequently by the GUI thread
    """This is the remote slave processing loop, and is called frequently.  It reads and satisfies requests."""
    if not self.remote_ctl_connection:
      return
    try:	# Read any data from the socket
      text = self.remote_ctl_connection.recv(1024)
    except:
      #traceback.print_exc()
      return
    else:					# We got some characters
      if not isinstance(text, str):
        text = text.decode('utf-8')
      self.received += text
    if not '\n' in self.received:	# A complete command ending with newline is not available
      return
    while '\n' in self.received:	# At least one complete command ending with newline *is* available
      cmd_text, self.received = self.received.split('\n', 1)	# Split off the command, save any further characters
      cmd_text = cmd_text.strip()	# Here is our command
      if not cmd_text:
        continue
      self.cmd_text = cmd_text
      args = cmd_text.split(';')	# Split at ';' because some control names have blanks
      command = args[0]
      params = args[1:]
      # TOKEN
      if self.token:
        if command == "TOKEN":
          passw = self.app.local_conf.globals.get("remote_radio_password", "")
          passw = passw.strip()
          if not passw:
            self.RemoteCtlSend("TOKEN_MISSING\n")
            print ("Error: Missing password on remote radio")
            continue
          H = hmac.new(passw.encode('utf-8'), self.token.encode('utf-8'), 'sha3_256')
          del passw
          if hmac.compare_digest(H.hexdigest(), args[1]):
            self.token = None
            print ("Security challenge passed", args[2])
            self.control_head_data_width = int(args[2])
            self.RemoteCtlSend("TOKEN_OK\n")
            self.remote_ctl_heartbeat_ts = time.time()
            QS.start_remote_radio_remote_sound(self.control_head_ip, self.remote_radio_sound_port,
                       self.graph_data_port, self.control_head_data_width)
          else:
            time.sleep(1)
        elif time.time() - self.token_time > 5:
          self.RemoteCtlSend("TOKEN_BAD\n")
          self.RemoteCtlClose(True)
          print ("Security failed")
        continue
      # Check for Quit and Heartbeat before any other commands
      if command == 'QUIT':
        print('Closing Remote Control connection: QUIT from Control Head')
        # NOTE:  Do not send 'Q' to Control Head; sendall() will fail because Control Head already disconnected
        self.RemoteCtlClose(False)
        continue
      # HEARTBEAT
      if command == 'HEARTBEAT':
        self.remote_ctl_heartbeat_ts = time.time()
        continue
      # Ignore the On/Off button, Help buttons, Small window pop buttons
      if command in ("On", "..", "bandBtnGroup", "screenBtnGroup", "modeButns", "Scope", "Config", "RX Filter", "Help"):
        continue
      if DEBUG: print("Remote receive:", cmd_text)
      # Look for radio buttons
      if self.ProcessRadioBtn(command, self.cmd_text):
        continue
      # buttons in idName2Button
      btn = self.app.idName2Button.get(args[0], None)
      if btn:
        #print ("Slave process button", cmd_text, btn.__class__)
        value = int(args[1])
        btn.SetIndex(value, True)
        continue
      # controls in midiControls
      if command in self.app.midiControls:
        ctrl, func = self.app.midiControls[command]
        #print ("Slave Process control", cmd_text, ctrl.__class__, func)
        value = int(args[1])
        if isinstance(ctrl, WrapSlider):
          ctrl.ChangeSlider(value)
        else:
          ctrl.SetValue(value)
          func()
        continue
      # FREQ
      if command == 'FREQ':
        freq, vfo, source, band, rxFreq, var_decim_index = args[1:]
        freq = int(freq)
        vfo = int(vfo)
        rxFreq = int(rxFreq)
        if rxFreq == self.app.rxFreq:
          rxFreq = None
        var_decim_index = int(var_decim_index)
        self.app.ChangeHwFrequency(freq - vfo, vfo, source, band, None, rxFreq)
        if source == "NewDecim":
          self.app.config_screen.config.btn_decimation.SetSelection(var_decim_index)
          sample_rate = self.VarDecimSet(var_decim_index)
          self.app.OnBtnDecimation(rate=sample_rate)
      # Json function call
      elif command == 'JsonAppFunc':
        # Call the function self.app. + func with the specified arguments
        # func = "%s.SetLabel" % self.idName
        # application.Hardware.RemoteCtlSend(f'JsonAppFunc;{json.dumps((func, label, do_cmd, direction))}\n')
        jargs = json.loads(params[0])
        pyobj = self.app
        for nam in jargs[0].split('.'):
          pyobj = getattr(pyobj, nam)
        pyobj(*jargs[1:])
      # AGC and Squelch levels
      elif command == 'AGCSQLCH':
        ctrl = self.app.midiControls["AGCSlider"][0]
        ctrl.SetSlider(value_off=int(args[1]), value_on=int(args[2]))
        self.app.levelSquelch = int(args[3])
        self.app.levelSquelchSSB = int(args[4])
        self.app.split_offset = int(args[5])
      # CW Keying
      elif command == 'CW':
        ts = time.time()
        if len(params) < 2:
          self.ErrParam()
          return
        if params[0] in '01':
          keydown = int(params[0])
        else:
          print('Bad keydown value in CW command:', params[0])
          self.ErrParam()
          return
        cw_event_ts = float(params[1]) / 1000.0     # int msecs to float secs
        if cw_event_ts == 0.0:
          if keydown != 1:
            # 'CW 0 0' == "Force Stop of CW"; clear all queued CW commands, and force CW key up
            print('Forcing stop of CW')
            while len(self.cw_event_queue):
              self.cw_event_queue.popleft()
            self.cw_next_event_ts = None
            self.cw_next_keydown = None
            self.cw_key_down = 0
            QS.set_remote_cwkey(0)
          else:
            # Begin new cw phrase; any prior phrase should be done by now.
            # Set up first cw event to be ready to execute.
            self.cw_begin_phrase_ts = ts + self.cw_delay_secs
        cw_new_event_ts = self.cw_begin_phrase_ts + cw_event_ts 
        if not self.cw_next_event_ts:
          self.cw_next_event_ts = cw_new_event_ts
          self.cw_next_keydown = keydown
          if DEBUG_CW_JITTER: print(f'{ts:10.4f} setting: {keydown} {cw_event_ts:2.3f} {cw_new_event_ts:10.4f}')
        else:
          self.cw_event_queue.append((cw_new_event_ts, keydown))
          if DEBUG_CW_JITTER: print(f'{ts:10.4f} queing:  {keydown} {cw_event_ts:2.3f} {cw_new_event_ts:10.4f}')
      # Menu
      elif command == 'MENU':
        menu_name, item_text, checked = args[1:]
        if item_text == 'Reverse Rx and Tx':
          continue	# No need to call handler, as rxFreq and txFreq are already handled
        menu = getattr(self.app, menu_name)
        nid = menu.item_text2id[item_text]
        menu_item = menu.FindItemById(nid)
        if menu_item.IsCheckable():
          menu_item.Check(int(checked))
        menu.Handler(None, nid)
      else:
        t = 'ERR_UNRECOGNIZED_CMD: %s\n' % cmd_text
        print(t)
        self.RemoteCtlSend(t)
      continue

  def PollCwKey(self):	# Called periodically at HW Poll usec period (typ. 50-200 Hz) by the sound thread
    cw_queue_len = len(self.cw_event_queue)
    if self.cw_next_event_ts or cw_queue_len > 0:
      # We have at least one CW event. If it's time to do so, set the next CW key down/up, look for next CW event.
      ts = time.time()
      if DEBUG_CW_JITTER > 1: print(f'{ts:10.4f}')
      if not self.cw_next_event_ts:
        # Nothing "on deck", but there is something on the cw event queue, so pop it off queue and put it "on deck".
        self.cw_next_event_ts, self.cw_next_keydown = self.cw_event_queue.popleft()
        if DEBUG_CW_JITTER: print(f'{ts:10.4f} queue len: {cw_queue_len}, popping: {self.cw_next_keydown} {self.cw_next_event_ts:10.4f}')
      if ts >= self.cw_next_event_ts:
        if DEBUG_CW_JITTER: print(f'{ts:10.4f} set_remote_cwkey: {self.cw_next_keydown}')
        QS.set_remote_cwkey(self.cw_next_keydown)
        self.cw_key_down = self.cw_next_keydown
        cw_queue_len = len(self.cw_event_queue)
        if cw_queue_len > 0:
          self.cw_next_event_ts, self.cw_next_keydown = self.cw_event_queue.popleft()
          if DEBUG_CW_JITTER: print(f'{ts:10.4f} queue len: {cw_queue_len}, popping: {self.cw_next_keydown} {self.cw_next_event_ts:10.4f}')
        else:
          self.cw_next_event_ts = None
          self.cw_next_keydown = None

  def StopTransmit(self):
    # TODO:  Add code for modes other than CW
    while len(self.cw_event_queue):
      self.cw_event_queue.popleft()
    self.cw_next_event_ts = None
    self.cw_next_keydown = None
    self.cw_key_down = 0
    QS.set_remote_cwkey(0)

  def ProcessRadioBtn(self, command, cmd_text):
    # Large and Small format screens send different button events for radio buttons.
    # Band buttons:
    if command in self.conf.BandList:
      self.app.bandBtnGroup.SetLabel(command, True)
      return True		# We processed this command
    if command in ("Audio", "Time"):
      self.app.bandBtnGroup.SetLabel(command, True)
      return True		# We processed this command
    Mode = self.app.modeButns.SetLabel
    Screen = self.app.screenBtnGroup.SetLabel
    # Mode buttons: process both formats:
    if cmd_text in ("CW U/L;0", "CWL;0", "CWL;1"):
      Mode("CWL", True)
    elif cmd_text in ("CW U/L;1", "CWU;0", "CWU;1"):
      Mode("CWU", True)
    elif cmd_text in ("SSB U/L;0", "LSB;0", "LSB;1"):
      Mode("LSB", True)
    elif cmd_text in ("SSB U/L;1", "USB;0", "USB;1"):
      Mode("USB", True)
    elif cmd_text in ("AM;0", "AM;1"):
      Mode("AM", True)
    elif cmd_text in ("FM;0", "FM;1"):
      Mode("FM", True)
    elif cmd_text in ("DGT;0", "DGT-U;0", "DGT-U;1"):
      Mode("DGT-U", True)
    elif cmd_text in ("DGT;1", "DGT-L;0", "DGT-L;1"):
      Mode("DGT-L", True)
    elif cmd_text in ("DGT;2", "DGT-FM;0", "DGT-FM;1"):
      Mode("DGT-FM", True)
    elif cmd_text in ("DGT;3", "DGT-IQ;0", "DGT-IQ;1"):
      Mode("DGT-IQ", True)
    elif cmd_text in ("FDV;0", "FDV-U;0", "FDV-U;1"):
      Mode("FDV-U", True)
    elif cmd_text in ("FDV;1", "FDV-L;0", "FDV-L;1"):
      Mode("FDV-L", True)
    # Screen buttons: process both formats:
    # Due to ambiguous received commands, the setting is always "Graph" or "WFall" without "P1 or "P2".
    # The P1 and P2 are handled at the control head so this shouldn't matter.
    elif cmd_text in ("Graph;0", "Graph;0", "Graph;1"):
      Screen("Graph", True)
    elif cmd_text in ("Graph;1", "GraphP1;0", "GraphP1;1"):
      Screen("Graph", True)
    elif cmd_text in ("Graph;2", "GraphP2;0", "GraphP2;1"):
      Screen("Graph", True)
    elif cmd_text in ("WFall;0", "WFall;0", "WFall;1"):
      Screen("WFall", True)
    elif cmd_text in ("WFall;1", "WFallP1;0", "WFallP1;1"):
      Screen("WFall", True)
    elif cmd_text in ("WFall;2", "WFallP2;0", "WFallP2;1"):
      Screen("WFall", True)
    else:
      return False
    return True		# We processed this command