File: test_transform_formatter.py

package info (click to toggle)
python-ruffus 2.6.3%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 20,828 kB
  • ctags: 2,843
  • sloc: python: 15,745; makefile: 180; sh: 14
file content (147 lines) | stat: -rwxr-xr-x 4,687 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python
from __future__ import print_function
"""

    test_transform_formatter.py

"""
JOBS_PER_TASK = 5

import os
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0]))
import sys
import re

# add grandparent to search path for testing
grandparent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)

# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]


# funky code to import by file name
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
import ruffus
from ruffus import pipeline_run, pipeline_printout, Pipeline, formatter, transform, mkdir,originate

from ruffus.ruffus_exceptions import RethrownJobError
from ruffus.task import t_stream_logger


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   options


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

import shutil
import unittest


try:
    from StringIO import StringIO
except:
    from io import StringIO






#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   imports


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Main logic


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888




#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Tasks


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
@mkdir(tempdir)
@originate([os.path.join(tempdir, ff + ".tmp") for ff in "abcd"])
def generate_initial_files(out_name):
    with open(out_name, 'w') as outfile:
        pass


@transform(input = generate_initial_files, filter=formatter(), output = "{path[0]}/{basename[0]}.task1.{whatever}",
                        extras=['echo {dynamic_message} > {some_file}'])
def transform_with_missing_formatter_args(input_file, output_files, output1):
    print ("input = %r, output = %r, extras = %r" % (input_file, output_files, output1))


class Test_ruffus(unittest.TestCase):
    #___________________________________________________________________________
    #
    #   setup and cleanup
    #___________________________________________________________________________
    def setUp(self):
        import os
        try:
            shutil.rmtree(tempdir)
        except:
            pass
        os.makedirs(tempdir)

    def tearDown(self):
        shutil.rmtree(tempdir)

    #___________________________________________________________________________
    #
    #   test product() pipeline_printout and pipeline_run
    #___________________________________________________________________________
    def test_transform_with_missing_formatter_args(self):
        s = StringIO()
        pipeline_printout(s, [transform_with_missing_formatter_args], verbose=4, wrap_width = 10000, pipeline= "main")
        self.assertIn("Missing key = {dynamic_message}", s.getvalue())
        pipeline_run([transform_with_missing_formatter_args], verbose=0, pipeline= "main")


    def test_transform_with_missing_formatter_args_b(self):
        test_pipeline = Pipeline("test")


        test_pipeline.originate(task_func   = generate_initial_files,
                                output      = [os.path.join(tempdir, ff + ".tmp") for ff in "abcd"])\
            .mkdir(tempdir)


        test_pipeline.transform(task_func   = transform_with_missing_formatter_args,
                                input       = generate_initial_files,
                                filter      = formatter(),
                                output      = "{path[0]}/{basename[0]}.task1",
                                extras      =['echo {dynamic_message} > {some_file}'])
        s = StringIO()
        test_pipeline.printout(s, [transform_with_missing_formatter_args], verbose=4, wrap_width = 10000, pipeline= "test")
        self.assertIn("Missing key = {dynamic_message}", s.getvalue())

        #log to stream
        s = StringIO()
        logger = t_stream_logger(s)
        test_pipeline.run([transform_with_missing_formatter_args], verbose=5, pipeline= "test", logger=logger)
        self.assertIn("Missing key = {dynamic_message}", s.getvalue())


#
#   Necessary to protect the "entry point" of the program under windows.
#       see: http://docs.python.org/library/multiprocessing.html#multiprocessing-programming
#
if __name__ == '__main__':
    unittest.main()