1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
|
# -*- coding: utf-8 -*-
#
# Component base class for ETL.
#
# Author: Just van den Broecke
#
import os
import sys
from time import time
from util import Util, ConfigSection
from packet import FORMAT
# Module-level logger for this file; Component.do_exit() uses it to report
# per-component timing statistics.
log = Util.get_log('component')
class Config(object):
    """
    Decorator class to tie config values from the .ini file to object instance
    property values. Somewhat like the Python standard @property but with
    the possibility to define default values, typing and making properties required.

    Each property is defined by @Config(type, default, required).
    Basic idea comes from: https://wiki.python.org/moin/PythonDecoratorLibrary#Cached_Properties

    The object carrying the decorated property must provide two attributes:
    ``cfg`` (an object with ``get``, ``get_bool``, ``get_list``, ``get_dict``,
    ``get_int`` and ``get_tuple`` methods, each accepting ``(name, default=...)``)
    and ``cfg_vals`` (a dict used to cache the resolved value per property name).
    """

    # Maps a property type to the name of the typed getter on the config object.
    # Types not listed here (including str) are read with the plain ``get``.
    _TYPED_GETTERS = (
        (bool, 'get_bool'),
        (list, 'get_list'),
        (dict, 'get_dict'),
        (int, 'get_int'),
        (tuple, 'get_tuple'),
    )

    def __init__(self, ptype=str, default=None, required=False):
        """
        Store the decorator arguments given in ``@Config(...)``.

        :param ptype: Python type the raw config value is converted to
        :param default: value used when the property is not present in the config
        :param required: when True, a resolved value of None raises on first access
        """
        self.ptype = ptype
        self.default = default
        self.required = required

    def __call__(self, fget, doc=''):
        """
        Called once, at decoration time, with the function being decorated.

        Normally self is returned, so the class attribute becomes this descriptor
        and __get__ below is called with the Component instance. That allows us to
        cache the actual property value in the Component itself.

        When the SPHINX_BUILD environment variable is set, the original function is
        returned instead, with type/required/default details appended to its docstring.

        :param fget: the decorated function; its name becomes the config property name
        :param doc: optional extra documentation text (only used for the Sphinx build)
        :return: self, or fget when building the Sphinx documentation
        """
        # Save the property name (is the name of the function calling us).
        self.property_name = fget.__name__

        if not os.getenv('SPHINX_BUILD'):
            return self

        # For Sphinx documentation build we need the original function with docstring.
        doc = doc.strip()
        doc += '* type: %s\n' % str(self.ptype).split("'")[1]
        doc += '* required: %s\n' % self.required
        doc += '* default: %s\n' % self.default

        # A function without docstring has __doc__ None: avoid emitting the text "None".
        original_doc = fget.__doc__ or ''
        fget.__doc__ = '``CONFIG`` %s\n%s' % (original_doc, doc)
        return fget

    def __get__(self, comp_inst, owner):
        """
        descr.__get__(obj[, type]) -> value

        Resolve the property value on first access, convert it to the configured
        type via the matching typed getter and cache it in ``comp_inst.cfg_vals``.

        :param comp_inst: the instance the property is read from, None for class access
        :param owner: the class owning the property
        :return: the (cached) config value, or this descriptor for class-level access
        :raises Exception: when the property is required and resolves to None
        """
        # Class-level access (e.g. Component.input_format, help(), inspect):
        # there is no instance to read config from, so hand back the descriptor.
        if comp_inst is None:
            return self

        name = self.property_name
        if name not in comp_inst.cfg_vals:
            cfg = comp_inst.cfg

            # Do type conversion where needed from the string values
            getter = cfg.get
            for known_type, getter_name in self._TYPED_GETTERS:
                if self.ptype is known_type:
                    getter = getattr(cfg, getter_name)
                    break
            value = getter(name, default=self.default)

            if self.required is True and value is None:
                raise Exception('Config property: %s is required in config for %s' % (name, str(comp_inst)))

            comp_inst.cfg_vals[name] = value

        return comp_inst.cfg_vals[name]
