File: archiveexpander.py

package info (click to toggle)
python-stetl 2.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 90,360 kB
  • sloc: python: 5,618; xml: 707; sql: 430; makefile: 147; sh: 71
file content (127 lines) | stat: -rw-r--r-- 3,817 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Expands an archive file into a collection of files.
#
# Author: Just van den Broecke 2021
#
import os.path
from stetl.component import Config
from stetl.filter import Filter
from stetl.util import Util
from stetl.packet import FORMAT

# Module-level logger used by the expander classes defined in this file.
log = Util.get_log('archiveexpander')


class ArchiveExpander(Filter):
    """
    Abstract Base Class.
    Expands an archive file into a collection of files.

    The incoming packet data is the path of the archive file. The outgoing
    packet data is the target directory path in which the expanded files
    can be found, or None when nothing was expanded. Derived classes provide
    the format-specific extraction by overriding ``expand_archive()``.

    consumes=FORMAT.string, produces=FORMAT.string
    """

    # Start attribute config meta

    @Config(ptype=str, default='temp_dir', required=True)
    def target_dir(self):
        """
        Target directory to write the extracted files to.
        """
        pass

    @Config(ptype=bool, default=False, required=False)
    def remove_input_file(self):
        """
        Delete input archive file when the chain has been completed?
        """
        pass

    @Config(ptype=bool, default=True, required=False)
    def clear_target_dir(self):
        """
        Delete the files from the target directory when the chain has been completed?
        """
        pass

    # End attribute config meta

    # Constructor
    def __init__(self, configdict, section, consumes, produces):
        """
        Initialise the filter and make sure the target directory exists.

        :param configdict: configuration, passed on to ``Filter.__init__``
        :param section: configuration section, passed on to ``Filter.__init__``
        :param consumes: input format, passed on to ``Filter.__init__``
        :param produces: output format, passed on to ``Filter.__init__``
        """
        Filter.__init__(self, configdict, section, consumes=consumes, produces=produces)

        # Path of the archive last received by invoke(); None until then.
        self.input_archive_file = None

        # makedirs() rather than mkdir(): a nested target_dir whose parent
        # directories do not exist yet must not make construction fail.
        if not os.path.exists(self.target_dir):
            os.makedirs(self.target_dir)

    def remove_file(self, file_path):
        """
        Remove a regular file; silently do nothing for anything else.

        :param file_path: path of the file to delete, may be None or empty
        """
        # Guard against None/empty: os.path.isfile(None) raises TypeError, and
        # input_archive_file is still None when no archive was ever received.
        if file_path and os.path.isfile(file_path):
            os.remove(file_path)

    def wipe_dir(self, dir_path):
        """
        Recursively delete everything inside a directory, keeping the directory itself.

        :param dir_path: directory to empty; nothing happens if it is not a directory
        """
        if not os.path.isdir(dir_path):
            return

        for entry_name in os.listdir(dir_path):
            entry_path = os.path.join(dir_path, entry_name)

            # A symlink to a directory is removed as a link: descending into
            # it would delete files outside dir_path and rmdir() on the link fails.
            if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                self.wipe_dir(entry_path)
                os.rmdir(entry_path)
                # Carry on with the remaining entries; returning here would
                # leave everything after the first subdirectory behind.
                continue

            os.remove(entry_path)

    def expand_archive(self, packet):
        """
        Extract the archive into ``target_dir``; to be overridden by derived classes.

        This base implementation only logs an error.

        :param packet: despite its name, ``invoke()`` passes the archive file
            path here, not a packet object
        """
        log.error('Only classes derived from ArchiveExpander can be used!')

    def invoke(self, packet):
        """
        Expand the archive named by ``packet.data`` into the target directory.

        :param packet: packet whose data is the archive file path
        :return: the same packet; its data is set to ``target_dir`` when files
            were expanded, to None when the target directory stayed empty, and
            is left untouched (None) when there was no input
        """
        if packet.data is None:
            log.info("Input data is empty")
            return packet

        # The target dir is always emptied before expanding. The
        # clear_target_dir option only governs the cleanup done in
        # after_chain_invoke().
        self.wipe_dir(self.target_dir)

        self.input_archive_file = packet.data

        # Let derived class provide archive expansion (.zip, .tar etc)
        self.expand_archive(self.input_archive_file)

        # List the directory once and reuse the result for both the emptiness
        # check and the file count.
        expanded_entries = os.listdir(self.target_dir)
        if not expanded_entries:
            log.warn('No expanded files in {}'.format(self.target_dir))
            packet.data = None
            return packet

        # ASSERT: expanded files in target dir
        log.info('Expanded {} into {} OK - filecount={}'.format(
            self.input_archive_file, self.target_dir, len(expanded_entries)))

        # Output the target dir path where expanded files are found
        packet.data = self.target_dir

        return packet

    def after_chain_invoke(self, packet):
        """
        Clean up after the chain has processed the expanded files.

        Removes the input archive when ``remove_input_file`` is set and empties
        the target directory when ``clear_target_dir`` is set.

        :param packet: the packet that went through the chain (not used here)
        :return: always True
        """
        if self.remove_input_file:
            self.remove_file(self.input_archive_file)

        if self.clear_target_dir:
            self.wipe_dir(self.target_dir)

        return True


class ZipArchiveExpander(ArchiveExpander):
    """
    Extracts all files from a ZIP file into the configured target directory.

    consumes=FORMAT.string, produces=FORMAT.string
    """

    def __init__(self, configdict, section):
        """
        Set up the expander with string input (archive path) and string output.

        :param configdict: configuration, passed on to ``ArchiveExpander.__init__``
        :param section: configuration section, passed on to ``ArchiveExpander.__init__``
        """
        ArchiveExpander.__init__(self, configdict, section, consumes=FORMAT.string, produces=FORMAT.string)

    def expand_archive(self, file_path):
        """
        Extract every member of the ZIP file into ``target_dir``.

        A path whose name does not end in 'zip' (case-insensitive) is skipped
        with a warning and nothing is extracted.

        :param file_path: path of the ZIP archive to extract
        """
        import zipfile

        if not file_path.lower().endswith('zip'):
            log.warn('No zipfile passed: {}'.format(file_path))
            return

        # Open via a context manager so the archive's file handle is closed
        # again, also when extraction raises.
        with zipfile.ZipFile(file_path) as archive:
            archive.extractall(path=self.target_dir)