1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
"""
Tools to check a file locality at EuXFEL
May be used to avoiding hangs on reading files from dCache
if they are not available or stored only on tape
"""
import os
import sys
from collections import defaultdict
import multiprocessing as mp
# Bit flags describing where a file's data can be read from.
UNAVAIL = 1
ONTAPE = 2
ONDISK = 4
ANY = UNAVAIL | ONTAPE | ONDISK

# Locality strings (as returned by get_locality) mapped to bit flags.
# 'NOT_ON_DCACHE' is get_locality's sentinel for paths with no dCache
# locality dot-file; such files are treated as being on disk.
DC_LOC_RESP = {
    'UNAVAILABLE': UNAVAIL,
    'NEARLINE': ONTAPE,
    'ONLINE': ONDISK,
    'ONLINE_AND_NEARLINE': ONTAPE | ONDISK,
    'NOT_ON_DCACHE': ONDISK,
}

# Human-readable description for each locality code; 0 is used for
# locality strings missing from DC_LOC_RESP.
LOCMSG = {
    0: 'Unknown locality',
    UNAVAIL: 'Unavailable',
    ONTAPE: 'Only on tape',
    ONDISK: 'On disk',
    ONTAPE | ONDISK: 'On disk',
}
def get_locality(path):
    """Return a ``(path, locality)`` tuple for a single file.

    The locality string is read from the dCache dot-command file
    ``.(get)(<name>)(locality)`` located next to *path*, with surrounding
    whitespace stripped. If that dot-command file does not exist, the
    sentinel 'NOT_ON_DCACHE' is returned as the locality instead.
    """
    directory, name = os.path.split(path)
    query = os.path.join(directory, '.(get)({})(locality)'.format(name))
    try:
        with open(query, 'r') as handle:
            content = handle.read()
    except FileNotFoundError:
        return path, 'NOT_ON_DCACHE'
    return path, content.strip()
def list_locality(files):
    """Yield ``(path, locality)`` for every path in *files*.

    Lookups are spread over a multiprocessing pool, and pairs are yielded
    in completion order, which need not match the order of *files*.
    """
    with mp.Pool() as pool:
        for pair in pool.imap_unordered(get_locality, files):
            yield pair
def print_counts(fpart):
    """Print a one-line progress summary of how many files are in each state.

    *fpart* maps locality strings to lists of paths (as built by
    ``partition``). Missing keys count as zero, and the mapping is never
    modified. Plain indexing of a ``defaultdict`` would insert empty
    entries into the caller's mapping, and would raise ``KeyError`` for a
    plain dict. The line ends with a carriage return, so successive calls
    overwrite each other on a terminal.
    """
    def count(*localities):
        # .get() does not trigger defaultdict's default_factory.
        return sum(len(fpart.get(loc, ())) for loc in localities)

    n_ondisk = count('NOT_ON_DCACHE', 'ONLINE_AND_NEARLINE', 'ONLINE')
    n_ontape = count('NEARLINE')
    n_unavail = count('UNAVAILABLE')
    print(f"{n_ondisk} on disk, {n_ontape} only on tape, {n_unavail} unavailable ", end='\r')
def silent(fpart):
    """Progress callback that ignores *fpart* and prints nothing."""
    return None
def partition(files, cb_disp=silent):
    """Group *files* by the locality string each one reports.

    Returns a ``defaultdict(list)`` mapping locality -> list of paths.
    *cb_disp* is called with the (partial) mapping after each file has
    been classified, e.g. to display progress.
    """
    groups = defaultdict(list)
    for path, locality in list_locality(files):
        groups[locality].append(path)
        cb_disp(groups)
    return groups
