1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
|
#!/usr/bin/env python3
#
# libssc: Library to expose Qualcomm Sensor Core sensors
# Copyright (C) 2022-2025 Dylan Van Assche
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import os
import gi
import sys
import time
import errno
import signal
import argparse
import sysconfig
from multiprocessing import Process
from ctypes import CDLL, c_int, byref, c_char_p, addressof, create_string_buffer, Structure
# Import build path for running unittests
sys.path.append(os.path.abspath('../_build/data'))
from ssc import *
VERSION = '0.3.0'
SSC_SERVICE_ID = 0x190
SSC_SERVICE_VERSION = 1
SSC_SERVICE_INSTANCE = 0
QMI_REQUEST = 0x0
QRTR_REQUEST = 0x6
ERROR_QRTR_PORT_UNAVAILABLE = 1
ERROR_QRTR_SERVER_FAILURE = 2
def signal_handler(sig, frame):
# Clear message buffer
print('')
sys.exit(0)
class SSCMockingServer():
"""
SSC Server mocking class.
"""
_qrtr = None
sock = None
def __init__(self, port: int = 0, verbose: bool = False):
"""
Create a mocking server for the SSC QMI service using libqrtr from a QRTR socket.
Starts up a server on a random port unless specified.
Parameters
----------
port: int
Port number to use, default 0 which results into a random port number.
verbose : bool
Log raw messages. Default False.
"""
self.workers = {}
self._qrtr = None
multiarch = sysconfig.get_config_var('MULTIARCH')
print(f'đ SSC Mocking Server v{VERSION}')
err = None
for path in [
# Local git submodule
os.path.join(os.path.dirname(__file__), './qrtr/_build/lib/libqrtr.so'),
# Installed in OS (Alpine)
'/usr/lib/libqrtr.so.1.1',
# Installed in OS (Debian)
f'/usr/lib/{multiarch}/libqrtr.so.1'
]:
try:
self._qrtr = CDLL(path)
break
except OSError as e:
err = e
continue
err = None
if err:
print(f'â Failed to load libqrtr: {err}', file=sys.stderr)
sys.exit(ERROR_QRTR_PORT_UNAVAILABLE)
print('âšī¸ Creating QRTR socket')
self.sock = self._qrtr.qrtr_open(port)
if self.sock < 0:
print('â Failed to open QRTR port', file=sys.stderr)
sys.exit(ERROR_QRTR_PORT_UNAVAILABLE)
print(f'âšī¸ Starting QRTR server (service {SSC_SERVICE_ID})')
error = self._qrtr.qrtr_new_server(self.sock,
SSC_SERVICE_ID,
SSC_SERVICE_VERSION,
SSC_SERVICE_INSTANCE)
self._verbose = verbose
if self._verbose:
print('âšī¸ Verbose mode enabled')
self._process()
def _debug(self, msg: str):
if self._verbose:
print(msg)
def _receive_msg(self, buffer_size: int):
node = c_int()
port = c_int()
buf = create_string_buffer(buffer_size)
length = 0
length = self._qrtr.qrtr_recvfrom(self.sock, c_char_p(addressof(buf)),
buffer_size, byref(node), byref(port))
if length <= 0:
print(f'â Unable to receive QRTR message (error {error})', file=sys.stderr)
sys.exit(ERROR_QRTR_SERVER_FAILURE)
incoming_msg = ' '.join(list(map(hex, buf[0:length])))
self._debug(f'đĨ Incoming MSG ({SSC_SERVICE_ID}@[{node.value}:{port.value}] {length} bytes): {incoming_msg}')
return node, port, buf[0:length]
def _sensor_measurement(self, uid_high: int, uid_low: int, node, port, transaction_id: int):
print('âšī¸ Sending sensor measurements...')
index = 0
while True:
# Generate sensor measurement
print(f'âšī¸ Measurement {index + 1}')
data = SSC.generate_protobuf_sensor_measurement(uid_high, uid_low, index)
buf = SSC.generate_report_large_indication(data, transaction_id)
# Stop worker when QRTR device dissappears
if self._qrtr.qrtr_sendto(self.sock, node, port, c_char_p(buf), len(buf)) != 0:
break
reply_msg = ' '.join(list(map(hex, buf)))
self._debug(f'đ¤ Indication MSG ({SSC_SERVICE_ID}@[{node.value}:{port.value}] {len(buf)} bytes): {reply_msg}')
# Send measurements at 10 Hz
time.sleep(0.1)
index += 1
def _reply_msg(self, node: int, port: int, incoming_msg: list):
data = []
if incoming_msg[0] == QMI_REQUEST:
# Send QMI Success reply
report_type, data, transaction_id = SSC.parse_message_control_input(incoming_msg)
buf = SSC.generate_message_control_output(transaction_id)
if self._qrtr.qrtr_sendto(self.sock, node, port, c_char_p(buf), len(buf)) != 0:
print(f'â Failed to send QMI Success message to QRTR node', file=sys.stderr)
sys.exit(ERROR_QRTR_SERVER_FAILURE)
reply_msg = ' '.join(list(map(hex, buf)))
self._debug(f'đ¤ Outgoing MSG ({SSC_SERVICE_ID}@[{node.value}:{port.value}] {len(buf)} bytes): {reply_msg}')
# Send QMI indication with actual data
message_id, uid_high, uid_low = SSC.parse_protobuf_client_request(data)
# SUID sensor discovery message
if message_id == SSC_PROTOBUF_DISCOVER_MSG_ID and uid_high == SSC_SUID_SENSOR_UID_HIGH and uid_low == SSC_SUID_SENSOR_UID_LOW:
sensor = SSC.parse_protobuf_discovery_request(data)
print(f'âšī¸ Discovering sensors for data type {sensor}')
data = SSC.generate_protobuf_discovery_response(sensor)
buf = SSC.generate_report_large_indication(data, transaction_id)
if self._qrtr.qrtr_sendto(self.sock, node, port, c_char_p(buf), len(buf)) != 0:
print(f'â Failed to send Protobuf Discovery message to QRTR node', file=sys.stderr)
sys.exit(ERROR_QRTR_SERVER_FAILURE)
reply_msg = ' '.join(list(map(hex, buf)))
self._debug(f'đ¤ Indication MSG ({SSC_SERVICE_ID}@[{node.value}:{port.value}] {len(buf)} bytes): {reply_msg}')
# Sensor attribute request message
elif message_id == SSC_PROTOBUF_GET_ATTRIBUTE_MSG_ID:
print(f'âšī¸ Sensor {uid_high},{uid_low} getting attributes')
data = SSC.generate_protobuf_attributes_response(uid_high, uid_low)
buf = SSC.generate_report_large_indication(data, transaction_id)
if self._qrtr.qrtr_sendto(self.sock, node, port, c_char_p(buf), len(buf)) != 0:
print(f'â Failed to send Protobuf Get Attributes message to QRTR node', file=sys.stderr)
sys.exit(ERROR_QRTR_SERVER_FAILURE)
reply_msg = ' '.join(list(map(hex, buf)))
self._debug(f'đ¤ Indication MSG ({SSC_SERVICE_ID}@[{node.value}:{port.value}] {len(buf)} bytes): {reply_msg}')
# Sensor enabled, send data
elif message_id == SSC_PROTOBUF_ENABLE_REPORT_MSG_ID or message_id == SSC_PROTOBUF_ENABLE_CONTINUOUS_MSG_ID:
uid = f'{uid_high},{uid_low}'
if uid in self.workers:
print(f'â ī¸ Sensor {uid_high},{uid_low} already enabled', file=sys.stderr)
return
print(f'âšī¸ Enabling sensor {uid}')
self.workers[uid] = Process(target=self._sensor_measurement, args=(uid_high, uid_low, node, port, transaction_id))
self.workers[uid].start()
# Sensor disabled, stop sending data
elif message_id == SSC_PROTOBUF_DISABLE_REPORT_MSG_ID:
uid = f'{uid_high},{uid_low}'
if uid not in self.workers:
print(f'â ī¸ Sensor {uid_high},{uid_low} not enabled yet', file=sys.stderr)
return
print(f'âšī¸ Disabling sensor {uid}')
self.workers[uid].terminate()
self.workers[uid].join()
del self.workers[uid]
else:
print(f'â ī¸ Cannot send QMI indication, unsupported Protobuf message ({message_id})', file=sys.stderr)
elif incoming_msg[0] == QRTR_REQUEST:
print('âšī¸ QRTR request')
else:
print('â ī¸ Cannot reply to QMI message which is not a request', file=sys.stderr)
def _process(self, timeout: int = -1, buffer_size: int = 65536):
"""
Waits for incoming QRTR messages to respond to.
If timeout is specified, this method returns immediately after
receiving a message. If not, this method keeps listening continuously.
Parameters
----------
timeout : int
Timeout for polling to wait for new messages in milliseconds.
buffer_size : int
Size of the buffer to receive messages, default 65536 bytes.
"""
print(f'â Waiting for MSGs')
while True:
# Wait for messages
error = self._qrtr.qrtr_poll(self.sock, timeout);
if error < 0 and error != errno.EINTR:
print('â Polling QRTR socket failed', file=sys.stderr)
sys.exit(ERROR_QRTR_SERVER_FAILURE)
# Receive message
node_id, port_id, incoming_msg = self._receive_msg(buffer_size)
# Reply message if needed
self._reply_msg(node_id, port_id, incoming_msg)
# Break out if not listen indefinitely
if timeout > -1:
break
def __del__(self):
"""
Shutdowns sensor measurement process, QRTR server, and socket when object is disposed.
"""
print(f'âšī¸ Stopping sensor measurement workers')
for worker in self.workers:
self.workers[worker].terminate()
self.workers[worker].join()
print(f'âšī¸ Shutting down QRTR server (service {SSC_SERVICE_ID})')
if self._qrtr is not None:
if self.sock:
error = self._qrtr.qrtr_remove_server(self.sock, SSC_SERVICE_ID)
if error:
print('â Failed to shutdown QRTR server', file=sys.stderr)
sys.exit(ERROR_QRTR_SERVER_FAILURE)
print('âšī¸ Closing QRTR socket')
self._qrtr.qrtr_close(self.sock)
print('đ Bye!')
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='ssc-server',
description='SSC QMI service server mocking',
epilog='GPLv3 license, copyright (c) by libssc authors (2024)')
parser.add_argument('-p', '--port', type=int, default=0)
parser.add_argument('--verbose', action='store_true', default=False)
args = parser.parse_args()
signal.signal(signal.SIGINT, signal_handler)
server = SSCMockingServer(args.port, args.verbose)
|