File: component.py

package info (click to toggle)
python-stetl 2.0%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 90,156 kB
  • sloc: python: 5,103; xml: 707; sql: 430; makefile: 154; sh: 65
file content (299 lines) | stat: -rw-r--r-- 10,427 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# Component base class for ETL.
#
# Author: Just van den Broecke
#
import os
import sys
from time import time
from .util import Util, ConfigSection
from .packet import FORMAT

# Module-level logger, obtained under the name 'component'; used by
# Component.do_exit() to report per-Component timing stats.
log = Util.get_log('component')


class Config:
    """
    Decorator class to tie config values from the .ini file to object instance
    property values. Somewhat like the Python standard @property but with
    the possibility to define default values, typing and making properties required.

    Each property is defined by @Config(type, default, required).
    Basic idea comes from:  https://wiki.python.org/moin/PythonDecoratorLibrary#Cached_Properties

    The decorated function body is never executed: outside a Sphinx build the
    function is replaced by this descriptor, which reads the value from the
    owning instance's ``cfg`` and caches it in the instance's ``cfg_vals`` dict.
    """

    # Name of the typed getter to call on the instance's ``cfg`` per property type.
    # Any type not listed here (including str) is read with the plain ``get``.
    _TYPED_GETTERS = {
        bool: 'get_bool',
        list: 'get_list',
        dict: 'get_dict',
        int: 'get_int',
        tuple: 'get_tuple',
    }

    def __init__(self, ptype=str, default=None, required=False):
        """
        Store the decorator arguments; the function to decorate arrives later
        via __call__.

        :param ptype: type the config value is converted to (str, bool, list, dict, int or tuple)
        :param default: value passed as default to the config getter
        :param required: when True, a resulting value of None raises an Exception on access
        """
        self.ptype = ptype
        self.default = default
        self.required = required

    def __call__(self, fget, doc=''):
        """
        Called once with the decorated function when the decorator is applied.
        Normally self is returned such that __get__ below is called
        with the Component instance. That allows us to cache the actual property value
        in the Component itself.

        When the SPHINX_BUILD environment variable is set, the original function is
        returned instead, with type/required/default details appended to its docstring.

        :param fget: the decorated function; only its name and docstring are used
        :param doc: optional extra documentation text (Sphinx build only)
        :return: self, or fget during a Sphinx documentation build
        """
        # Save the property name (is the name of the function calling us).
        self.property_name = fget.__name__

        if not os.getenv('SPHINX_BUILD'):
            return self

        # For Sphinx documentation build we need the original function with docstring.
        doc = doc.strip()
        # TODO more detail, e.g. doc = '``Parameter`` - %s\n\n' % doc
        # str(<class 'int'>) -> "<class 'int'>": take the part between the quotes
        doc += '* type: %s\n' % str(self.ptype).split("'")[1]
        doc += '* required: %s\n' % self.required
        doc += '* default: %s\n' % self.default

        fget.__doc__ = '``CONFIG`` %s\n%s' % (fget.__doc__, doc)
        return fget

    def __get__(self, comp_inst, owner=None):
        """
        descr.__get__(obj[, type]) -> value

        :param comp_inst: the instance the property is read from, or None on class-level access
        :param owner: the owning class (unused)
        :return: the typed config value, or this descriptor itself on class-level access
        :raises Exception: when the property is required and no value is available
        """
        # Class-level access (e.g. Component.input_format): there is no instance
        # to read config from, so follow descriptor convention and return self.
        if comp_inst is None:
            return self

        name = self.property_name
        cached_vals = comp_inst.cfg_vals
        if name not in cached_vals:
            # Do type conversion where needed from the string values
            getter_name = self._TYPED_GETTERS.get(self.ptype, 'get')
            value = getattr(comp_inst.cfg, getter_name)(name, default=self.default)

            if self.required is True and value is None:
                raise Exception('Config property: %s is required in config for %s' % (name, str(comp_inst)))

            cached_vals[name] = value

        return cached_vals[name]


