File: etl.py

package info (click to toggle)
python-stetl 1.2%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 89,988 kB
  • sloc: python: 5,007; xml: 707; sql: 430; makefile: 155; sh: 50
file content (163 lines) | stat: -rwxr-xr-x 5,816 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# -*- coding: utf-8 -*-
#
# Main ETL program.
#
# Author: Just van den Broecke
#
import os
import re
import sys
from ConfigParser import ConfigParser
import version
from util import Util
from chain import Chain
import StringIO

# Module-level logger for this file, obtained from the project's Util helper under the name 'ETL'.
log = Util.get_log('ETL')


class ETL:
    """The main class: builds ETL Chains with connected Components from a config and lets them run.

    Usually this class is called via :mod:`main` but it may be called directly for direct integration.

    The config file is read as text, ``{name}`` placeholders in it are substituted from
    ``args_dict`` and/or ``STETL_``-prefixed environment variables, and the resulting
    string is parsed into ``self.configdict``.
    """

    # Directory that holds the most recently loaded config file; set by __init__.
    CONFIG_DIR = None

    # A ``{name}`` placeholder: a letter followed by any number of word characters.
    # Raw string so ``\w`` reaches the regex engine; ``\w*`` so one-letter names match too.
    _ARG_NAME_RE = re.compile(r'\{([A-Za-z]\w*)\}')

    def __init__(self, options_dict, args_dict=None):
        """
        Read the config file, expand its symbolic arguments and parse it.

        :param options_dict: dictionary with options; ``config_file`` holds the path to the
               config .ini file
        :param args_dict: optional dictionary with arguments to be substituted for symbolic
               ``{name}`` values in the config; the passed dictionary itself is not modified
        :return:

        Exits the process with status 1 when no config file is found at the given path.
        Errors while reading the file, substituting arguments or parsing the config are
        logged and re-raised. Problems while collecting argument names are only logged.
        """
        log.info("INIT - Stetl version is %s" % str(version.__version__))

        self.options_dict = options_dict
        config_file = self.options_dict.get('config_file')

        if config_file is None or not os.path.isfile(config_file):
            log.error('No config file found at: %s' % config_file)
            sys.exit(1)

        ETL.CONFIG_DIR = os.path.dirname(os.path.abspath(config_file))
        log.info("Config/working dir = %s" % ETL.CONFIG_DIR)

        self.configdict = ConfigParser()

        # Put the config dir on the module search path, but only once: repeated
        # ETL instances for the same dir should not keep growing sys.path.
        if ETL.CONFIG_DIR not in sys.path:
            sys.path.append(ETL.CONFIG_DIR)

        try:
            # Get config file as string; 'with' closes the file even when read() fails
            log.info("Reading config_file = %s" % config_file)
            with open(config_file, 'r') as config_fp:
                config_str = config_fp.read()
        except Exception as e:
            log.error("Cannot read config file: err=%s" % str(e))
            # Bare raise keeps the original traceback
            raise

        args_names = list()
        try:
            # Optional: expand symbolic arguments from args_dict and or OS Env
            # ignore errors here as { .. } may appear at random.
            args_names = self._parse_arg_names(config_str)

            # Optional: expand from equivalent env vars
            args_dict = self.env_expand_args_dict(args_dict, args_names)

            # In general all arg names should be present in args dict.
            # args_dict may still be None here: treat that as "no args known".
            known_args = args_dict or {}
            for args_name in args_names:
                if args_name not in known_args:
                    log.warn("Arg not found in args nor environment: name=%s" % args_name)

        except Exception as e:
            log.warn("Expanding config arguments (non fatal yet): %s" % str(e))

        try:
            # Without any args the config string is used verbatim (no format() pass)
            if args_dict:
                log.info("Substituting %d args in config file from args_dict: %s" % (len(args_names), str(args_names)))

                # Do replacements  see http://docs.python.org/2/library/string.html#formatstrings
                # and render substituted config string
                config_str = config_str.format(**args_dict)

                log.info("Substituting args OK")

        except Exception as e:
            log.error("Error substituting config arguments: err=%s" % str(e))
            raise

        try:
            # Put Config string into buffer (readfp() needs a readline() method)
            config_buf = StringIO.StringIO(config_str)

            # Parse config from file buffer
            self.configdict.readfp(config_buf, config_file)
        except Exception as e:
            log.error("Error populating config dict from config string: err=%s" % str(e))
            raise

    @staticmethod
    def _parse_arg_names(config_str):
        """
        Collect the unique ``{name}`` placeholder names that occur in a config string.

        :param config_str: complete text of the config file
        :return: sorted list of unique names, without the surrounding braces
        """
        return sorted(set(ETL._ARG_NAME_RE.findall(config_str)))

    def env_expand_args_dict(self, args_dict, args_names):
        """
        Expand values in dict with equivalent values from the
        OS Env. NB vars in OS Env should be prefixed with `STETL_` or `stetl_`
        as to not get overrides by accident.

        A matching env var overrides any value already present for that key.
        The passed ``args_dict`` is never modified: when at least one env var
        matches, a new dictionary is returned.

        :param args_dict: dictionary with argument values, may be None or empty
        :param args_names: names of the arguments that occur in the config
        :return: expanded args_dict, or the unchanged ``args_dict`` (possibly None)
                 when no env var matched
        """
        prefix = 'stetl_'
        overrides = dict()
        for name, value in os.environ.items():
            if not name.lower().startswith(prefix):
                continue

            # Get real key, e.g. "STETL_HOST" becomes "HOST"
            # "stetl_host" becomes "host".
            args_key = name[len(prefix):]
            if args_key in args_names:
                overrides[args_key] = value
                log.info("Set/override from env var: %s" % name)

        if not overrides:
            return args_dict

        # Work on a copy so the caller's dictionary stays untouched
        expanded = dict(args_dict) if args_dict else dict()
        expanded.update(overrides)
        return expanded

    def run(self):
        """
        The main ETL processing: assemble and run every Chain listed in the config.

        The comma-separated ``chains`` option is read from the config section named by
        the ``config_section`` option, or from ``[etl]`` when that option is not set.

        :raises ValueError: when the ``chains`` option is empty
        """
        log.info("START")
        t1 = Util.start_timer("total ETL")

        # Get the ETL Chain pipeline config strings
        # Default is to use the section [etl], but may be overridden on cmd line
        config_section = self.options_dict.get('config_section')
        if config_section is None:
            config_section = 'etl'

        chains_str = self.configdict.get(config_section, 'chains')
        if not chains_str:
            # Name the section that was actually consulted, not always [etl]
            raise ValueError('ETL chain entry not defined in section [%s]' % config_section)

        # Multiple Chains may be specified in the config
        for chain_str in chains_str.split(','):
            # Build single Chain of components and let it run
            chain = Chain(chain_str, self.configdict)
            chain.assemble()

            # Run the ETL for this Chain
            chain.run()

        Util.end_timer(t1, "total ETL")

        log.info("ALL DONE")