1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
#
# Copyright 2020 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
This module contains the interface for providing data to a simulator
stream and receiving data from a simulator stream.
"""
import importlib.util
# Registry of sample source classes, keyed by class name.
# Filled in by the @cli_source decorator.
sources = dict()
# Registry of sample sink classes, keyed by class name.
# Filled in by the @cli_sink decorator.
sinks = dict()
def cli_source(cls):
    """Class decorator: register cls in the global `sources` table.

    The class is stored under its own __name__ and returned unchanged,
    so the decorator can be stacked with others.
    """
    registered_name = cls.__name__
    sources[registered_name] = cls
    return cls
def cli_sink(cls):
    """Class decorator: register cls in the global `sinks` table.

    The class is stored under its own __name__ and returned unchanged,
    so the decorator can be stacked with others.
    """
    registered_name = cls.__name__
    sinks[registered_name] = cls
    return cls
# Counter used to give every dynamically loaded file a unique module name.
name_index = 0
# Cache of modules that were already loaded, keyed by their import path.
module_lookup = {}


def from_import_path(class_name, import_path):
    """Load the Python file at import_path and return its attribute class_name.

    The file is executed as a module named "simsample.<N>", where N is a
    counter that increases with every newly loaded file. Loaded modules
    are cached by import_path, so a file is executed only once and later
    calls for the same path reuse the cached module.

    Raises ImportError if no module spec/loader can be created for
    import_path, and AttributeError if the module has no attribute
    named class_name.
    """
    global name_index
    try:
        module = module_lookup[import_path]
    except KeyError:
        spec = importlib.util.spec_from_file_location(
            "simsample." + str(name_index), import_path)
        if spec is None or spec.loader is None:
            raise ImportError(
                "Cannot load a module from {!r}".format(import_path))
        # Advance the counter so the next file gets a distinct module name
        # (the original `=+ 1` reset it to 1 on every call).
        name_index += 1
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        # Only cache after the module body executed successfully
        module_lookup[import_path] = module
    return getattr(module, class_name)
class SampleSource:
    """Interface for objects that supply samples to the simulator.

    The simulator sends the supplied samples over the network to a
    UHD client. Subclasses must override every method.
    """

    def fill_packet(self, packet, payload_size):
        """Fill packet with samples so its payload is payload_size bytes.

        Implementations return None to signal that the source has no
        more samples to give.
        """
        raise NotImplementedError

    def close(self):
        """Release any resources held by this object."""
        raise NotImplementedError
class SampleSink:
    """Interface for objects that consume samples from the simulator.

    A sink is the destination for samples received over the network
    from a UHD client. Subclasses must override every method.
    """

    def accept_packet(self, packet):
        """Handle one newly received packet."""
        raise NotImplementedError

    def close(self):
        """Release any resources held by this object."""
        raise NotImplementedError
@cli_source
@cli_sink
class NullSamples(SampleSource, SampleSink):
    """Combined source/sink that produces zeroes and discards input.

    As a source it provides an infinite number of zero-valued bytes;
    as a sink it ignores every packet it is given. An optional log
    object (anything with a debug(msg) method) enables debug output.
    """

    def __init__(self, log=None):
        self.log = log

    def fill_packet(self, packet, payload_size):
        """Set the packet payload to payload_size zero bytes and return it."""
        if self.log is not None:
            self.log.debug("Null Source called, providing {} bytes of zeroes".format(payload_size))
        # bytes(n) creates n zero-valued bytes
        packet.set_payload_bytes(bytes(payload_size))
        return packet

    def accept_packet(self, packet):
        """Discard the packet, logging its payload size if a log is set."""
        if self.log is not None:
            # This is the sink path, so the message names the sink
            self.log.debug("Null Sink called, accepting {} bytes of payload"
                           .format(len(packet.get_payload_bytes())))

    def close(self):
        """Nothing to release."""
        pass
class IOSource(SampleSource):
    """Sample source backed by an object exposing read(n) and close().

    A binary file handle, such as the result of
    open("<filename>", "rb"), is a typical example.
    """

    def __init__(self, read):
        self.read_obj = read

    def fill_packet(self, packet, payload_size):
        """Read up to payload_size bytes into the packet.

        Returns the packet, or None once the reader yields no data.
        """
        chunk = self.read_obj.read(payload_size)
        if len(chunk) > 0:
            packet.set_payload_bytes(chunk)
            return packet
        # Nothing left to read: the source is exhausted
        return None

    def close(self):
        """Close the underlying read object."""
        self.read_obj.close()
class IOSink(SampleSink):
    """Sample sink backed by an object exposing write(bytes) and close().

    A binary file handle, such as the result of
    open("<filename>", "wb"), is a typical example.
    """

    def __init__(self, write):
        self.write_obj = write

    def accept_packet(self, packet):
        """Write the packet's payload bytes to the underlying writer.

        Raises OSError if the writer does not report having written the
        whole payload.
        """
        data = bytes(packet.get_payload_bytes())
        written = self.write_obj.write(data)
        # Explicit check instead of `assert`, which is removed under -O
        if written != len(data):
            raise OSError(
                "Short write: wrote {} of {} payload bytes".format(written, len(data)))

    def close(self):
        """Close the underlying write object."""
        self.write_obj.close()
@cli_source
class FileSource(IOSource):
    """Sample source that reads its samples from a file path.

    read_file is opened in binary mode. repeat may be a bool or a
    string; for non-bool values only the exact string "True" enables
    repeating. When repeating, the file is reopened from the start
    each time its end is reached.
    """

    def __init__(self, read_file, repeat=False):
        # Kept as a callable so the file can be reopened when repeating
        self.open = lambda: open(read_file, "rb")
        if isinstance(repeat, bool):
            self.repeat = repeat
        else:
            self.repeat = repeat == "True"
        super().__init__(self.open())

    def fill_packet(self, packet, payload_size):
        """Read up to payload_size bytes from the file into the packet.

        Returns the packet, or None when the file is exhausted. With
        repeat enabled the file is reopened at end-of-file; if it still
        yields no data (i.e. the file is empty) None is returned rather
        than an endless stream of empty packets.
        """
        payload = self.read_obj.read(payload_size)
        if not payload and self.repeat:
            # End of file reached: start over from the beginning
            self.read_obj.close()
            self.read_obj = self.open()
            payload = self.read_obj.read(payload_size)
        if not payload:
            return None
        packet.set_payload_bytes(payload)
        return packet
@cli_sink
class FileSink(IOSink):
    """Sample sink that writes received payloads to a file path."""

    def __init__(self, write_file):
        # The file is opened for binary writing and handed to IOSink,
        # whose close() closes it again.
        super().__init__(open(write_file, "wb"))
|