#!/usr/bin/env python
from __future__ import print_function
"""
test_softlink_uptodate.py
"""
import os
import sys
# Per-test scratch directory: "<script name without extension>/", relative to CWD.
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"
# add grandparent to search path for testing
grandparent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)
# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]
from ruffus import Pipeline, suffix, pipeline_run
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# Tasks
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
import multiprocessing.managers
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# Tasks
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
#
# First task
#
def start_task(output_file_name, executed_tasks_proxy, mutex_proxy):
    """Originate task: touch an empty output file and record that it ran.

    executed_tasks_proxy / mutex_proxy are manager proxies shared across
    worker processes; the lock guards the tally dictionary.
    """
    # Touch the output file so downstream tasks see it exist.
    open(output_file_name, "w").close()
    with mutex_proxy:
        executed_tasks_proxy["start_task"] = 1
#
# Forwards file names, is always as up to date as its input files...
#
def same_file_name_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    """Pass-through task: touches no files, only counts this invocation.

    Because its output *is* its input file, it is always exactly as up to
    date as its input and should never be re-run by the pipeline.
    """
    with mutex_proxy:
        previous = executed_tasks_proxy.get("same_file_name_task", 0)
        executed_tasks_proxy["same_file_name_task"] = previous + 1
#
# Links file names, is always as up to date if links are not missing
#
def linked_file_name_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    """Create the output as a symlink to the input, and count the invocation.

    The link target is the absolute input path, so the link stays valid
    regardless of the current working directory.
    """
    source = os.path.abspath(input_file_name)
    link_name = os.path.abspath(output_file_name)
    os.symlink(source, link_name)
    with mutex_proxy:
        previous = executed_tasks_proxy.get("linked_file_name_task", 0)
        executed_tasks_proxy["linked_file_name_task"] = previous + 1
#
# Final task linking everything
#
def final_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    """Terminal task: touch an empty output file and count this invocation."""
    # Touch the output file; its content is irrelevant to the test.
    open(output_file_name, "w").close()
    with mutex_proxy:
        executed_tasks_proxy["final_task"] = 1 + executed_tasks_proxy.get("final_task", 0)
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# Run pipeline
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
import shutil
import unittest

# Python 2/3 compatibility: StringIO moved from the StringIO module to io.
try:
    from StringIO import StringIO
except ImportError:
    # Python 3: StringIO lives in io. A bare "except:" here would also
    # swallow SystemExit/KeyboardInterrupt, so catch ImportError only.
    from io import StringIO
class Test_ruffus(unittest.TestCase):
    """Check that tasks forwarding or symlinking their inputs are judged
    up to date correctly by repeated pipeline_run() calls."""

    def setUp(self):
        # list of executed tasks
        # Manager supplies the cross-process lock and tally dict that every
        # task function updates via its "extras" arguments.
        manager = multiprocessing.managers.SyncManager()
        manager.start()
        global mutex_proxy
        global executed_tasks_proxy
        mutex_proxy = manager.Lock()
        executed_tasks_proxy = manager.dict()
        # Wire the four tasks onto the default ("main") pipeline.
        pipeline = Pipeline.pipelines["main"]
        pipeline.originate(task_func = start_task,
                           output = [tempdir + "a.1", tempdir + "b.1"],
                           extras = [executed_tasks_proxy, mutex_proxy])\
            .mkdir(tempdir)
        # Output suffix ".1" == input suffix: output file *is* the input file.
        pipeline.transform(task_func = same_file_name_task,
                           input = start_task,
                           filter = suffix(".1"),
                           output = ".1",
                           extras = [executed_tasks_proxy, mutex_proxy])
        # Produces "*.linked.1" symlinks pointing back at the "*.1" inputs.
        pipeline.transform( task_func = linked_file_name_task,
                            input = start_task,
                            filter = suffix(".1"),
                            output = ".linked.1",
                            extras = [executed_tasks_proxy, mutex_proxy])
        pipeline.transform(task_func = final_task,
                           input = [linked_file_name_task, same_file_name_task],
                           filter = suffix(".1"),
                           output = ".3",
                           extras = [executed_tasks_proxy, mutex_proxy])
        # Start from a clean slate (ignore anything left by a previous run).
        self.cleanUp()

    def cleanUp(self, check_expected = False):
        """Remove all pipeline outputs; with check_expected=True a missing
        file or directory raises instead (used by tearDown to assert that
        every expected output was actually produced)."""
        for f in ["a.1", "b.1", "a.linked.1", "b.linked.1", "a.3", "b.3", "a.linked.3", "b.linked.3"]:
            # lexists() so dangling symlinks are also detected and removed.
            if os.path.lexists(tempdir + f):
                os.unlink(tempdir + f)
            elif check_expected:
                raise Exception("Expected %s missing" % (tempdir + f))
        if os.path.lexists(tempdir):
            os.rmdir(tempdir)
        elif check_expected:
            raise Exception("Expected %s missing" % (tempdir))

    def tearDown(self):
        # Verify all expected outputs exist, then delete them.
        self.cleanUp(True)

    def test_ruffus (self):
        #
        # Run task 1 only
        #
        print("    Run start_task only", file=sys.stderr)
        pipeline_run(log_exceptions = True, verbose = 0, pipeline= "main")

        #
        # Run task 3 only
        #
        print("    Run final_task: linked_file_name_task should run as well", file=sys.stderr)
        pipeline_run(log_exceptions = True, verbose = 0, pipeline= "main")

        #
        # Run task 3 again:
        #
        #   All jobs should be up to date
        #
        print("    Run final_task again: All jobs should be up to date", file=sys.stderr)
        pipeline_run(log_exceptions = True, verbose = 0, pipeline= "main")

        #
        # Make sure right number of jobs / tasks ran
        #
        # NOTE(review): expected counts assume final_task's 4 jobs (2 inputs
        # x 2 files) accumulate over the runs above — confirm against ruffus
        # up-to-date semantics if counts drift.
        for task_name, jobs_count in ({'start_task': 1, 'final_task': 4, 'linked_file_name_task': 2}).items():
            if task_name not in executed_tasks_proxy:
                raise Exception("Error: %s did not run!!" % task_name)
            if executed_tasks_proxy[task_name] != jobs_count:
                raise Exception("Error: %s did not have %d jobs!!" % (task_name, jobs_count))
        # same_file_name_task's output equals its input, so it must always
        # be considered up to date and never execute.
        if "same_file_name_task" in executed_tasks_proxy:
            raise Exception("Error: %s should not have run!!" % "same_file_name_task")
# Run the unittest test runner when executed as a script.
if __name__ == '__main__':
    unittest.main()