File: lsxfel.py

package info (click to toggle)
extra-data 1.20.0-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 952 kB
  • sloc: python: 10,421; makefile: 4
file content (157 lines) | stat: -rw-r--r-- 5,724 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Summarise XFEL data in files or folders
"""
import argparse
from collections import defaultdict
import os
import os.path as osp
import re
import sys

from .read_machinery import FilenameInfo
from .reader import H5File, RunDirectory


def describe_file(path, details_for_sources=()):
    """Print a detailed description of one HDF5 data file.

    The file's name is printed as a heading, then the file is opened with
    ``H5File`` and its ``info()`` method is called, passing on
    *details_for_sources*.
    """
    print(os.path.basename(path), ": Data file")

    H5File(path).info(details_for_sources)


def summarise_file(path):
    """Print a short summary of one HDF5 data file.

    Shows the file's name, followed by the number of trains and the number
    of sources found when opening it with ``H5File``.
    """
    print(os.path.basename(path), ": Data file")

    data = H5File(path)
    n_trains = len(data.train_ids)
    n_sources = len(data.all_sources)
    print(f"  {n_trains} trains, {n_sources} sources")


def describe_run(path, details_for_sources=()):
    """Print a detailed description of a run directory.

    The directory's name is printed as a heading, then the run is opened
    with ``RunDirectory`` and its ``info()`` method is called, passing on
    *details_for_sources*.
    """
    # abspath() normalises the path first: a plain basename() of a path with
    # a trailing slash (e.g. 'r0001/' from shell tab completion) is empty.
    basename = os.path.basename(os.path.abspath(path))
    print(basename, ": Run directory")
    print()

    run = RunDirectory(path)
    run.info(details_for_sources)


def summarise_run(path, indent=''):
    """Print a one-line summary of a run directory.

    The line shows the directory name, the number of trains, and how many
    files are detector files versus other files (as classified by
    ``FilenameInfo(...).is_detector``). *indent* is prepended to the line.

    Raises ValueError if the directory contains no files named like
    ``<prefix>-S<digits>.h5``.
    """
    # abspath() normalises the path first: a plain basename() of a path with
    # a trailing slash (e.g. 'r0001/') is empty.
    basename = os.path.basename(os.path.abspath(path))

    # Accessing all the files in a run can be slow. To get the number of trains,
    # pick one set of segments (time slices of data from the same source).
    # This relies on each set of segments recording the same number of trains.
    segment_sequences = defaultdict(list)
    n_detector = n_other = 0
    for f in sorted(os.listdir(path)):
        # fullmatch, so that e.g. 'xyz-S00000.h5.bak' is not taken for a data
        # file. listdir() entries are already bare names.
        m = re.fullmatch(r'(.+)-S\d+\.h5', f)
        if not m:
            continue
        segment_sequences[m.group(1)].append(f)
        if FilenameInfo(f).is_detector:
            n_detector += 1
        else:
            n_other += 1

    if not segment_sequences:
        raise ValueError("No data files recognised in %s" % path)

    # Take the shortest group of segments to make reading quicker
    first_group = min(segment_sequences.values(), key=len)
    train_ids = set()
    for f in first_group:
        train_ids.update(H5File(osp.join(path, f)).train_ids)

    print("{}{} : Run of {:>4} trains, with {:>3} detector files and {:>3} others".format(
        indent, basename, len(train_ids), n_detector, n_other
    ))

def _summarise_child_runs(parent, names, indent='  '):
    """Summarise each run folder (named like ``r0123``) among *names* in *parent*."""
    for name in names:
        child_path = os.path.join(parent, name)
        if re.match(r'r\d+', name) and os.path.isdir(child_path):
            summarise_run(child_path, indent=indent)


def main(argv=None):
    """Command line entry point for ``lsxfel``.

    *argv* is the list of command line arguments; None makes argparse use
    ``sys.argv``. With no paths given, the current directory is inspected.

    A single path is described in detail (``--detail`` patterns are passed
    on for a run directory or a file); several paths get a short summary
    each.

    Returns 2 if any path was not found or not recognised. Otherwise the
    single-path case returns None and the multi-path case returns 0; both
    mean success when passed to ``sys.exit()``.
    """
    ap = argparse.ArgumentParser(
        prog='lsxfel', description="Summarise XFEL data in files or folders"
    )
    ap.add_argument('paths', nargs='*', help="Files/folders to look at")
    ap.add_argument('--detail', action='append', default=[],
        help="Show details on keys & data for specified sources. "
             "This can slow down lsxfel considerably. "
             "Wildcard patterns like '*/XGM/*' are allowed, though you may "
             "need single quotes to prevent the shell processing them. "
             "Can be used more than once to include several patterns. "
             "Only used when inspecting a single run or file."
    )
    args = ap.parse_args(argv)
    paths = args.paths or [os.path.abspath(os.getcwd())]

    if len(paths) == 1:
        path = paths[0]
        basename = os.path.basename(os.path.abspath(path.rstrip('/')))

        if os.path.isdir(path):
            contents = sorted(os.listdir(path))
            if any(f.endswith('.h5') for f in contents):
                # Run directory
                describe_run(path, args.detail)
            elif any(re.match(r'r\d+', f) for f in contents):
                # Proposal directory, containing runs
                print(basename, ": Proposal data directory")
                print()
                _summarise_child_runs(path, contents)
            elif osp.isdir(osp.join(path, 'raw')):
                print(basename, ": Proposal directory")
                print()
                print('{}/raw/'.format(basename))
                raw_dir = osp.join(path, 'raw')
                _summarise_child_runs(raw_dir, sorted(os.listdir(raw_dir)))
            else:
                print(basename, ": Unrecognised directory")
                # Same exit status as every other unrecognised/missing path
                return 2
        elif os.path.isfile(path):
            if path.endswith('.h5'):
                describe_file(path, args.detail)
            else:
                print(basename, ": Unrecognised file")
                return 2
        else:
            print(path, ': File/folder not found')
            return 2
    else:
        exit_code = 0
        for path in paths:
            # Strip any trailing slash first, as in the single-path case;
            # otherwise the name of 'some/dir/' would come out empty.
            basename = os.path.basename(os.path.abspath(path.rstrip('/')))

            if os.path.isdir(path):
                # Sorted so that runs are listed in a predictable order
                contents = sorted(os.listdir(path))
                if any(f.endswith('.h5') for f in contents):
                    # Run directory
                    summarise_run(path)
                elif any(re.match(r'r\d+', f) for f in contents):
                    # Proposal directory, containing runs
                    print(basename, ": Proposal directory")
                    print()
                    _summarise_child_runs(path, contents)
                else:
                    print(basename, ": Unrecognised directory")
                    exit_code = 2
            elif os.path.isfile(path):
                if path.endswith('.h5'):
                    summarise_file(path)
                else:
                    print(basename, ": Unrecognised file")
                    exit_code = 2
            else:
                print(path, ': File/folder not found')
                exit_code = 2

        return exit_code


# When this module is executed as a script, run main() and use its return
# value as the process exit status.
if __name__ == '__main__':
    sys.exit(main())