File: merger.py

package info (click to toggle)
python-stetl 2.0%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 90,156 kB
  • sloc: python: 5,103; xml: 707; sql: 430; makefile: 154; sh: 65
file content (126 lines) | stat: -rw-r--r-- 4,206 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# Merger Component base class for ETL.
#
# Author: Just van den Broecke
#

import random
from .util import Util
from .component import Component

# Module-level log object registered under the name 'merger'.
# NOTE(review): presumably a logger returned by the project's Util helper;
# it is not referenced by the code visible in this file.
log = Util.get_log('merger')


class Merger(Component):
    """
    Component that merges multiple Input Components into a single Component.
    Use this for example to combine multiple input streams like API endpoints.
    The Merger will embed Child Components to which actions are delegated.
    A Child Component may be a sub-Chain e.g. (Input|Filter|Filter..) sequence.
    Hence the "next" should be coupled to the last Component in that sub-Chain with
    the degenerate case where the sub-Chain is a single (Input) Component.
    NB this Component can only be used for Inputs.

    Attributes set by this class:
      children  -- list of (first, last) tuples, one per child (sub-)Chain
      end_count -- number of children that have not yet signalled End-of-Stream
      next      -- the Component coupled after this Merger (set in add_next)
    """

    def __init__(self, config_dict, child_list):
        """
        Assemble the children and register this Merger in the configuration.

        :param config_dict: configuration object; a section named after the
               children is added to it via add_section()
        :param child_list: iterable with the first Component of each child
               (sub-)Chain; each must provide get_id() and get_last()
        :raises ValueError: if child_list contains no children
        """
        self.children = []
        name_parts = []
        for child in child_list:
            # Random suffix keeps the generated section name (most likely) unique
            name_parts.append('-%s_%d' % (child.get_id(), random.randrange(0, 100000)))

            # A Child can be a sub-Chain, so remember both its first and its last
            # Component: the first receives the actions, the last is (un)linked
            # to/from our next. first is last when child is a single Component.
            self.children.append((child, child.get_last()))

        # Fail early with a clear message, before the configuration is touched
        if not self.children:
            raise ValueError('Merger requires at least one child Component')

        # Add ourselves to config for compat with Component
        section_name = ''.join(name_parts)
        config_dict.add_section(section_name)

        # We use the in/out formats of first child, will be compat-checked later
        lead_child = self.children[0]
        Component.__init__(self, config_dict, section_name,
                           consumes=self.first(lead_child)._input_format,
                           produces=self.last(lead_child)._output_format)

        self.end_count = len(self.children)

    def add_next(self, next_component):
        """
        Couple the last Component of every child directly to next_component.

        :param next_component: the Component that follows this Merger
        """
        for child in self.children:
            self.last(child).add_next(next_component)

        # Remember next, needed to relink children and to init/exit it once
        self.next = next_component

    def first(self, child):
        """
        Get first Component in Child sub-Chain.
        :param child: (first, last) tuple as stored in self.children
        :return: first Component
        """
        return child[0]

    def last(self, child):
        """
        Get last Component in Child sub-Chain.
        :param child: (first, last) tuple as stored in self.children
        :return: last Component
        """
        return child[1]

    def is_compatible(self):
        """
        Check compatibility with our child Components.

        :return: True only if the last Component of every child reports
                 is_compatible(), otherwise False
        """
        return all(self.last(child).is_compatible() for child in self.children)

    def process(self, packet):
        """
        Defer processing to the active children and keep track of End-of-Stream.

        :param packet: the Packet handed to each active child in turn
        :return: the same Packet, marked End-of-Stream only once all children
                 have reported End-of-Stream
        """
        for child in self.children:
            tail = self.last(child)

            # A child whose last Component has no next is inactive: skip it
            if not tail.next:
                continue

            # Defer to child
            self.first(child).process(packet)

            if packet.is_end_of_stream():
                # Deactivate Child by unlinking, otherwise we'll keep getting EoS
                tail.next = None
                self.end_count -= 1

            # Re-init Packet so the next child starts afresh
            packet.init()

        # Only when all children have had End-of-Stream
        # declare the Packet returned EoS.
        if self.end_count == 0:
            packet.set_end_of_stream()

        return packet

    def do_init(self):
        """
        Init every child, then init the Component coupled after us once.
        """
        for child in self.children:
            tail = self.last(child)

            # Temporarily unlink the child so that its do_init() cannot reach
            # our next via the Chain, then restore the link.
            tail.next = None
            self.first(child).do_init()
            tail.next = self.next

        # Init the rest of the Chain once; nothing to do without a next
        if self.next is not None:
            self.next.do_init()

    def do_exit(self):
        """
        Exit every child, then exit the Component coupled after us once.
        """
        for child in self.children:
            # Unlink the child so that its do_exit() cannot reach
            # our next via the Chain.
            self.last(child).next = None
            self.first(child).do_exit()

        # Exit the rest of the Chain once; nothing to do without a next
        if self.next is not None:
            self.next.do_exit()