File: voview.py

package info (click to toggle)
extra-data 1.20.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 952 kB
  • sloc: python: 10,421; makefile: 4
file content (171 lines) | stat: -rw-r--r-- 5,573 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""Create & check 'virtual overview' files

These use virtual datasets to present the data from a run as a single file.
"""

import os
import os.path as osp
import re
import sys
from tempfile import TemporaryDirectory

import h5py

from .file_access import FileAccess
from .writer import VirtualFileWriter

# Root of the proposal directory tree; voview_paths_for_run() looks for a
# proposal's 'usr' folder at <root>/<instrument>/<cycle>/<proposal>/usr.
DATA_ROOT_DIR = "/gpfs/exfel/exp/"

# Format version stamped into every virtual overview file. Bump it when older
# EXtra-data releases must be prevented from reading files written by newer
# ones (readers skip files whose version is greater than their own).
VOVIEW_VERSION = 1


class VirtualOverviewFileWriter(VirtualFileWriter):
    """Virtual file writer that also records the files it was built from.

    On top of what the base class writes, the output gets a ``.source_files``
    group (base name and size of every source file) and a
    ``virtual_overview_version`` attribute, so the overview can later be
    compared against the run directory by check_sources().
    """

    def record_source_files(self):
        """Write the ``.source_files`` group with ``names`` and ``sizes``."""
        source_grp = self.file.create_group('.source_files')
        file_names = []
        file_sizes = []
        for file_acc in self.data.files:
            # metadata_fstat, when set, is used in preference to a fresh stat
            stat_result = file_acc.metadata_fstat or os.stat(file_acc.filename)
            file_names.append(osp.basename(file_acc.filename).encode('ascii'))
            file_sizes.append(stat_result.st_size)

        name_dtype = h5py.special_dtype(vlen=bytes)
        source_grp.create_dataset('names', data=file_names, dtype=name_dtype)
        source_grp.create_dataset('sizes', data=file_sizes, dtype='u8')

    def write(self):
        """Record sources and the format version, then write the base content."""
        self.record_source_files()
        self.file.attrs['virtual_overview_version'] = VOVIEW_VERSION
        super().write()


def check_sources(overview_file: "h5py.File", run_dir):
    """Check whether an overview file still matches the files in *run_dir*.

    The overview stores the base names and sizes of the ``.h5`` files it was
    built from in its ``.source_files`` group. It is considered current only
    if exactly the same set of ``.h5`` files is present in *run_dir* now
    (ignoring ``overview.h5``, compared case-insensitively), each with the
    recorded size.

    Parameters
    ----------
    overview_file : h5py.File
        Open overview file (any mapping-like group with the same layout works).
    run_dir : str
        Run directory to compare against.

    Returns
    -------
    bool
        True if the overview can be used; False if it lacks the source
        records, is malformed, or is outdated.
    """
    if '.source_files' not in overview_file:
        return False  # Not written by VirtualOverviewFileWriter
    g = overview_file['.source_files']
    if 'names' not in g or 'sizes' not in g:
        return False
    names_ds, sizes_ds = g['names'], g['sizes']
    if names_ds.shape != sizes_ds.shape:
        return False  # Basic check that things make sense

    files_now = {f for f in os.listdir(run_dir)
                 if f.endswith('.h5') and (f.lower() != 'overview.h5')}
    files_stored = [p.decode('ascii') for p in names_ds[:]]
    if files_now != set(files_stored):
        return False

    # Read all sizes at once rather than element by element from the dataset.
    for name, size in zip(files_stored, sizes_ds[:]):
        try:
            st = os.stat(osp.join(run_dir, name))
        except FileNotFoundError:
            return False  # Removed since the directory was listed
        if st.st_size != size:
            return False

    return True


def voview_paths_for_run(directory, *, data_root=None):
    """List candidate locations for a run's virtual overview file.

    The first candidate is always ``overview.h5`` inside *directory*. If the
    symlink-resolved path of *directory* matches one of the layouts below,
    and the proposal's ``usr`` folder exists under *data_root*, a second
    candidate ``<data_root>/<instr>/<cycle>/<prop>/usr/.extra_data/
    <RAW|PROC>-<RUN>-OVERVIEW.h5`` is appended.

    Parameters
    ----------
    directory : str
        Run directory.
    data_root : str, optional
        Root of the proposal tree used to locate the ``usr`` folder.
        Defaults to ``DATA_ROOT_DIR``.

    Returns
    -------
    list of str
        Candidate paths, in the order callers try them.
    """
    if data_root is None:
        data_root = DATA_ROOT_DIR

    paths = [osp.join(directory, 'overview.h5')]
    # After resolving symlinks, data on Maxwell is stored in either
    # GPFS, e.g. /gpfs/exfel/d/proc/SCS/201901/p002212  or
    # dCache, e.g. /pnfs/xfel.eu/exfel/archive/XFEL/raw/SCS/201901/p002212
    # On the online cluster the resolved path stays:
    #   /gpfs/exfel/exp/inst/cycle/prop/(raw|proc)/run
    # Resolve symlinks once and match both layouts against the same path.
    real_dir = osp.realpath(directory)
    maxwell_match = re.match(
        #     raw/proc  instr  cycle prop   run
        r'.+/(raw|proc)/(\w+)/(\w+)/(p\d+)/(r\d+)/?$',
        real_dir
    )
    online_match = re.match(
        #     instr cycle prop   raw/proc   run
        r'^.+/(\w+)/(\w+)/(p\d+)/(raw|proc)/(r\d+)/?$',
        real_dir
    )

    if maxwell_match:
        raw_proc, instr, cycle, prop, run_nr = maxwell_match.groups()
    elif online_match:
        instr, cycle, prop, raw_proc, run_nr = online_match.groups()
    else:
        return paths

    fname = f'{raw_proc.upper()}-{run_nr.upper()}-OVERVIEW.h5'
    prop_usr = osp.join(data_root, instr, cycle, prop, 'usr')
    # Only offer the shared location if the proposal's usr folder exists.
    if osp.isdir(prop_usr):
        paths.append(osp.join(prop_usr, '.extra_data', fname))
    return paths

def find_file_read(run_dir):
    """Return the first existing overview-file candidate for *run_dir*.

    Candidates from voview_paths_for_run() are checked in order with
    ``osp.isfile``; None is returned when none of them exists.
    """
    existing = (path for path in voview_paths_for_run(run_dir)
                if osp.isfile(path))
    return next(existing, None)

def find_file_valid(run_dir):
    """Open and return the first usable overview file for *run_dir*.

    Candidates from voview_paths_for_run() are tried in order. A candidate is
    usable when it is an HDF5 file, its ``virtual_overview_version``
    attribute (0 if absent) is not greater than VOVIEW_VERSION, and
    check_sources() confirms it matches the files in *run_dir*.

    Returns a FileAccess for the usable file, or None if there is none.
    """
    for path in voview_paths_for_run(run_dir):
        if not h5py.is_hdf5(path):
            continue
        fa = FileAccess(path)
        fmt_version = fa.file.attrs.get('virtual_overview_version', 0)
        if fmt_version > VOVIEW_VERSION:
            continue  # Written by a newer EXtra-data; don't try to read it
        if check_sources(fa.file, run_dir):
            return fa
        # NOTE(review): FileAccess objects for rejected candidates are not
        # explicitly closed here — confirm this is acceptable.
    return None

def find_file_write(run_dir):
    """Return the first overview-file location for *run_dir* we can write to.

    For each candidate from voview_paths_for_run(), in order, the parent
    directory is created if needed, then a throwaway ``<candidate>.check``
    file is created and removed to probe write access.

    Raises
    ------
    PermissionError
        If no candidate location is writable; the message lists the
        locations that were tried.
    """
    candidates = voview_paths_for_run(run_dir)
    for candidate in candidates:
        try:
            os.makedirs(osp.dirname(candidate), exist_ok=True)
            candidate_tmp = candidate + '.check'
            with open(candidate_tmp, 'wb'):
                pass
            os.unlink(candidate_tmp)
            return candidate
        except PermissionError:
            pass  # Not writable here; try the next location

    raise PermissionError(
        f"No writable location for the virtual overview file of "
        f"{run_dir!r}; tried: {', '.join(candidates)}"
    )


def write_atomic(path, data):
    """Write a virtual overview file, then rename it to the final path

    This aims to avoid exposing a partially written file where EXtra-data might
    try to read it. The file is written inside a temporary directory created
    in the same directory as *path*, so the final ``os.replace`` should not
    cross filesystems, and is then moved into place.

    Parameters
    ----------
    path : str
        Final location of the overview file.
    data
        Run data handed to VirtualOverviewFileWriter (main() passes the
        object returned by RunDirectory).
    """
    dirname, basename = osp.split(path)
    with TemporaryDirectory(prefix=".create-voview-", dir=dirname) as td:
        tmp_filename = osp.join(td, basename)
        try:
            vofw = VirtualOverviewFileWriter(tmp_filename, data)
            vofw.write()
            os.replace(tmp_filename, path)
        except BaseException:
            # Remove the partial file. The writer may have failed before
            # creating it, so a missing file must not mask the real error.
            try:
                os.unlink(tmp_filename)
            except FileNotFoundError:
                pass
            raise


def main(argv=None):
    """Command-line entry point: create or check a run's virtual overview file.

    With ``--check``, compare an overview file (``--overview-file`` or the
    first existing default location) against the run directory; return 1 if
    none is found or it is outdated. Otherwise build a new overview file from
    the run's files and write it atomically.
    """
    import argparse

    ap = argparse.ArgumentParser()
    ap.add_argument('--check', action='store_true')
    ap.add_argument('run_dir')
    ap.add_argument('--overview-file')
    args = ap.parse_args(argv)

    if args.check:
        file_path = args.overview_file or find_file_read(args.run_dir)
        if file_path is None:
            # find_file_read() returns None when no candidate exists.
            print(f"No overview file found for {args.run_dir}",
                  file=sys.stderr)
            return 1
        print(f"Checking {file_path} ...")
        with h5py.File(file_path, 'r') as f:
            ok = check_sources(f, args.run_dir)
        if ok:
            print("Source files match, overview file can be used")
        else:
            print("Source files don't match, overview file outdated")
            return 1
    else:
        from . import RunDirectory
        file_path = args.overview_file or find_file_write(args.run_dir)
        print("Opening", args.run_dir)
        # Read the individual files, not an existing overview.
        run = RunDirectory(args.run_dir, _use_voview=False)
        print(f"Creating {file_path} from {len(run.files)} files...")
        write_atomic(file_path, run)

if __name__ == '__main__':
    # main() returns None on success or 1 on failure; use it as exit status.
    exit_code = main()
    sys.exit(exit_code)