File: validation.py

package info (click to toggle)
python-petl 1.7.17-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,224 kB
  • sloc: python: 22,617; makefile: 109; xml: 9
file content (181 lines) | stat: -rw-r--r-- 7,272 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division


import operator
from petl.compat import text_type


from petl.util.base import Table, asindices, Record


def validate(table, constraints=None, header=None):
    """
    Validate a `table` against a set of `constraints` and/or an expected
    `header`, e.g.::

        >>> import petl as etl
        >>> # define some validation constraints
        ... header = ('foo', 'bar', 'baz')
        >>> constraints = [
        ...     dict(name='foo_int', field='foo', test=int),
        ...     dict(name='bar_date', field='bar', test=etl.dateparser('%Y-%m-%d')),
        ...     dict(name='baz_enum', field='baz', assertion=lambda v: v in ['Y', 'N']),
        ...     dict(name='not_none', assertion=lambda row: None not in row),
        ...     dict(name='qux_int', field='qux', test=int, optional=True),
        ... ]
        >>> # now validate a table
        ... table = (('foo', 'bar', 'bazzz'),
        ...          (1, '2000-01-01', 'Y'),
        ...          ('x', '2010-10-10', 'N'),
        ...          (2, '2000/01/01', 'Y'),
        ...          (3, '2015-12-12', 'x'),
        ...          (4, None, 'N'),
        ...          ('y', '1999-99-99', 'z'),
        ...          (6, '2000-01-01'),
        ...          (7, '2001-02-02', 'N', True))
        >>> problems = etl.validate(table, constraints=constraints, header=header)
        >>> problems.lookall()
        +--------------+-----+-------+--------------+------------------+
        | name         | row | field | value        | error            |
        +==============+=====+=======+==============+==================+
        | '__header__' |   0 | None  | None         | 'AssertionError' |
        +--------------+-----+-------+--------------+------------------+
        | 'foo_int'    |   2 | 'foo' | 'x'          | 'ValueError'     |
        +--------------+-----+-------+--------------+------------------+
        | 'bar_date'   |   3 | 'bar' | '2000/01/01' | 'ValueError'     |
        +--------------+-----+-------+--------------+------------------+
        | 'baz_enum'   |   4 | 'baz' | 'x'          | 'AssertionError' |
        +--------------+-----+-------+--------------+------------------+
        | 'bar_date'   |   5 | 'bar' | None         | 'AttributeError' |
        +--------------+-----+-------+--------------+------------------+
        | 'not_none'   |   5 | None  | None         | 'AssertionError' |
        +--------------+-----+-------+--------------+------------------+
        | 'foo_int'    |   6 | 'foo' | 'y'          | 'ValueError'     |
        +--------------+-----+-------+--------------+------------------+
        | 'bar_date'   |   6 | 'bar' | '1999-99-99' | 'ValueError'     |
        +--------------+-----+-------+--------------+------------------+
        | 'baz_enum'   |   6 | 'baz' | 'z'          | 'AssertionError' |
        +--------------+-----+-------+--------------+------------------+
        | '__len__'    |   7 | None  |            2 | 'AssertionError' |
        +--------------+-----+-------+--------------+------------------+
        | 'baz_enum'   |   7 | 'baz' | None         | 'AssertionError' |
        +--------------+-----+-------+--------------+------------------+
        | '__len__'    |   8 | None  |            4 | 'AssertionError' |
        +--------------+-----+-------+--------------+------------------+

    Returns a table of validation problems.

    """  # noqa

    return ProblemsView(table, constraints=constraints, header=header)


Table.validate = validate


class ProblemsView(Table):

    def __init__(self, table, constraints, header):
        self.table = table
        self.constraints = constraints
        self.header = header

    def __iter__(self):
        return iterproblems(self.table, self.constraints, self.header)


def normalize_constraints(constraints, flds):
    """
    This method renders local constraints such that return value is:
      * a list, not None
      * a list of dicts
      * a list of non-optional constraints or optional with defined field

    .. note:: We use a new variable 'local_constraints' because the constraints
              parameter may be a mutable collection, and we do not wish to
              cause side-effects by modifying it locally
    """
    local_constraints = constraints or []
    local_constraints = [dict(**c) for c in local_constraints]
    local_constraints = [
        c for c in local_constraints
        if c.get('field') in flds or
        not c.get('optional')
    ]
    return local_constraints


def iterproblems(table, constraints, expected_header):

    outhdr = ('name', 'row', 'field', 'value', 'error')
    yield outhdr

    it = iter(table)
    try:
        actual_header = next(it)
    except StopIteration:
        actual_header = []

    if expected_header is None:
        flds = list(map(text_type, actual_header))
    else:
        expected_flds = list(map(text_type, expected_header))
        actual_flds = list(map(text_type, actual_header))
        try:
            assert expected_flds == actual_flds
        except Exception as e:
            yield ('__header__', 0, None, None, type(e).__name__)
        flds = expected_flds

    local_constraints = normalize_constraints(constraints, flds)

    # setup getters
    for constraint in local_constraints:
        if 'getter' not in constraint:
            if 'field' in constraint:
                # should ensure FieldSelectionError if bad field in constraint
                indices = asindices(flds, constraint['field'])
                getter = operator.itemgetter(*indices)
                constraint['getter'] = getter

    # generate problems
    expected_len = len(flds)
    for i, row in enumerate(it):
        row = tuple(row)

        # row length constraint
        l = None
        try:
            l = len(row)
            assert l == expected_len
        except Exception as e:
            yield ('__len__', i+1, None, l, type(e).__name__)

        # user defined constraints
        row = Record(row, flds)
        for constraint in local_constraints:
            name = constraint.get('name', None)
            field = constraint.get('field', None)
            assertion = constraint.get('assertion', None)
            test = constraint.get('test', None)
            getter = constraint.get('getter', lambda x: x)
            try:
                target = getter(row)
            except Exception as e:
                # getting target value failed, report problem
                yield (name, i+1, field, None, type(e).__name__)
            else:
                value = target if field else None
                if test is not None:
                    try:
                        test(target)
                    except Exception as e:
                        # test raised exception, report problem
                        yield (name, i+1, field, value, type(e).__name__)
                if assertion is not None:
                    try:
                        assert assertion(target)
                    except Exception as e:
                        # assertion raised exception, report problem
                        yield (name, i+1, field, value, type(e).__name__)