1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
#!/usr/bin/env python
from __future__ import print_function
"""
test_transform_with_no_re_matches.py
test messages with no regular expression matches
"""
import os
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"
import sys
# add grandparent to search path for testing
grandparent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)
# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
import ruffus
from ruffus import follows, mkdir, transform, regex, merge, Pipeline, pipeline_run
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# imports
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
import shutil
def touch (outfile):
with open(outfile, "w"):
pass
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# Tasks
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
@follows(mkdir(tempdir))
@ruffus.files([[None, tempdir+ "a.1"], [None, tempdir+ "b.1"]])
def task1(i, o):
touch(o)
@follows(mkdir(tempdir))
@ruffus.files([[None, tempdir+ "c.1"], [None, tempdir+ "d.1"]])
def task2(i, o):
touch(o)
@transform(task1, regex(r"(.*)"), ruffus.inputs(((r"\1"), task2, "test_transform_inputs.*y")), r"\1.output")
def task3(i, o):
names = ",".join(sorted(i))
for f in o:
with open(o, "w") as ff:
ff.write(names)
@merge((task3), tempdir + "final.output")
def task4(i, o):
with open(o, "w") as o_file:
for f in sorted(i):
with open(f) as ff:
o_file.write(f +":" + ff.read() + ";")
import unittest
class Test_task(unittest.TestCase):
def tearDown (self):
"""
"""
try:
shutil.rmtree(tempdir)
except:
pass
def test_task (self):
pipeline_run([task4], multiprocess = 10, verbose = 0, pipeline= "main")
correct_output = "{tempdir}a.1.output:test_transform_inputs.py,{tempdir}a.1,{tempdir}c.1,{tempdir}d.1;{tempdir}b.1.output:test_transform_inputs.py,{tempdir}b.1,{tempdir}c.1,{tempdir}d.1;".format(tempdir = tempdir)
with open(tempdir + "final.output") as ff:
real_output = ff.read()
self.assertEqual(correct_output, real_output)
def test_newstyle_task (self):
test_pipeline = Pipeline("test")
test_pipeline.files(task1, [[None, tempdir+ "a.1"], [None, tempdir+ "b.1"]])\
.follows(mkdir(tempdir))
test_pipeline.files(task2, [[None, tempdir+ "c.1"], [None, tempdir+ "d.1"]])\
.follows(mkdir(tempdir))
test_pipeline.transform(task_func = task3,
input = task1,
filter = regex(r"(.*)"),
replace_inputs = ruffus.inputs(((r"\1"), task2, "test_transform_inputs.*y")),
output = r"\1.output")
test_pipeline.merge(task4, (task3), tempdir + "final.output")
test_pipeline.run([task4], multiprocess = 10, verbose = 0)
correct_output = "{tempdir}a.1.output:test_transform_inputs.py,{tempdir}a.1,{tempdir}c.1,{tempdir}d.1;{tempdir}b.1.output:test_transform_inputs.py,{tempdir}b.1,{tempdir}c.1,{tempdir}d.1;".format(tempdir = tempdir)
with open(tempdir + "final.output") as ff:
real_output = ff.read()
self.assertEqual(correct_output, real_output)
if __name__ == '__main__':
unittest.main()
|