1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
#!/usr/bin/env python
from __future__ import print_function
"""
test_transform_formatter.py
"""
JOBS_PER_TASK = 5
import os
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0]))
import sys
import re
# add grandparent to search path for testing
grandparent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)
# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]
# funky code to import by file name
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
import ruffus
from ruffus import pipeline_run, pipeline_printout, Pipeline, formatter, transform, mkdir,originate
from ruffus.ruffus_exceptions import RethrownJobError
from ruffus.task import t_stream_logger
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# options
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
import shutil
import unittest
try:
from StringIO import StringIO
except:
from io import StringIO
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# imports
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# Main logic
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# Tasks
#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
@mkdir(tempdir)
@originate([os.path.join(tempdir, ff + ".tmp") for ff in "abcd"])
def generate_initial_files(out_name):
with open(out_name, 'w') as outfile:
pass
@transform(input = generate_initial_files, filter=formatter(), output = "{path[0]}/{basename[0]}.task1.{whatever}",
extras=['echo {dynamic_message} > {some_file}'])
def transform_with_missing_formatter_args(input_file, output_files, output1):
print ("input = %r, output = %r, extras = %r" % (input_file, output_files, output1))
class Test_ruffus(unittest.TestCase):
#___________________________________________________________________________
#
# setup and cleanup
#___________________________________________________________________________
def setUp(self):
import os
try:
shutil.rmtree(tempdir)
except:
pass
os.makedirs(tempdir)
def tearDown(self):
shutil.rmtree(tempdir)
#___________________________________________________________________________
#
# test product() pipeline_printout and pipeline_run
#___________________________________________________________________________
def test_transform_with_missing_formatter_args(self):
s = StringIO()
pipeline_printout(s, [transform_with_missing_formatter_args], verbose=4, wrap_width = 10000, pipeline= "main")
self.assertIn("Missing key = {dynamic_message}", s.getvalue())
pipeline_run([transform_with_missing_formatter_args], verbose=0, pipeline= "main")
def test_transform_with_missing_formatter_args_b(self):
test_pipeline = Pipeline("test")
test_pipeline.originate(task_func = generate_initial_files,
output = [os.path.join(tempdir, ff + ".tmp") for ff in "abcd"])\
.mkdir(tempdir)
test_pipeline.transform(task_func = transform_with_missing_formatter_args,
input = generate_initial_files,
filter = formatter(),
output = "{path[0]}/{basename[0]}.task1",
extras =['echo {dynamic_message} > {some_file}'])
s = StringIO()
test_pipeline.printout(s, [transform_with_missing_formatter_args], verbose=4, wrap_width = 10000, pipeline= "test")
self.assertIn("Missing key = {dynamic_message}", s.getvalue())
#log to stream
s = StringIO()
logger = t_stream_logger(s)
test_pipeline.run([transform_with_missing_formatter_args], verbose=5, pipeline= "test", logger=logger)
self.assertIn("Missing key = {dynamic_message}", s.getvalue())
#
# Necessary to protect the "entry point" of the program under windows.
# see: http://docs.python.org/library/multiprocessing.html#multiprocessing-programming
#
if __name__ == '__main__':
unittest.main()
|