def lc_match(files, accept=ONDISK):
    """Return the files whose locality matches the *accept* bitmask.

    Parameters
    ----------
    files : iterable of str
        Paths to check; iterated exactly once.
    accept : int
        Bitwise OR of UNAVAIL / ONTAPE / ONDISK. A file is kept when its
        locality code shares at least one bit with *accept*. Locality
        strings missing from DC_LOC_RESP get code 0 and are never kept.

    Returns
    -------
    list of str
        Accepted paths, in the same order as *files*. Rejected paths are
        reported on stderr.
    """
    files = list(files)
    # list_locality yields results in completion order, so collect them
    # first and then walk the input to keep the output deterministic.
    localities = dict(list_locality(files))
    filtered = []
    for path in files:
        code = DC_LOC_RESP.get(localities[path], 0)
        if code & accept:
            filtered.append(path)
        else:
            print(f"Skipping file {path}", file=sys.stderr)
            print(f" ({LOCMSG[code]})", file=sys.stderr)
    return filtered
def lc_any(files):
    """Accept every file: hand back *files* itself, with no locality check."""
    unchanged = files
    return unchanged
def lc_ondisk(files):
    """Keep only files readable from disk; anything that would come from tape is dropped."""
    return lc_match(files, accept=ONDISK)
def lc_avail(files):
    """Keep files that can be read at all, from disk or from tape.

    Only files whose locality dCache reports as unavailable (or unknown)
    are dropped.
    """
    wanted = ONDISK | ONTAPE
    return lc_match(files, accept=wanted)
def _files_to_check(basedir):
    """Return the regular files to check: those directly in a directory, or the file itself."""
    if os.path.isdir(basedir):
        entries = (os.path.join(basedir, f) for f in os.listdir(basedir))
        return [f for f in entries if os.path.isfile(f)]
    if os.path.isfile(basedir):
        return [basedir]
    return []


def _print_files(header, files):
    """Print *header* followed by *files*, one per line, in sorted order."""
    print(header)
    for file in sorted(files):
        print(f" {file}")


def check_dir(basedir):
    """Check the locality of the files in *basedir* and print a report.

    *basedir* may be a directory (its regular files are checked,
    non-recursively) or a single file; any other path yields no files.

    Returns a bitmask: 1 if some files are only on tape, 2 if some are
    unavailable, 4 if some locality string is not listed in DC_LOC_RESP;
    0 otherwise.
    """
    files = _files_to_check(basedir)
    print(f"Checking {len(files)} files in {basedir}")
    fp = partition(files, print_counts)
    print("")  # terminate the '\r'-ended progress line from print_counts

    retcode = 0
    if fp['NEARLINE']:
        retcode |= 1
        _print_files("Only on tape:", fp['NEARLINE'])
    if fp['UNAVAILABLE']:
        retcode |= 2
        _print_files("Unavailable:", fp['UNAVAILABLE'])

    # Sort for deterministic output, and list the affected files instead of
    # only printing the set of unexpected locality strings.
    unknown_locality = sorted(set(fp) - set(DC_LOC_RESP))
    if unknown_locality:
        retcode |= 4
        print("Unknown locality:")
        for loc in unknown_locality:
            for file in sorted(fp[loc]):
                print(f" {file} ({loc})")
    return retcode
from argparse import ArgumentParser
def main(argv=None):
    """Command-line entry point for ``extra-data-locality``.

    *argv* defaults to ``sys.argv[1:]``. Returns the process exit code:
    255 if the given path does not exist, otherwise the bitmask returned
    by ``check_dir``.
    """
    if argv is None:
        argv = sys.argv[1:]
    ap = ArgumentParser(prog='extra-data-locality', description="Checks locality of files in the directory")
    ap.add_argument('path', help="run directory of HDF5 files.")
    args = ap.parse_args(argv)
    if not os.path.exists(args.path):
        # Error diagnostics belong on stderr (as with lc_match's skip messages).
        print(f"Path '{args.path}' is not found", file=sys.stderr)
        return 255
    return check_dir(args.path)
if __name__ == "__main__":
    # Use main()'s return value (check_dir's bitmask, or 255) as the exit status.
    exit_code = main()
    sys.exit(exit_code)
|