1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
|
# coding: utf-8
"""Expose data to different interface
ZMQStreamer exposes data through a ZeroMQ socket, by default in a REQ/REP pattern.
Copyright (c) 2017, European X-Ray Free-Electron Laser Facility GmbH
All rights reserved.
You should have received a copy of the 3-Clause BSD License along with this
program. If not, see <https://opensource.org/licenses/BSD-3-Clause>
"""
import os.path as osp
import time
from collections import deque
from socket import AF_INET
from warnings import warn
from karabo_bridge import ServerInThread
from karabo_bridge.server import Sender
from psutil import net_if_addrs
from .components import XtdfDetectorBase
from .exceptions import SourceNameError
from .reader import RunDirectory, H5File
from .stacking import stack_detector_data
# Public API of this module. serve_data is the documented entry point that
# serve_files delegates to, so it is exported alongside it.
__all__ = ['ZMQStreamer', 'serve_files', 'serve_data']
def find_infiniband_ip():
    """Look up the IPv4 address of the ``ib0`` network interface.

    :returns: str
        The first IPv4 (AF_INET) address bound to ``ib0``, or ``'*'`` when
        ``ib0`` does not exist or has no IPv4 address.
    """
    ipv4_on_ib0 = (
        entry.address
        for entry in net_if_addrs().get('ib0', ())
        if entry.family == AF_INET
    )
    return next(ipv4_on_ib0, '*')
class ZMQStreamer(ServerInThread):
    """Deprecated server bound to ``tcp://*:<port>``.

    Thin wrapper around ``karabo_bridge.ServerInThread`` that builds the
    endpoint from a port number and emits a DeprecationWarning on creation.
    New code should use ``ServerInThread`` directly.
    """

    def __init__(self, port, sock='REP', maxlen=10, protocol_version='2.2',
                 dummy_timestamps=False):
        warn("Please use :ref:karabo_bridge.ServerInThread instead",
             DeprecationWarning, stacklevel=2)
        super().__init__(
            'tcp://*:{}'.format(port),
            sock=sock,
            maxlen=maxlen,
            protocol_version=protocol_version,
            dummy_timestamps=dummy_timestamps,
        )
def _iter_trains(data, merge_detector=False):
"""Iterate over trains in data and merge detector tiles in a single source
:data: DataCollection
:merge_detector: bool
if True and data contains detector data (e.g. AGIPD) individual sources
for each detector tiles are merged in a single source. The new source
name keep the original prefix, but replace the last 2 part with
'/DET/APPEND'. Individual sources are removed from the train data
:yield: dict
train data
"""
det, source_name = None, ''
if merge_detector:
for detector in XtdfDetectorBase.__subclasses__():
try:
det = detector(data)
source_name = f'{det.detector_name}/DET/APPEND'
except SourceNameError:
continue
else:
break
for tid, train_data in data.trains():
if not train_data:
continue
if det is not None:
det_data = {
k: v for k, v in train_data.items()
if k in det.data.detector_sources
}
# get one of the module to reference other datasets
train_data[source_name] = mod_data = next(iter(det_data.values()))
stacked = stack_detector_data(det_data, 'image.data')
mod_data['image.data'] = stacked
mod_data['metadata']['source'] = source_name
if 'image.gain' in mod_data:
stacked = stack_detector_data(det_data, 'image.gain')
mod_data['image.gain'] = stacked
if 'image.mask' in mod_data:
stacked = stack_detector_data(det_data, 'image.mask')
mod_data['image.mask'] = stacked
# remove individual module sources
for src in det.data.detector_sources:
del train_data[src]
yield tid, train_data
def serve_files(path, port, source_glob='*', key_glob='*', **kwargs):
    """Stream data from files through a TCP socket.

    Parameters
    ----------
    path: str
        Path to a single HDF5 file, or to a directory of HDF5 files
        (opened with RunDirectory).
    port: str or int
        A ZMQ endpoint (e.g. 'tcp://*:44444') or a TCP port to bind the socket
        to. Integers or strings of all digits are treated as port numbers.
    source_glob: str
        Only stream sources matching this glob pattern.
        Streaming data selectively is more efficient than streaming everything.
    key_glob: str
        Only stream keys matching this glob pattern in the selected sources.
    append_detector_modules: bool
        Combine multi-module detector data in a single data source (sources for
        individual modules are removed). The last section of the source name is
        replaced with 'APPEND', example:
        'SPB_DET_AGIPD1M-1/DET/#CH0:xtdf' -> 'SPB_DET_AGIPD1M-1/DET/APPEND'
        Supported detectors: AGIPD, DSSC, LPD
    dummy_timestamps: bool
        Whether to add mock timestamps if the metadata lacks them.
    use_infiniband: bool
        Use infiniband interface if available (if port specifies a TCP port)
    sock: str
        socket type - supported: REP, PUB, PUSH (default REP).

    All keyword arguments after ``key_glob`` are forwarded to ``serve_data``.
    """
    opener = RunDirectory if osp.isdir(path) else H5File
    selection = opener(path).select(source_glob, key_glob)
    serve_data(selection, port, **kwargs)
