File: test_newstyle_proxy.py

Package: python-ruffus 2.6.3+dfsg-4
#!/usr/bin/env python
"""

    test_newstyle_proxy.py

"""
from __future__ import print_function

import os
import sys

# working directory for the test's files, named after this script
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"

# add grandparent to search path for testing
grandparent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)

# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]

from ruffus import Pipeline, suffix, pipeline_run


import multiprocessing.managers


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Tasks


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#
#   First task
#
def start_task(output_file_name, executed_tasks_proxy, mutex_proxy):
    with open(output_file_name,  "w") as f:
        pass
    with mutex_proxy:
        executed_tasks_proxy["start_task"] = 1

#
#   Forwards file names; always exactly as up to date as its input files
#
def same_file_name_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    with mutex_proxy:
        executed_tasks_proxy["same_file_name_task"] = executed_tasks_proxy.get("same_file_name_task", 0) + 1

#
#   Links file names; always up to date as long as the links exist
#
def linked_file_name_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    os.symlink(os.path.abspath(input_file_name), os.path.abspath(output_file_name))
    with mutex_proxy:
        executed_tasks_proxy["linked_file_name_task"] = executed_tasks_proxy.get("linked_file_name_task", 0) + 1


#
#   Final task, joining both branches above
#
def final_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    with open(output_file_name,  "w") as f:
        pass
    with mutex_proxy:
        executed_tasks_proxy["final_task"] = executed_tasks_proxy.get("final_task", 0) + 1

#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Run pipeline


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888


import unittest

class Test_ruffus(unittest.TestCase):
    def setUp(self):

        # Record of which tasks ran, shared with worker processes via
        # SyncManager proxies; mutex_proxy guards concurrent updates
        manager = multiprocessing.managers.SyncManager()
        manager.start()
        global mutex_proxy
        global executed_tasks_proxy
        mutex_proxy = manager.Lock()
        executed_tasks_proxy = manager.dict()

        pipeline = Pipeline.pipelines["main"]
        pipeline.originate(task_func = start_task,
                            output = [tempdir + "a.1", tempdir + "b.1"],
                            extras = [executed_tasks_proxy, mutex_proxy])\
                .mkdir(tempdir)
        pipeline.transform(task_func = same_file_name_task,
                            input = start_task,
                            filter = suffix(".1"),
                            output = ".1",
                            extras = [executed_tasks_proxy, mutex_proxy])
        pipeline.transform( task_func = linked_file_name_task,
                            input = start_task,
                            filter = suffix(".1"),
                            output = ".linked.1",
                            extras = [executed_tasks_proxy, mutex_proxy])
        pipeline.transform(task_func = final_task,
                            input = [linked_file_name_task, same_file_name_task],
                            filter = suffix(".1"),
                            output = ".3",
                            extras = [executed_tasks_proxy, mutex_proxy])
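
        # Resulting dependency graph (one job per input file):
        #
        #   start_task --> same_file_name_task   ---\
        #             \--> linked_file_name_task ---+--> final_task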
        self.cleanUp()

    def cleanUp(self, check_expected = False):
        for f in ["a.1", "b.1", "a.linked.1", "b.linked.1", "a.3", "b.3", "a.linked.3", "b.linked.3"]:
            if os.path.lexists(tempdir + f):
                os.unlink(tempdir + f)
            elif check_expected:
                raise Exception("Expected %s missing" % (tempdir + f))
        if os.path.lexists(tempdir):
            os.rmdir(tempdir)
        elif check_expected:
            raise Exception("Expected %s missing" % (tempdir))

    def tearDown(self):
        self.cleanUp(True)

    def test_ruffus(self):
        #
        #   First run: no files exist yet, so the whole pipeline is out of date
        #
        print("    First run: every task except same_file_name_task should execute", file=sys.stderr)
        pipeline_run(log_exceptions = True, verbose = 0, pipeline = "main")


        #
        #   Run again: the symlinks report their targets' timestamps,
        #   so every task should now be up to date
        #
        print("    Run again: all jobs should be up to date", file=sys.stderr)
        pipeline_run(log_exceptions = True, verbose = 0, pipeline = "main")


        #
        #   Run a third time:
        #
        #       all jobs should still be up to date
        #
        print("    Run a third time: all jobs should still be up to date", file=sys.stderr)
        pipeline_run(log_exceptions = True, verbose = 0, pipeline = "main")

        #
        #   Check that the right number of jobs ran for each task
        #
        for task_name, jobs_count in ({'start_task': 1, 'final_task': 4, 'linked_file_name_task': 2}).items():
            if task_name not in executed_tasks_proxy:
                raise Exception("Error: %s did not run!!" % task_name)
            if executed_tasks_proxy[task_name] != jobs_count:
                raise Exception("Error: %s ran %d jobs, expected %d!!"
                                % (task_name, executed_tasks_proxy[task_name], jobs_count))
        if "same_file_name_task" in executed_tasks_proxy:
            raise Exception("Error: same_file_name_task should not have run!!")



if __name__ == '__main__':
    unittest.main()