1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
#!/usr/bin/env python
from __future__ import print_function
from ruffus import pipeline_run, pipeline_printout, Pipeline, formatter, transform, mkdir, originate
import unittest
import shutil
from ruffus.task import t_stream_logger
from ruffus.ruffus_exceptions import RethrownJobError
import ruffus
import re
import sys
import os
"""
test_transform_formatter.py
"""
# Not referenced anywhere else in the visible part of this file.
JOBS_PER_TASK = 5

# Directory containing this script, exactly as spelled in __file__.
_here = os.path.dirname(__file__)

# Scratch directory for the tests: this script's path with the extension
# removed, expressed relative to the current working directory.
_script_without_ext = os.path.splitext(__file__)[0]
tempdir = os.path.relpath(os.path.abspath(_script_without_ext))

# Put the directory two levels above this script at the front of the module
# search path.
# NOTE(review): the ruffus imports at the top of the file execute before this
# insert, so it cannot change which ruffus those imports pick up -- confirm
# whether that is intended.
grandparent_dir = os.path.abspath(os.path.join(_here, "..", ".."))
sys.path.insert(0, grandparent_dir)

# Name of this script with its extension stripped.
_script_name = os.path.basename(__file__)
module_name = os.path.splitext(_script_name)[0]

# Directory one level above this script; not used in the visible code.
parent_dir = os.path.abspath(os.path.join(_here, ".."))
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# options
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# Use the standalone StringIO module when it exists and fall back to
# io.StringIO when it does not. Only a failed import triggers the fallback;
# any other exception (e.g. KeyboardInterrupt) is allowed to propagate.
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# imports
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# Main logic
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# Tasks
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
@mkdir(tempdir)
@originate([os.path.join(tempdir, ff + ".tmp") for ff in "abcd"])
def generate_initial_files(out_name):
    """Create ``out_name`` as an empty file (truncating it if it exists).

    The decorators register this function with ruffus as the task that
    originates ``a.tmp``, ``b.tmp``, ``c.tmp`` and ``d.tmp`` inside
    ``tempdir``; presumably ruffus calls it once per listed path.
    """
    # Opening for writing is all that is needed; nothing is written.
    with open(out_name, 'w'):
        pass
@transform(
    input=generate_initial_files,
    filter=formatter(),
    output="{path[0]}/{basename[0]}.task1.{whatever}",
    extras=['echo {dynamic_message} > {some_file}'])
def transform_with_missing_formatter_args(input_file, output_files, output1):
    """Print the arguments this task was called with.

    The ``output`` and ``extras`` templates mention ``{whatever}``,
    ``{dynamic_message}`` and ``{some_file}``. Judging by the function name
    and the assertions in the tests of this file, these are fields that
    ``formatter()`` is not expected to be able to fill in.
    """
    report = "input = %r, output = %r, extras = %r" % (
        input_file, output_files, output1)
    print(report)
class Test_ruffus(unittest.TestCase):
    """Check that ruffus reports formatter() fields it cannot substitute.

    Both tests look for the text "Unmatched field {dynamic_message}" in the
    output produced for the ``transform_with_missing_formatter_args`` task:
    once through the module-level decorator pipeline ("main") and once
    through an explicitly constructed ``Pipeline("test")``.
    """

    # ___________________________________________________________________________
    #
    #   setup and cleanup
    # ___________________________________________________________________________
    def setUp(self):
        """Start each test from a freshly created, empty ``tempdir``."""
        # Best-effort removal of leftovers from an earlier run. ignore_errors
        # covers the common case where the directory does not exist yet,
        # without swallowing unrelated exceptions such as KeyboardInterrupt
        # the way a bare ``except`` would.
        shutil.rmtree(tempdir, ignore_errors=True)
        os.makedirs(tempdir)

    def tearDown(self):
        """Remove ``tempdir`` and everything the test left in it."""
        shutil.rmtree(tempdir)

    # ___________________________________________________________________________
    #
    #   test transform() with unmatched formatter() fields:
    #   pipeline_printout and pipeline_run
    # ___________________________________________________________________________
    def test_transform_with_missing_formatter_args(self):
        # Decorator-defined tasks, addressed through the "main" pipeline.
        s = StringIO()
        pipeline_printout(s, [transform_with_missing_formatter_args],
                          verbose=4, wrap_width=10000, pipeline="main")
        self.assertIn("Unmatched field {dynamic_message}", s.getvalue())

        # Running the pipeline is only required to complete without raising;
        # nothing is asserted about its output.
        pipeline_run([transform_with_missing_formatter_args],
                     verbose=0, pipeline="main")

    def test_transform_with_missing_formatter_args_b(self):
        # Same scenario, built with the object-oriented Pipeline interface.
        test_pipeline = Pipeline("test")

        originate_output = [os.path.join(tempdir, ff + ".tmp")
                            for ff in "abcd"]
        test_pipeline.originate(task_func=generate_initial_files,
                                output=originate_output).mkdir(tempdir)

        # Unlike the decorator version, the output template here has no
        # "{whatever}" field; only the extras contain unmatched fields.
        test_pipeline.transform(task_func=transform_with_missing_formatter_args,
                                input=generate_initial_files,
                                filter=formatter(),
                                output="{path[0]}/{basename[0]}.task1",
                                extras=['echo {dynamic_message} > {some_file}'])

        s = StringIO()
        test_pipeline.printout(
            s, [transform_with_missing_formatter_args],
            verbose=4, wrap_width=10000, pipeline="test")
        self.assertIn("Unmatched field {dynamic_message}", s.getvalue())

        # log to stream
        s = StringIO()
        logger = t_stream_logger(s)
        test_pipeline.run([transform_with_missing_formatter_args],
                          verbose=5, pipeline="test", logger=logger)
        self.assertIn("Unmatched field {dynamic_message}", s.getvalue())
#
#   Run the tests only when this file is executed as a script. The guard
#   protects the program's "entry point", which is required under Windows.
#   see: http://docs.python.org/library/multiprocessing.html#multiprocessing-programming
#
if __name__ == "__main__":
    unittest.main()
|