"""Summarise XFEL data in files or folders

This module implements the ``lsxfel`` command line tool.
"""
import argparse
from collections import defaultdict
import os
import os.path as osp
import re
import sys
from .read_machinery import FilenameInfo
from .reader import H5File, RunDirectory
def describe_file(path, details_for_sources=()):
    """Print a detailed description of one HDF5 data file.

    The file's name is printed as a heading, then the file is opened with
    ``H5File`` and its ``info()`` method is called with
    *details_for_sources* (an empty tuple unless the caller passes
    something else).
    """
    print(osp.basename(path), ": Data file")
    H5File(path).info(details_for_sources)
def summarise_file(path):
    """Print a short, two-line summary of one HDF5 data file.

    The first line names the file; the second reports how many train IDs
    and how many sources the opened ``H5File`` exposes.
    """
    print(osp.basename(path), ": Data file")
    data = H5File(path)
    n_trains = len(data.train_ids)
    n_sources = len(data.all_sources)
    print(f" {n_trains} trains, {n_sources} sources")
def describe_run(path, details_for_sources=()):
    """Print a detailed description of a run directory.

    The directory's name is printed as a heading followed by a blank line,
    then the run is opened with ``RunDirectory`` and its ``info()`` method
    is called with *details_for_sources* (an empty tuple by default).
    """
    # os.path.basename('r0001/') is '' - normalise first so that a path typed
    # with a trailing separator still gets a proper heading. The original
    # path is still what gets opened.
    basename = osp.basename(osp.normpath(path))
    print(basename, ": Run directory")
    print()

    run = RunDirectory(path)
    run.info(details_for_sources)
def summarise_run(path, indent=''):
    """Print a one-line summary of a run directory.

    The line shows the directory name, the number of distinct train IDs and
    how many data files were classed as detector files or as other files
    (according to ``FilenameInfo(...).is_detector``).

    *path* is the run directory to inspect and *indent* is text placed at
    the start of the printed line.

    Raises ValueError if no file in *path* is named like a data file, i.e.
    ``<name>-S<digits>.h5``.
    """
    # os.path.basename('r0001/') is '' - normalise first so that a trailing
    # separator does not blank out the name shown in the summary.
    basename = osp.basename(osp.normpath(path))

    # Accessing all the files in a run can be slow. To get the number of trains,
    # pick one set of segments (time slices of data from the same source).
    # This relies on each set of segments recording the same number of trains.
    segment_sequences = defaultdict(list)
    n_detector = n_other = 0
    for fname in sorted(os.listdir(path)):
        # fullmatch: leftovers such as 'x-S00000.h5.bak' must not be counted
        # or opened as data files.
        m = re.fullmatch(r'(.+)-S\d+\.h5', fname)
        if m is None:
            continue
        segment_sequences[m.group(1)].append(fname)
        if FilenameInfo(fname).is_detector:
            n_detector += 1
        else:
            n_other += 1

    if not segment_sequences:
        raise ValueError("No data files recognised in %s" % path)

    # Take the shortest group of segments to make reading quicker
    first_group = min(segment_sequences.values(), key=len)
    train_ids = set()
    for fname in first_group:
        # NOTE(review): the H5File objects opened here are never explicitly
        # closed - confirm whether the reader API offers a way to do so.
        train_ids.update(H5File(osp.join(path, fname)).train_ids)

    print("{}{} : Run of {:>4} trains, with {:>3} detector files and {:>3} others".format(
        indent, basename, len(train_ids), n_detector, n_other
    ))
def _summarise_runs_in(parent, names, indent):
    """Print a summary line for each run directory among *names* in *parent*.

    A name is treated as a run when it starts with ``r`` followed by a digit
    and refers to a directory inside *parent*.
    """
    for name in names:
        child_path = os.path.join(parent, name)
        if re.match(r'r\d+', name) and os.path.isdir(child_path):
            summarise_run(child_path, indent=indent)


