File: etl.py

package info (click to toggle)
python-stetl 1.0.9%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 89,428 kB
  • ctags: 720
  • sloc: python: 3,527; xml: 699; sql: 428; makefile: 153; sh: 45
file content (105 lines) | stat: -rwxr-xr-x 3,568 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# -*- coding: utf-8 -*-
#
# Main ETL program.
#
# Author: Just van den Broecke
#
import os
import sys
from ConfigParser import ConfigParser
import version
from util import Util
from chain import Chain
import StringIO

# Module-level logger for this file, obtained via the project's Util.get_log() helper
# under the name 'ETL'; used by ETL.__init__ and ETL.run for progress and error messages.
log = Util.get_log('ETL')

class ETL:
    """The main class: builds ETL Chains with connected Components from a config and lets them run.

    Usually this class is called via :mod:`main`, but it may also be used directly
    for integration into other programs.

    """

    # Absolute directory of the most recently loaded config file; set in __init__.
    CONFIG_DIR = None

    def __init__(self, options_dict, args_dict=None):
        """
        Locate, read and parse the ETL config file.

        :param options_dict: dictionary with options. 'config_file' (required) is the
            path to the config .ini file; 'config_section' (optional, read by run())
            names the section holding the 'chains' entry.
        :param args_dict: optional dictionary with arguments to be substituted for
            symbolic ``{name}`` values in the config file (via ``str.format``).
        :return:

        Exits the process with status 1 when no config file exists at the given path.
        Any error while reading, substituting or parsing the config is logged and re-raised.
        """
        log.info("INIT - Stetl version is %s" % str(version.__version__))

        self.options_dict = options_dict
        config_file = self.options_dict.get('config_file')

        if config_file is None or not os.path.isfile(config_file):
            log.error('No config file found at: %s' % config_file)
            sys.exit(1)

        ETL.CONFIG_DIR = os.path.dirname(os.path.abspath(config_file))
        log.info("Config/working dir = %s" % ETL.CONFIG_DIR)

        self.configdict = ConfigParser()

        # Make modules located next to the config file importable. Guard against
        # appending the same directory again when ETL is instantiated repeatedly.
        if ETL.CONFIG_DIR not in sys.path:
            sys.path.append(ETL.CONFIG_DIR)

        try:
            log.info("Reading config_file = %s" % config_file)
            if args_dict:
                log.info("Substituting %d args in config file from args_dict: %s" % (len(args_dict), str(args_dict)))
                # Read the whole config file as a string; 'with' closes the file
                # even if reading fails.
                with open(config_file, 'r') as f:
                    config_str = f.read()

                # Replace {name} placeholders, see http://docs.python.org/2/library/string.html#formatstrings
                config_str = config_str.format(**args_dict)

                log.info("Substituting args OK")
                # readfp() needs an object with a readline() method, so wrap the string.
                config_buf = StringIO.StringIO(config_str)

                # Parse config from the in-memory buffer; config_file is passed as its source name.
                self.configdict.readfp(config_buf, config_file)
            else:
                # Parse config file directly
                self.configdict.read(config_file)
        except Exception as e:
            # A config that failed to load cannot be run; re-raise so the real cause
            # surfaces here instead of as a confusing error later in run().
            log.error("Fatal Error reading config file: err=%s" % str(e))
            raise

    def run(self):
        """
        Assemble and run every ETL Chain listed in the config, in the listed order.

        The chains are taken from the comma-separated 'chains' option of the section
        named by options_dict['config_section'] (default section: 'etl').

        :raises ValueError: if that section does not exist or has no non-empty 'chains' entry.
        """
        log.info("START")
        t1 = Util.start_timer("total ETL")

        # Default is to use the section [etl], but it may be overridden on the cmd line.
        config_section = self.options_dict.get('config_section')
        if config_section is None:
            config_section = 'etl'

        # has_option() is False for a missing section as well as a missing option,
        # so both cases reach the intended ValueError below instead of a
        # ConfigParser NoSectionError/NoOptionError.
        chains_str = None
        if self.configdict.has_option(config_section, 'chains'):
            chains_str = self.configdict.get(config_section, 'chains')

        # Multiple Chains may be specified; skip blank entries (e.g. a trailing comma).
        chain_strs = [s.strip() for s in (chains_str or '').split(',') if s.strip()]
        if not chain_strs:
            raise ValueError('ETL chain entry not defined in section [%s]' % config_section)

        for chain_str in chain_strs:
            # Build single Chain of components and let it run
            chain = Chain(chain_str, self.configdict)
            chain.assemble()

            # Run the ETL for this Chain
            chain.run()

        Util.end_timer(t1, "total ETL")

        log.info("ALL DONE")