File: simpler.py

package info (click to toggle)
python-ruffus 2.6.3%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 20,828 kB
  • ctags: 2,843
  • sloc: python: 15,745; makefile: 180; sh: 14
file content (260 lines) | stat: -rw-r--r-- 7,424 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
#!/usr/bin/env python2.5
"""

    simpler.py

        Minimal four-task ruffus pipeline (task1 -> task2 -> task3 -> task4)
        driven from the command line.

"""


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   options


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

from optparse import OptionParser
import sys, os
import os.path
import StringIO

# Directory that holds the running script; it is used further down to extend
# the module search path for testing.
exe_path = os.path.dirname(os.path.abspath(sys.argv[0]))

# When imported, keep the name Python assigned to the module.  When run as a
# script, use the script's file name without directory or extension.
if __name__ != '__main__':
    module_name = __name__
else:
    module_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]




# Command-line interface of this script.  Options are registered in the order
# in which they should appear in the generated help text.
parser = OptionParser(version="%prog 1.0")

# Repeatable task selectors: every occurrence appends one task name.
parser.add_option("-t", "--target_tasks",
                  dest="target_tasks",
                  metavar="JOBNAME",
                  type="string",
                  action="append",
                  default=[],
                  help="Target task(s) of pipeline.")
parser.add_option("-f", "--forced_tasks",
                  dest="forced_tasks",
                  metavar="JOBNAME",
                  type="string",
                  action="append",
                  default=[],
                  help="Pipeline task(s) which will be included even if they are up to date.")

# Degree of parallelism handed to pipeline_run().
parser.add_option("-j", "--jobs",
                  dest="jobs",
                  metavar="jobs",
                  type="int",
                  default=5,
                  help="Specifies  the number of jobs (commands) to run simultaneously.")

# NOTE(review): the help text below describes the opposite of "verbose", and
# options.verbose is not read anywhere in the visible code -- confirm intent.
parser.add_option("-v", "--verbose",
                  dest="verbose",
                  action="store_true",
                  default=False,
                  help="Do not echo to shell but only print to log.")

# NOTE(review): this default is a non-empty (truthy) string, so the
# `elif options.dependency_file:` branch at the bottom of the file is taken on
# every run without -n; pipeline_run() is only reached with `-d ''`.
parser.add_option("-d", "--dependency",
                  dest="dependency_file",
                  metavar="FILE",
                  type="string",
                  default="simple.svg",
                  help="Print a dependency graph of the pipeline that would be executed "
                       "to FILE, but do not execute it.")
parser.add_option("-F", "--dependency_graph_format",
                  dest="dependency_graph_format",
                  metavar="FORMAT",
                  type="string",
                  default='svg',
                  help="format of dependency graph file. Can be 'ps' (PostScript), "
                       "'svg' 'svgz' (Structured Vector Graphics), "
                       "'png' 'gif' (bitmap  graphics) etc ")

# Plain on/off switches, all off unless given.
parser.add_option("-n", "--just_print",
                  dest="just_print",
                  action="store_true",
                  default=False,
                  help="Print a description of the jobs that would be executed, "
                       "but do not execute them.")
parser.add_option("-M", "--minimal_rebuild_mode",
                  dest="minimal_rebuild_mode",
                  action="store_true",
                  default=False,
                  help="Rebuild a minimum of tasks necessary for the target. "
                       "Ignore upstream out of date tasks if intervening tasks are fine.")
parser.add_option("-K", "--no_key_legend_in_graph",
                  dest="no_key_legend_in_graph",
                  action="store_true",
                  default=False,
                  help="Do not print out legend and key for dependency graph.")
parser.add_option("-H", "--draw_graph_horizontally",
                  dest="draw_horizontally",
                  action="store_true",
                  default=False,
                  help="Draw horizontal dependency graph.")

# Empty placeholder list; nothing in the visible code fills or reads it.
parameters = []







#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   imports


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