def serve_data(data, port, append_detector_modules=False,
               dummy_timestamps=False, use_infiniband=False, sock='REP'):
    """Stream data from files through a TCP socket.

    Parameters
    ----------
    data: DataCollection
        The data to be streamed; should already have sources & keys selected.
    port: str or int
        A ZMQ endpoint (e.g. 'tcp://*:44444') or a TCP port to bind the socket
        to. Integers or strings of all digits are treated as port numbers.
    append_detector_modules: bool
        Combine multi-module detector data in a single data source (sources for
        individual modules are removed). The last section of the source name is
        replaced with 'APPEND', example:
        'SPB_DET_AGIPD1M-1/DET/#CH0:xtdf' -> 'SPB_DET_AGIPD1M-1/DET/APPEND'
        Supported detectors: AGIPD, DSSC, LPD
    dummy_timestamps: bool
        Whether to add mock timestamps if the metadata lacks them.
    use_infiniband: bool
        Use infiniband interface if available (if port specifies a TCP port)
    sock: str
        socket type - supported: REP, PUB, PUSH (default REP).

    If sending fails or is interrupted (e.g. KeyboardInterrupt), the socket
    is closed before the exception propagates, so the port is released.
    """
    if isinstance(port, int) or port.isdigit():
        host = find_infiniband_ip() if use_infiniband else '*'
        endpt = f'tcp://{host}:{port}'
    else:
        endpt = port

    sender = Sender(endpt, sock=sock, dummy_timestamps=dummy_timestamps)
    print(f'Streamer started on: {sender.endpoint}')

    ntrains = len(data.train_ids)
    # Times of the most recent sends (max 10), for a sliding-window send rate.
    sent_times = deque([time.monotonic()], 10)
    count = 0
    tid, rate = 0, 0.

    def print_update(end='\r'):
        print(f'Sent {count}/{ntrains} trains - Train ID {tid} - {rate:.1f} Hz', end=end)

    try:
        for tid, train_data in _iter_trains(data, merge_detector=append_detector_modules):
            sender.send(train_data)
            count += 1
            new_time = time.monotonic()
            if count % 5 == 0:
                elapsed = new_time - sent_times[0]
                # The monotonic clock may not have advanced (its resolution is
                # platform-dependent); keep the previous rate instead of
                # dividing by zero.
                if elapsed > 0:
                    rate = len(sent_times) / elapsed
                print_update()
            sent_times.append(new_time)
    except BaseException:
        # Streaming failed or was interrupted: close the socket now, using its
        # configured linger (see below), so the port is not left bound.
        sender.server_socket.close()
        raise

    print_update(end='\n')
    # The karabo-bridge code sets linger to 0 so that it doesn't get stuck if
    # the client goes away. But this would also mean that we close the socket
    # when the last messages have been queued but not sent. So if we've
    # successfully queued all the messages, set linger -1 (i.e. infinite) to
    # wait until ZMQ has finished transferring them before the socket is closed.
    sender.server_socket.close(linger=-1)
|