File: sieve.py

package info (click to toggle)
python-stetl 1.2%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 89,988 kB
  • sloc: python: 5,007; xml: 707; sql: 430; makefile: 155; sh: 50
file content (98 lines) | stat: -rw-r--r-- 2,881 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Lets data Packets pass through ("sieve") based on criteria in their data.
# See issue: https://github.com/geopython/stetl/issues/78
#
# A concrete example is AttrValueRecordSieve which sieves records matching
# specific attribute values. One can also think of Sieves based on XPath expressions
# (e.g. for XML, GML), or geospatial, based on for example WFS-like filters like bounding boxes.
#
# Author: Just van den Broecke
#
from stetl.component import Config
from stetl.filter import Filter
from stetl.util import Util
from stetl.packet import FORMAT

# Module-level log object, obtained from Util under the name 'Sieve'.
log = Util.get_log('Sieve')


class Sieve(Filter):
    """
    Base class for concrete Sieves: Filters that let Packets pass through ("sieve")
    depending on criteria found in the Packet data.

    Subclasses override :meth:`sieve`; :meth:`invoke` only delegates to it for
    Packets that actually carry data.
    """

    def __init__(self, configdict, section, consumes, produces):
        # All arguments are handed unchanged to the Filter base class.
        Filter.__init__(self, configdict, section, consumes, produces)

    def invoke(self, packet):
        """
        Sieve the Packet, unless it carries no data.

        :param packet: the incoming Packet
        :return: the Packet itself when ``packet.data`` is None, otherwise whatever
                 :meth:`sieve` returns for it
        """
        return packet if packet.data is None else self.sieve(packet)

    def sieve(self, packet):
        """
        Hook for subclasses: apply the sieving criteria to the Packet.

        This default implementation hands the Packet back unchanged.

        :param packet: a Packet whose data is not None
        :return: the (possibly modified) Packet
        """
        return packet


class AttrValueRecordSieve(Sieve):
    """
    Sieves by attr/value(s) in Record Packets.

    A record passes when it contains the attribute named by ``attr_name`` and, if
    ``attr_values`` is non-empty, the value of that attribute is one of ``attr_values``.
    All other records are dropped from the Packet.
    """

    @Config(ptype=str, required=True)
    def attr_name(self):
        """
        Name of attribute whose value(s) are to be sieved.
        """
        pass

    @Config(ptype=list, default=list(), required=False)
    def attr_values(self):
        """
        Value(s) for attribute to be sieved. If empty any value is passed through (existence
        of attr_name is the criterion).
        """
        pass

    def __init__(self, configdict, section):
        # Both single records and arrays of records are accepted and produced.
        Sieve.__init__(
            self, configdict, section,
            consumes=[FORMAT.record_array, FORMAT.record],
            produces=[FORMAT.record_array, FORMAT.record])

    def sieve(self, packet):
        """
        Filter out records that are not matching designated attr value(s).

        ``packet.data`` is replaced in place:

        * list of records: a new list holding only the matching records (possibly empty);
        * single record (dict): the record itself if it matches, otherwise None;
        * anything else: None.

        :param packet: Packet whose data is a record or a list of records
        :return: the same Packet, with its data sieved
        """

        # Start with empty result: fill with matching records
        record_data = packet.data
        packet.data = None

        # Data can be list or single record. isinstance() (rather than an exact type
        # comparison) so that list/dict subclasses, e.g. OrderedDict records, are
        # sieved as well instead of being silently discarded.
        if isinstance(record_data, list):
            packet.data = [record for record in record_data if self.matches_attr(record)]
        elif isinstance(record_data, dict):
            if self.matches_attr(record_data):
                packet.data = record_data

        return packet

    def matches_attr(self, record):
        """
        Tell whether a single record satisfies the configured attr name/value(s).

        :param record: the record to test (must support ``in`` and item lookup by attr name)
        :return: True if the record passes the sieve, False otherwise
        """
        # Attr not even in record: no use going on
        if self.attr_name not in record:
            return False

        # No values configured: presence of the attribute alone is a match
        if not self.attr_values:
            return True

        return record[self.attr_name] in self.attr_values