import StringIO
import re
import operator
import sys
from collections import defaultdict

# Append the directory two levels above this script to the module search
# path before importing ruffus.  Because it is appended (not inserted at the
# front), any ruffus found earlier on sys.path still takes precedence.
sys.path.append(os.path.abspath(os.path.join(exe_path,"..", "..")))
from ruffus import *
import json

#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Functions


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

def create_custom_file_func(params):
    """
    Wrap a collection of job parameters in a parameter-generating function.

    Returns a zero-argument generator function.  Each call iterates over
    ``params`` and yields its items one at a time, unchanged.  The result is
    intended as input to @files_func.
    """
    def cust_func():
        for single_job in params:
            yield single_job

    return cust_func


def is_job_uptodate (infiles, outfiles, *extra_params):
    """
    Delegate the freshness check for one job to ruffus.

    Forwards ``infiles``, ``outfiles`` and any extra parameters unchanged to
    ``task.needs_update_check_modify_time`` and returns its result as is.
    The first two parameters are assumed to be files.

    NOTE(review): despite this function's name, the helper it calls is named
    "needs_update...", so the result presumably signals that the job is out
    of date rather than up to date -- confirm against ruffus before relying
    on the polarity.
    """
    return task.needs_update_check_modify_time (infiles, outfiles, *extra_params)



def test_post_task_function ():
    """
    Print "Hooray" on its own line to standard output.

    Takes no arguments and returns None.  Presumably meant as a ruffus
    post-task callback; nothing in the visible code registers it.
    """
    # Parenthesised single-argument form: parses as a print statement on
    # Python 2 and as a function call on Python 3, with identical output.
    # The bare `print "Hooray"` statement is a SyntaxError on Python 3.
    print("Hooray")

import time
def test_job_io(infiles, outfiles, extra_params):
    """
    cat input files content to output files
        after writing out job parameters

    infiles      -- one file name, a sequence of file names, or None
                    (None is treated as "no input files")
    outfiles     -- one file name or a sequence of file names
    extra_params -- tuple of further job parameters; only echoed, never used

    Writes a single ``    job = <json>`` line to stdout, then writes to every
    output file the sorted, concatenated contents of the input files followed
    by a ``<infiles json> -> <outfiles json>`` line.  Returns None.
    """
    # dump parameters exactly as received, before the normalisation below
    params = (infiles, outfiles) + extra_params
    sys.stdout.write('    job = %s\n' % json.dumps(params))

    # accept a bare file name wherever a list of names is expected
    if isinstance(infiles, str):
        infiles = [infiles]
    elif infiles is None:
        infiles = []
    if isinstance(outfiles, str):
        outfiles = [outfiles]

    # read every input; `with` guarantees each handle is closed even if a
    # read fails (the handles used to be left for the garbage collector)
    input_texts = []
    for file_name in infiles:
        with open(file_name) as in_file:
            input_texts.append(in_file.read())

    # sorted so that the result does not depend on the order of infiles
    output_text = "".join(sorted(input_texts))
    output_text += json.dumps(infiles) + " -> " + json.dumps(outfiles) + "\n"

    for file_name in outfiles:
        with open(file_name, "w") as out_file:
            out_file.write(output_text)

    # NOTE(review): presumably keeps the modification times written by
    # successive jobs at least a second apart -- confirm before removing.
    time.sleep(1)


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Main logic


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888





# get help string
# Render the parser's help text into an in-memory buffer and keep the result
# as a string in ``helpstr`` (not referenced again in the visible code).
f =StringIO.StringIO()
parser.print_help(f)
helpstr = f.getvalue()
# Parse the real command line as soon as this module is executed or imported.
# ``options`` drives the dispatch at the bottom of the file;
# ``remaining_args`` receives whatever positional arguments are left over.
(options, remaining_args) = parser.parse_args()





#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Tasks


#88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888


