1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
import functools
import logging
import os
from pbcommand.utils import nfs_exists_check
from pbcommand.pb_io import load_report_from_json
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler()) # squash annoying no Handler msg
def trigger_nfs_refresh(ff):
# keeping this for backward compatibility
return nfs_exists_check(ff)
def _validate_resource(func, resource):
"""Validate the existence of a file/dir"""
_ = nfs_exists_check(resource)
if func(resource):
return os.path.abspath(resource)
else:
raise IOError("Unable to find '{f}'".format(f=resource))
validate_file = functools.partial(_validate_resource, os.path.isfile)
validate_dir = functools.partial(_validate_resource, os.path.isdir)
validate_output_dir = functools.partial(_validate_resource, os.path.isdir)
def validate_or(f1, f2, error_msg):
"""
Apply Valid functions f1, then f2 (if failure occurs)
:param error_msg: Default message to print
"""
def wrapper(path):
try:
return f1(path)
except Exception:
try:
return f2(path)
except Exception as e:
log.error(
"{m} {p} \n. {e}".format(
m=error_msg,
p=path,
e=repr(e)))
raise
return wrapper
def validate_report_file(report_file_name):
"""
Raise ValueError if report contains path seps
"""
if not os.path.basename(report_file_name) == report_file_name:
raise ValueError(
"Path separators are not allowed: {r}".format(
r=report_file_name))
return report_file_name
def validate_fofn(fofn):
"""Validate existence of FOFN and files within the FOFN.
:param fofn: (str) Path to File of file names.
:raises: IOError if any file is not found.
:return: (str) abspath of the input fofn
"""
_ = nfs_exists_check(fofn)
if os.path.isfile(fofn):
file_names = fofn_to_files(os.path.abspath(fofn))
log.debug(
"Found {n} files in FOFN {f}.".format(
n=len(file_names), f=fofn))
return os.path.abspath(fofn)
else:
raise IOError("Unable to find {f}".format(f=fofn))
def validate_nonempty_file(resource):
try:
resource_path = validate_file(resource)
except IOError as e:
raise e
try:
with open(resource_path) as handle:
l = [next(handle) for i in range(2)]
except StopIteration:
raise IOError("{f} appears to be empty".format(f=resource))
return resource_path
def fofn_to_files(fofn):
"""Util func to convert a fofn file to a list of files."""
_ = nfs_exists_check(fofn)
if os.path.exists(fofn):
with open(fofn, 'r') as f:
files = [line.strip()
for line in f.readlines() if len(line.strip()) > 0]
for _file in files:
if not os.path.isfile(_file):
# try one more time to find the file by
# performing an NFS refresh
found = nfs_exists_check(_file)
if not found:
raise IOError("Unable to find file '{f}'".format(f=_file))
return list(files)
else:
raise IOError("Unable to find FOFN {f}".format(f=fofn))
def validate_report(file_name):
e = []
base_path = os.path.dirname(file_name)
r = load_report_from_json(file_name)
if r.title is None:
e.append("Report {i} is missing a title".format(i=r.id))
for t in r.tables:
if t.title is None:
e.append("Table {r}.{t} is missing a title".format(r=r.id, t=t.id))
for col in t.columns:
if col.header is None:
e.append("Column {r}.{t}.{c} is missing a header".format(
r=r.id, t=t.id, c=col.id))
lengths = set([len(col.values) for col in t.columns])
if len(lengths) != 1:
e.append("Inconsistent column sizes in table {r}.{t}: {s}".format(
r=r.id, t=t.id, s=",".join(
[str(x) for x in sorted(list(lengths))])))
for pg in r.plotGroups:
if pg.title is None:
e.append("Plot group {r}.{g} is missing a title".format(
r=r.id, g=pg.id))
for plot in pg.plots:
# if plot.caption is None:
# raise ValueError("Plot {r.g.p} is missing a caption".format(
# r=r.id, g=pg.id, p=plot.id))
if plot.image is None:
e.append("Plot {r}.{g}.{p} does not have an image".format(
r=r.id, g=pg.id, p=plot.id))
img_path = os.path.join(base_path, plot.image)
if not os.path.exists(img_path):
e.append(
"The plot image {f} does not exist".format(
f=img_path))
if plot.thumbnail is None:
pass
# raise ValueError("Plot {r.g.p} does not have an thumbnail image".format(
# r=r.id, g=pg.id, p=plot.id))
else:
thumbnail = os.path.join(base_path, plot.thumbnail)
if not os.path.exists(thumbnail):
e.append(
"The thumbnail image {f} does not exist".format(
f=img_path))
if pg.thumbnail is not None:
thumbnail = os.path.join(base_path, pg.thumbnail)
if not os.path.exists(thumbnail):
e.append(
"The thumbnail image {f} does not exist".format(
f=img_path))
if len(e) > 0:
raise ValueError("\n".join(e))
return r
|