1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
|
# -*- coding: utf-8 -*-
#
# Component base class for ETL.
#
# Author: Just van den Broecke
#
import os
from util import Util, ConfigSection
from packet import FORMAT
log = Util.get_log('component')
class Config(object):
"""
Decorator class to tie config values from the .ini file to object instance
property values. Somewhat like the Python standard @property but with
the possibility to define default values, typing and making properties required.
Each property is defined by @Config(type, default, required).
Basic idea comes from: https://wiki.python.org/moin/PythonDecoratorLibrary#Cached_Properties
"""
def __init__(self, ptype=str, default=None, required=False):
"""
If there are no decorator arguments, the function
to be decorated is passed to the constructor.
"""
# print "Inside __init__()"
self.ptype = ptype
self.default = default
self.required = required
def __call__(self, fget, doc=None):
"""
The __call__ method is not called until the
decorated function is called. self is returned such that __get__ below is called
with the Component instance. That allows us to cache the actual property value
in the Component itself.
"""
# Save the property name (is the name of the function calling us).
self.property_name = fget.__name__
# print "Inside __call__() name=%s" % self.property_name
# For Spinx documention build we need the original function with docstring.
IS_SPHINX_BUILD = bool(os.getenv('SPHINX_BUILD'))
if IS_SPHINX_BUILD:
fget.__doc__ = '``CONFIG`` - %s' % fget.__doc__
return fget
else:
return self
def __get__(self, comp_inst, owner):
# print "Inside __get__() owner=%s" % owner
""" descr.__get__(obj[, type]) -> value """
if self.property_name not in comp_inst.cfg_vals:
cfg, name, default_value = comp_inst.cfg, self.property_name, self.default
# Do type conversion where needed from the string values
if self.ptype is str:
value = cfg.get(name, default=default_value)
elif self.ptype is bool:
value = cfg.get_bool(name, default=default_value)
elif self.ptype is list:
value = cfg.get_list(name, default=default_value)
elif self.ptype is dict:
value = cfg.get_dict(name, default=default_value)
elif self.ptype is int:
value = cfg.get_int(name, default=default_value)
elif self.ptype is tuple:
value = cfg.get_tuple(name, default=default_value)
else:
value = cfg.get(name, default=default_value)
if self.required is True and value is None:
raise Exception('Config property: %s is required in config for %s' % (name, str(comp_inst)))
comp_inst.cfg_vals[self.property_name] = value
return comp_inst.cfg_vals[self.property_name]
class Component:
"""
Abstract Base class for all Input, Filter and Output Components.
"""
@Config(ptype=str, default=None, required=False)
def input_format(self):
"""
The specific input format if the consumes parameter is a list or the format to be converted to the output_format.
Required: False
Default: None
"""
pass
@Config(ptype=str, default=None, required=False)
def output_format(self):
"""
The specific output format if the produces parameter is a list or the format to which the input format is converted.
Required: False
Default: None
"""
pass
def __init__(self, configdict, section, consumes=FORMAT.none, produces=FORMAT.none):
# The raw config values from the cfg file
self.cfg = ConfigSection(configdict.items(section))
# The actual typed values as populated within Config Decorator
self.cfg_vals = dict()
self.next = None
# First assume single output provided by derived class
self._output_format = produces
# We may have a configured output_format: use that value, when multiple formats it should be in that list
if self.output_format is not None:
self._output_format = self.output_format
if type(produces) is list and self._output_format not in produces:
raise ValueError('Configured output_format %s not in list: %s' % (self._output_format, str(produces)))
elif type(produces) is list:
# No output_format configured and a list: use the first as default
self._output_format = produces[0]
# First assume single input provided by derived class
self._input_format = consumes
# We may have a configured input_format: use that value, when multiple formats it should be in that list
if self.input_format is not None:
self._input_format = self.input_format
if type(consumes) is list and self._input_format not in consumes:
raise ValueError('Configured input_format %s not in list: %s' % (self._input_format, str(consumes)))
elif type(consumes) is list:
# No input_format configured and a list: use the first as default
self._input_format = consumes[0]
def add_next(self, next_component):
self.next = next_component
if not self.is_compatible():
raise ValueError(
'Incompatible components are linked: %s and %s' % (str(self), str(self.next)))
# Check our compatibility with the next Component in the Chain
def is_compatible(self):
# Ok, nothing next in Chain
if self.next is None or self._output_format is FORMAT.none or self.next._input_format == FORMAT.any:
return True
# return if our Output is compatible with the next Component's Input
return self._output_format == self.next._input_format
def __str__(self):
return "%s: in=%s out=%s" % (str(self.__class__), self._input_format, self._output_format)
def process(self, packet):
# Current processor of packet
packet.component = self
# Do something with the data
result = self.before_invoke(packet)
if result is False:
# Component indicates it does not want the chain to proceed
return packet
# Do component-specific processing, e.g. read or write or filter
packet = self.invoke(packet)
result = self.after_invoke(packet)
if result is False:
# Component indicates it does not want the chain to proceed
return packet
# If there is a next component, let it process
if self.next:
# Hand-over data (line, doc whatever) to the next component
packet.format = self._output_format
packet = self.next.process(packet)
result = self.after_chain_invoke(packet)
return packet
def do_init(self):
# Some components may do one-time init
self.init()
# If there is a next component, let it do its init()
if self.next:
self.next.do_init()
def do_exit(self):
# Notify all comps that we exit
self.exit()
# If there is a next component, let it do its exit()
if self.next:
self.next.do_exit()
def before_invoke(self, packet):
"""
Called just before Component invoke.
"""
return True
def after_invoke(self, packet):
"""
Called right after Component invoke.
"""
return True
def after_chain_invoke(self, packet):
"""
Called right after entire Component Chain invoke.
"""
return True
def invoke(self, packet):
"""
Components override for Component-specific behaviour, typically read, filter or write actions.
"""
return packet
def init(self):
"""
Allows derived Components to perform a one-time init.
"""
pass
def exit(self):
"""
Allows derived Components to perform a one-time exit/cleanup.
"""
pass
|