#
#    task1
#
# ruffus @files(input, output): declares a single job with no input file
# (None) whose output is 'a.1' -- see the ruffus decorator documentation.
@files(None, 'a.1')
def task1(infiles, outfiles, *extra_params):
    """
    First task
    """
    # extra_params is the (possibly empty) tuple of additional job parameters;
    # it is handed on as one argument, which is the shape test_job_io expects.
    test_job_io(infiles, outfiles, extra_params)



#
#    task2
#
# ruffus @files_re(glob, regex, input pattern, output pattern): presumably one
# job per file matching '*.1', with input '<stem>.1' and output '<stem>.2' --
# confirm against the ruffus documentation.
# NOTE(review): the '.' in '(.*).1' is not escaped, so it matches any
# character, not just a literal dot.
# @follows(task1) declares that this task depends on task1.
@files_re('*.1', '(.*).1', r'\1.1', r'\1.2')
@follows(task1)
def task2(infiles, outfiles, *extra_params):
    """
    Second task
    """
    # extra_params is the (possibly empty) tuple of additional job parameters;
    # it is handed on as one argument, which is the shape test_job_io expects.
    test_job_io(infiles, outfiles, extra_params)



#
#    task3
#
# Jobs are still enumerated from the '*.1' files, but the substitution
# patterns name '<stem>.2' as input and '<stem>.3' as output (presumably one
# job per matching file -- confirm against the ruffus documentation).
# @follows(task2) declares that this task depends on task2.
@files_re('*.1', '(.*).1', r'\1.2', r'\1.3')
@follows(task2)
def task3(infiles, outfiles, *extra_params):
    """
    Third task
    """
    # extra_params is the (possibly empty) tuple of additional job parameters;
    # it is handed on as one argument, which is the shape test_job_io expects.
    test_job_io(infiles, outfiles, extra_params)



#
#    task4
#
# Jobs are still enumerated from the '*.1' files, but the substitution
# patterns name '<stem>.3' as input and '<stem>.4' as output (presumably one
# job per matching file -- confirm against the ruffus documentation).
# @follows(task3) declares that this task depends on task3.
@files_re('*.1', '(.*).1', r'\1.3', r'\1.4')
@follows(task3)
def task4(infiles, outfiles, *extra_params):
    """
    Fourth task
    """
    # extra_params is the (possibly empty) tuple of additional job parameters;
    # it is handed on as one argument, which is the shape test_job_io expects.
    test_job_io(infiles, outfiles, extra_params)




# Pick exactly one action from the parsed command-line options:
#   -n            describe the jobs that would run, on stdout
#   -d FILE       write the dependency graph to FILE
#   otherwise     run the pipeline
# In every case "maximal rebuild" mode is on unless -M was given.
if options.just_print:
    pipeline_printout(sys.stdout, options.target_tasks, options.forced_tasks,
                      long_winded=True,
                      gnu_make_maximal_rebuild_mode=not options.minimal_rebuild_mode)

elif options.dependency_file:
    # NOTE(review): the -d option defaults to "simple.svg", which is truthy,
    # so this branch is taken on every run without -n and the pipeline_run()
    # branch below is only reached with `-d ''` -- confirm this is intended.
    #
    # The graph file is opened in a `with` block so that it is flushed and
    # closed as soon as the graph has been written, even if drawing fails;
    # previously the handle was created inline and never closed.
    with open(options.dependency_file, "w") as graph_file:
        pipeline_printout_graph(graph_file,
                                options.dependency_graph_format,
                                options.target_tasks,
                                options.forced_tasks,
                                draw_vertically=not options.draw_horizontally,
                                gnu_make_maximal_rebuild_mode=not options.minimal_rebuild_mode,
                                no_key_legend=options.no_key_legend_in_graph)
else:
    pipeline_run(options.target_tasks, options.forced_tasks,
                 multiprocess=options.jobs,
                 gnu_make_maximal_rebuild_mode=not options.minimal_rebuild_mode)