def _inspect_single_path(path, detail):
    """Describe one file or folder in detail.

    Returns 2 if *path* is missing or not recognised, otherwise None.
    """
    basename = os.path.basename(os.path.abspath(path.rstrip('/')))

    if os.path.isdir(path):
        contents = sorted(os.listdir(path))
        if any(f.endswith('.h5') for f in contents):
            # Run directory
            describe_run(path, detail)
        elif any(re.match(r'r\d+', f) for f in contents):
            # Proposal directory, containing runs
            print(basename, ": Proposal data directory")
            print()
            _summarise_runs_in(path, contents, indent=' ')
        elif osp.isdir(osp.join(path, 'raw')):
            print(basename, ": Proposal directory")
            print()
            print('{}/raw/'.format(basename))
            raw_dir = osp.join(path, 'raw')
            _summarise_runs_in(raw_dir, sorted(os.listdir(raw_dir)), indent=' ')
        else:
            print(basename, ": Unrecognised directory")
            # Report failure, as for an unrecognised file and as the
            # multi-path mode does for an unrecognised directory.
            return 2
    elif os.path.isfile(path):
        if path.endswith('.h5'):
            describe_file(path, detail)
        else:
            print(basename, ": Unrecognised file")
            return 2
    else:
        print(path, ': File/folder not found')
        return 2
    return None


def _summarise_path(path):
    """Print a brief summary of one file or folder.

    Returns 0 if *path* was recognised, 2 otherwise.
    """
    # os.path.basename('dir/') is '' - normalise so the label is never blank
    # just because the path was typed with a trailing separator.
    basename = os.path.basename(osp.normpath(path))

    if os.path.isdir(path):
        # Sorted, so runs are listed in the same order as in single-path mode
        contents = sorted(os.listdir(path))
        if any(f.endswith('.h5') for f in contents):
            # Run directory
            summarise_run(path)
        elif any(re.match(r'r\d+', f) for f in contents):
            # Proposal directory, containing runs
            print(basename, ": Proposal directory")
            print()
            _summarise_runs_in(path, contents, indent=' ')
        else:
            print(basename, ": Unrecognised directory")
            return 2
    elif os.path.isfile(path):
        if path.endswith('.h5'):
            summarise_file(path)
        else:
            print(basename, ": Unrecognised file")
            return 2
    else:
        print(path, ': File/folder not found')
        return 2
    return 0


def main(argv=None):
    """Entry point of the ``lsxfel`` command.

    *argv* is the list of command line arguments; None lets argparse read
    them from ``sys.argv``. Without any path arguments the current working
    directory is inspected.

    A single path is described in detail (``--detail`` patterns are passed
    on); several paths each get a brief summary.

    Returns 2 if a path was missing or not recognised. Otherwise returns 0
    when several paths were given, or None for a single path.
    """
    ap = argparse.ArgumentParser(
        prog='lsxfel', description="Summarise XFEL data in files or folders"
    )
    ap.add_argument('paths', nargs='*', help="Files/folders to look at")
    ap.add_argument('--detail', action='append', default=[],
        help="Show details on keys & data for specified sources. "
             "This can slow down lsxfel considerably. "
             "Wildcard patterns like '*/XGM/*' are allowed, though you may "
             "need single quotes to prevent the shell processing them. "
             "Can be used more than once to include several patterns. "
             "Only used when inspecting a single run or file."
    )
    args = ap.parse_args(argv)
    paths = args.paths or [os.path.abspath(os.getcwd())]

    if len(paths) == 1:
        return _inspect_single_path(paths[0], args.detail)

    exit_code = 0
    for path in paths:
        if _summarise_path(path) != 0:
            exit_code = 2
    return exit_code
if __name__ == '__main__':
    # Run the command line tool and hand its result to the interpreter as
    # the process exit status.
    exit_status = main()
    sys.exit(exit_status)