class Component(object):
    """
    Abstract Base class for all Input, Filter and Output Components.

    Components are linked into a chain through ``next``. Each Component processes
    a packet and hands it over to the next Component in the chain, while simple
    timing statistics are collected per Component.
    """

    @Config(ptype=str, default=None, required=False)
    def input_format(self):
        """
        The specific input format if the consumes parameter is a list or the format to be converted to the output_format.
        """
        pass

    @Config(ptype=str, default=None, required=False)
    def output_format(self):
        """
        The specific output format if the produces parameter is a list or the format to which the input format is converted.
        """
        pass

    def __init__(self, configdict, section, consumes=FORMAT.none, produces=FORMAT.none):
        """
        :param configdict: config object; its ``items(section)`` result is wrapped in a ConfigSection
        :param section: name of the config section for this Component, also used as its id
        :param consumes: the input format, or a list of supported input formats
        :param produces: the output format, or a list of supported output formats
        :raises ValueError: when a configured input/output format is not in the supported list
        """
        # The raw config values from the cfg file
        self.cfg = ConfigSection(configdict.items(section))

        # The actual typed values as populated within Config Decorator
        self.cfg_vals = dict()
        self.next = None
        self.section = section

        # Timing stats. sys.maxsize serves as the "no minimum measured yet" sentinel:
        # unlike sys.maxint it is available on both Python 2.6+ and Python 3.
        self._max_time = -1
        self._min_time = sys.maxsize
        self._total_time = 0
        self._invoke_count = 0

        # The configured formats are read through the Config properties, which need
        # self.cfg and self.cfg_vals: so this must come after the assignments above.
        self._output_format = self._resolve_format('output_format', self.output_format, produces)
        self._input_format = self._resolve_format('input_format', self.input_format, consumes)

    @staticmethod
    def _resolve_format(kind, configured, supported):
        """
        Determine the effective format from a configured value and the supported format(s).

        :param kind: 'input_format' or 'output_format', only used in the error message
        :param configured: the format from the config, or None when not configured
        :param supported: single format or list of formats provided by the derived class
        :return: the configured format when given, else the first of a list, else ``supported``
        :raises ValueError: when ``configured`` is given but not in the ``supported`` list
        """
        if configured is not None:
            # A configured format wins, but with multiple formats it should be in that list
            if isinstance(supported, list) and configured not in supported:
                raise ValueError('Configured %s %s not in list: %s' % (kind, configured, str(supported)))
            return configured

        if isinstance(supported, list):
            # Nothing configured and a list: use the first as default
            return supported[0]

        # Single format provided by derived class
        return supported

    def add_next(self, next_component):
        """
        Link the next Component in the chain.

        :param next_component: the Component to process packets after this one
        :raises ValueError: when our output format is incompatible with its input format
        """
        self.next = next_component
        if not self.is_compatible():
            raise ValueError(
                'Incompatible components are linked: %s and %s' % (str(self), str(self.next)))

    def get_id(self):
        """
        Get our id: currently the [section] name.
        """
        return self.section

    def get_last(self):
        """
        Get last Component in Chain. When a ``next`` is a list, its first element is followed.
        """
        last = self
        while last.next:
            last = last.next
            if isinstance(last, list):
                last = last[0]
        return last

    def is_compatible(self):
        """
        Check our compatibility with the next Component in the Chain.

        :return: True when there is no next Component, when we produce FORMAT.none,
                 when the next Component accepts FORMAT.any, or when our output format
                 equals the input format of the next Component
        """
        # Compare by value (not identity): a format read from the config may be an
        # equal but distinct object. Assumes FORMAT members are plain values - TODO confirm.
        if self.next is None or self._output_format == FORMAT.none or self.next._input_format == FORMAT.any:
            return True

        # return if our Output is compatible with the next Component's Input
        return self._output_format == self.next._input_format

    def __str__(self):
        return "%s: in=%s out=%s" % (str(self.__class__), self._input_format, self._output_format)

    def process(self, packet):
        """
        Process a packet in this Component and pass it on through the rest of the chain.

        Timing covers before_invoke/invoke/after_invoke of this Component only, not
        the Components further down the chain.

        :param packet: the packet to process
        :return: the packet as returned by the last Component that processed it
        """
        # Current processor of packet
        packet.component = self

        start_time = self.timer_start()
        self._invoke_count += 1

        if self.before_invoke(packet) is False:
            # Component indicates it does not want the chain to proceed
            self.timer_stop(start_time)
            return packet

        # Do component-specific processing, e.g. read or write or filter
        packet = self.invoke(packet)

        if self.after_invoke(packet) is False:
            # Component indicates it does not want the chain to proceed
            self.timer_stop(start_time)
            return packet

        self.timer_stop(start_time)

        # If there is a next component, let it process
        if self.next:
            # Hand-over data (line, doc whatever) to the next component
            packet.format = self._output_format
            packet = self.next.process(packet)

        # Hook return value is currently not used
        self.after_chain_invoke(packet)
        return packet

    def do_init(self):
        """
        Run the one-time init() of this Component and of all Components after it in the chain.
        """
        # Some components may do one-time init
        self.init()

        # If there is a next component, let it do its init()
        if self.next:
            self.next.do_init()

    def do_exit(self):
        """
        Run exit() of this Component, log its timing stats and do the same for the rest of the chain.
        """
        # Notify all comps that we exit
        self.exit()

        # Simple performance stats in one line (issue #77)
        # Calc average processing time, watch for 0 invoke-case
        avg_time = 0.0
        if self._invoke_count > 0:
            avg_time = float(self._total_time) / self._invoke_count

        # When nothing measurable was timed, min/max still hold their initial
        # sentinel values: report 0.0 rather than logging the sentinels.
        min_time = 0.0 if self._min_time == sys.maxsize else self._min_time
        max_time = 0.0 if self._max_time < 0 else self._max_time

        log.info("%s invokes=%d time(total, min, max, avg) = %.3f %.3f %.3f %.3f" %
                 (self.__class__.__name__, self._invoke_count,
                  self._total_time, min_time, max_time,
                  avg_time))

        # If there is a next component, let it do its exit()
        if self.next:
            self.next.do_exit()

    def before_invoke(self, packet):
        """
        Called just before Component invoke. Returning False stops processing of the packet.
        """
        return True

    def after_invoke(self, packet):
        """
        Called right after Component invoke. Returning False stops the chain from proceeding.
        """
        return True

    def after_chain_invoke(self, packet):
        """
        Called right after entire Component Chain invoke.
        """
        return True

    def invoke(self, packet):
        """
        Components override for Component-specific behaviour, typically read, filter or write actions.
        """
        return packet

    def init(self):
        """
        Allows derived Components to perform a one-time init.
        """
        pass

    def exit(self):
        """
        Allows derived Components to perform a one-time exit/cleanup.
        """
        pass

    def timer_start(self):
        """
        Start timing.

        :return: the current time in seconds, to be passed to timer_stop()
        """
        return time()

    def timer_stop(self, start_time):
        """
        Collect and calculate per-Component performance timing stats.

        :param start_time: the value returned by timer_start()
        :return: None
        """
        delta_time = time() - start_time

        # Calc timing stats for Component invocation
        self._total_time += delta_time

        if delta_time > self._max_time:
            self._max_time = delta_time

        # Durations that would show as 0.000 in the stats line are not taken as minimum
        if delta_time < self._min_time and '%.3f' % delta_time != '0.000':
            self._min_time = delta_time
|