File: validators.py

package info (click to toggle)
python-pbcommand 2.1.1%2Bgit20231020.28d1635-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,016 kB
  • sloc: python: 7,676; makefile: 220; sh: 73
file content (173 lines) | stat: -rw-r--r-- 5,818 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import functools
import logging
import os

from pbcommand.utils import nfs_exists_check
from pbcommand.pb_io import load_report_from_json

log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())  # squash annoying no Handler msg


def trigger_nfs_refresh(ff):
    # keeping this for backward compatibility
    return nfs_exists_check(ff)


def _validate_resource(func, resource):
    """Validate the existence of a file/dir"""
    _ = nfs_exists_check(resource)

    if func(resource):
        return os.path.abspath(resource)
    else:
        raise IOError("Unable to find '{f}'".format(f=resource))


validate_file = functools.partial(_validate_resource, os.path.isfile)
validate_dir = functools.partial(_validate_resource, os.path.isdir)
validate_output_dir = functools.partial(_validate_resource, os.path.isdir)


def validate_or(f1, f2, error_msg):
    """
    Apply Valid functions f1, then f2 (if failure occurs)

    :param error_msg: Default message to print
    """
    def wrapper(path):
        try:
            return f1(path)
        except Exception:
            try:
                return f2(path)
            except Exception as e:
                log.error(
                    "{m} {p} \n. {e}".format(
                        m=error_msg,
                        p=path,
                        e=repr(e)))
                raise

    return wrapper


def validate_report_file(report_file_name):
    """
    Raise ValueError if report contains path seps
    """
    if not os.path.basename(report_file_name) == report_file_name:
        raise ValueError(
            "Path separators are not allowed: {r}".format(
                r=report_file_name))
    return report_file_name


def validate_fofn(fofn):
    """Validate existence of FOFN and files within the FOFN.

    :param fofn: (str) Path to File of file names.
    :raises: IOError if any file is not found.
    :return: (str) abspath of the input fofn
    """
    _ = nfs_exists_check(fofn)

    if os.path.isfile(fofn):
        file_names = fofn_to_files(os.path.abspath(fofn))
        log.debug(
            "Found {n} files in FOFN {f}.".format(
                n=len(file_names), f=fofn))
        return os.path.abspath(fofn)
    else:
        raise IOError("Unable to find {f}".format(f=fofn))


def validate_nonempty_file(resource):
    try:
        resource_path = validate_file(resource)
    except IOError as e:
        raise e
    try:
        with open(resource_path) as handle:
            l = [next(handle) for i in range(2)]
    except StopIteration:
        raise IOError("{f} appears to be empty".format(f=resource))
    return resource_path


def fofn_to_files(fofn):
    """Util func to convert a fofn file to a list of files."""

    _ = nfs_exists_check(fofn)

    if os.path.exists(fofn):
        with open(fofn, 'r') as f:
            files = [line.strip()
                     for line in f.readlines() if len(line.strip()) > 0]

        for _file in files:
            if not os.path.isfile(_file):
                # try one more time to find the file by
                # performing an NFS refresh
                found = nfs_exists_check(_file)
                if not found:
                    raise IOError("Unable to find file '{f}'".format(f=_file))

        return list(files)
    else:
        raise IOError("Unable to find FOFN {f}".format(f=fofn))


def validate_report(file_name):
    e = []
    base_path = os.path.dirname(file_name)
    r = load_report_from_json(file_name)
    if r.title is None:
        e.append("Report {i} is missing a title".format(i=r.id))
    for t in r.tables:
        if t.title is None:
            e.append("Table {r}.{t} is missing a title".format(r=r.id, t=t.id))
        for col in t.columns:
            if col.header is None:
                e.append("Column {r}.{t}.{c} is missing a header".format(
                         r=r.id, t=t.id, c=col.id))
        lengths = set([len(col.values) for col in t.columns])
        if len(lengths) != 1:
            e.append("Inconsistent column sizes in table {r}.{t}: {s}".format(
                     r=r.id, t=t.id, s=",".join(
                         [str(x) for x in sorted(list(lengths))])))
    for pg in r.plotGroups:
        if pg.title is None:
            e.append("Plot group {r}.{g} is missing a title".format(
                     r=r.id, g=pg.id))
        for plot in pg.plots:
            # if plot.caption is None:
            #    raise ValueError("Plot {r.g.p} is missing a caption".format(
            #                     r=r.id, g=pg.id, p=plot.id))
            if plot.image is None:
                e.append("Plot {r}.{g}.{p} does not have an image".format(
                         r=r.id, g=pg.id, p=plot.id))
            img_path = os.path.join(base_path, plot.image)
            if not os.path.exists(img_path):
                e.append(
                    "The plot image {f} does not exist".format(
                        f=img_path))
            if plot.thumbnail is None:
                pass
                # raise ValueError("Plot {r.g.p} does not have an thumbnail image".format(
                #                 r=r.id, g=pg.id, p=plot.id))
            else:
                thumbnail = os.path.join(base_path, plot.thumbnail)
                if not os.path.exists(thumbnail):
                    e.append(
                        "The thumbnail image {f} does not exist".format(
                            f=img_path))
        if pg.thumbnail is not None:
            thumbnail = os.path.join(base_path, pg.thumbnail)
            if not os.path.exists(thumbnail):
                e.append(
                    "The thumbnail image {f} does not exist".format(
                        f=img_path))
    if len(e) > 0:
        raise ValueError("\n".join(e))
    return r