File: splitter.py

package info (click to toggle)
python-stetl 2.0%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 90,156 kB
  • sloc: python: 5,103; xml: 707; sql: 430; makefile: 154; sh: 65
file content (105 lines) | stat: -rw-r--r-- 2,968 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# Splitter Component base class for ETL.
#
# Author: Just van den Broecke
#

import random
from .util import Util
from .component import Component

log = Util.get_log('splitter')


class Splitter(Component):
    """
    Component that splits a single input to multiple output Components.
    Use this for example to produce multiple output file formats (GML, GeoJSON etc)
    or to publish to multiple remote services (SOS, SensorThings API) or for simple
    debugging: target Output and StandardOutput.
    """

    def __init__(self, config_dict, child_list):
        # Assemble child list
        children = []
        section_name = ''
        for child in child_list:
            section_name += '-%s_%d' % (child.get_id(), random.randrange(0, 100000))
            children.append(child)

        # Add ourselves to config for compat with Component
        config_dict.add_section(section_name)

        # We use the in/out formats of first child, will be compat chcked later
        Component.__init__(self, config_dict, section_name, consumes=children[0]._input_format,
                           produces=children[0]._output_format)

        # Component sets self.next to None...
        self.next = children

    def add_next(self, next_component):
        # We use child list, maybe to be used later
        pass

    # Check our compatibility with our child Components
    def is_compatible(self):
        for comp in self.next:
            if not comp.is_compatible():
                return False
        return True

    def process(self, packet):
        # Defer processing to our child Components
        data = packet.data
        for comp in self.next:
            packet.data = data
            comp.process(packet)
        return packet

    def do_init(self):
        for comp in self.next:
            comp.do_init()

    def do_exit(self):
        # Notify all child comps that we exit
        for comp in self.next:
            comp.do_exit()

    def before_invoke(self, packet):
        """
        Called just before Component invoke.
        """
        for comp in self.next:
            if not comp.before_invoke(packet):
                return False
        return True

    def after_invoke(self, packet):
        """
        Called right after Component invoke.
        """
        for comp in self.next:
            if not comp.after_invoke(packet):
                return False
        return True

    def after_chain_invoke(self, packet):
        """
        Called right after entire Component Chain invoke.
        """
        for comp in self.next:
            if not comp.after_chain_invoke(packet):
                return False
        return True

    def invoke(self, packet):
        for comp in self.next:
            packet = comp.invoke(packet)
        return packet

    def init(self):
        for comp in self.next:
            comp.init()

    def exit(self):
        for comp in self.next:
            comp.exit()