class Component:
    """
    Abstract Base class for all Input, Filter and Output Components.

    Components are linked through ``next`` into a chain. ``process()`` runs the
    before/after hooks and ``invoke()`` of this Component and then hands the
    packet to the next Component. Simple per-Component timing stats are
    collected in ``timer_stop()`` and logged in ``do_exit()``.
    """

    @Config(ptype=str, default=None, required=False)
    def input_format(self):
        """
        The specific input format if the consumes parameter is a list or the format to be converted to the output_format.
        """
        pass

    @Config(ptype=str, default=None, required=False)
    def output_format(self):
        """
        The specific output format if the produces parameter is a list or the format to which the input format is converted.
        """
        pass

    def __init__(self, configdict, section, consumes=FORMAT.none, produces=FORMAT.none):
        """
        :param configdict: config object; ``configdict.items(section)`` supplies the raw values
        :param section: name of the config section for this Component, also used as its id
        :param consumes: a single input format or a list of supported input formats
        :param produces: a single output format or a list of supported output formats
        :raises ValueError: when a configured format is not in the supported list
        """
        # The raw config values from the cfg file
        self.cfg = ConfigSection(configdict.items(section))

        # The actual typed values as populated within Config Decorator
        self.cfg_vals = dict()
        self.next = None
        self.section = section

        # Timing stats; -1 and sys.maxsize are "nothing measured yet" sentinels
        self._max_time = -1
        self._min_time = sys.maxsize
        self._total_time = 0
        self._invoke_count = 0

        # A configured output_format/input_format wins; otherwise the value (or first
        # list entry) provided by the derived class is used. Output is resolved first.
        self._output_format = self._resolve_format('output_format', self.output_format, produces)
        self._input_format = self._resolve_format('input_format', self.input_format, consumes)

    @staticmethod
    def _resolve_format(name, configured, supported):
        """
        Determine the effective format from the configured value and the supported format(s).

        :param name: config property name, only used in the error message
        :param configured: format from the config, or None when not configured
        :param supported: a single format or a list of formats given by the derived class
        :return: configured when set, else the first list entry or the single format
        :raises ValueError: when configured is set but not in the supported list
        """
        if configured is not None:
            # When multiple formats are supported the configured one should be in that list
            if isinstance(supported, list) and configured not in supported:
                raise ValueError('Configured %s %s not in list: %s' % (name, configured, str(supported)))
            return configured

        if isinstance(supported, list):
            # Nothing configured and a list: use the first as default
            return supported[0]

        return supported

    def add_next(self, next_component):
        """
        Link next_component as our successor in the chain.

        :raises ValueError: when our output format is incompatible with its input format
        """
        self.next = next_component

        if not self.is_compatible():
            raise ValueError(
                'Incompatible components are linked: %s and %s' % (str(self), str(self.next)))

    # Get our id: currently the [section] name
    def get_id(self):
        return self.section

    # Get last Component in Chain
    def get_last(self):
        last = self
        while last.next:
            last = last.next
            # A list as next: continue with its first entry
            if isinstance(last, list):
                last = last[0]
        return last

    # Check our compatibility with the next Component in the Chain
    def is_compatible(self):
        # Ok when: nothing next in Chain, we produce no output format, or next accepts any format.
        # Formats are compared by value (==), consistently for both constants.
        if self.next is None or self._output_format == FORMAT.none or self.next._input_format == FORMAT.any:
            return True

        # return if our Output is compatible with the next Component's Input
        return self._output_format == self.next._input_format

    def __str__(self):
        return "%s: in=%s out=%s" % (str(self.__class__), self._input_format, self._output_format)

    def process(self, packet):
        """
        Let this Component and then the rest of the chain process the packet.

        The timer covers before_invoke/invoke/after_invoke of this Component only,
        not the time spent in subsequent Components.

        :param packet: the packet to process
        :return: the packet as returned by the last Component that handled it
        """
        # Current processor of packet
        packet.component = self

        start_time = self.timer_start()
        self._invoke_count += 1

        # Do something with the data
        if self.before_invoke(packet) is False:
            # Component indicates it does not want the chain to proceed
            self.timer_stop(start_time)
            return packet

        # Do component-specific processing, e.g. read or write or filter
        packet = self.invoke(packet)

        if self.after_invoke(packet) is False:
            # Component indicates it does not want the chain to proceed
            self.timer_stop(start_time)
            return packet

        self.timer_stop(start_time)

        # If there is a next component, let it process
        if self.next:
            # Hand-over data (line, doc whatever) to the next component
            packet.format = self._output_format
            packet = self.next.process(packet)

        # Return value of this hook is not used
        self.after_chain_invoke(packet)
        return packet

    def do_init(self):
        # Some components may do one-time init
        self.init()

        # If there is a next component, let it do its init()
        if self.next:
            self.next.do_init()

    def do_exit(self):
        # Notify all comps that we exit
        self.exit()

        # Simple performance stats in one line (issue #77)
        # Calc average processing time, watch for 0 invoke-case
        avg_time = 0.0
        if self._invoke_count > 0:
            avg_time = self._total_time / self._invoke_count

        # Min/max still hold their initial sentinels when nothing was measured:
        # report 0 rather than sys.maxsize / -1
        min_time = self._min_time if self._min_time != sys.maxsize else 0.0
        max_time = self._max_time if self._max_time >= 0 else 0.0

        log.info("%s invokes=%d time(total, min, max, avg) = %.3f %.3f %.3f %.3f" %
                 (self.__class__.__name__, self._invoke_count,
                  self._total_time, min_time, max_time,
                  avg_time))

        # If there is a next component, let it do its exit()
        if self.next:
            self.next.do_exit()

    def before_invoke(self, packet):
        """
        Called just before Component invoke.
        Returning False stops processing of the packet in process().
        """
        return True

    def after_invoke(self, packet):
        """
        Called right after Component invoke.
        Returning False stops the packet from being handed to the next Component.
        """
        return True

    def after_chain_invoke(self, packet):
        """
        Called right after entire Component Chain invoke.
        """
        return True

    def invoke(self, packet):
        """
        Components override for Component-specific behaviour, typically read, filter or write actions.
        """
        return packet

    def init(self):
        """
        Allows derived Components to perform a one-time init.
        """
        pass

    def exit(self):
        """
        Allows derived Components to perform a one-time exit/cleanup.
        """
        pass

    def timer_start(self):
        """
        :return: the current time, to be passed to timer_stop()
        """
        return time()

    def timer_stop(self, start_time):
        """
        Collect and calculate per-Component performance timing stats.

        :param start_time: value returned earlier by timer_start()
        :return: None
        """
        delta_time = time() - start_time

        # Calc timing stats for Component invocation
        self._total_time += delta_time

        if delta_time > self._max_time:
            self._max_time = delta_time

        # Ignore deltas that would display as 0.000 when tracking the minimum
        if delta_time < self._min_time and '%.3f' % delta_time != '0.000':
            self._min_time = delta_time