File: manual_dependencies_flowchart_intro.png.py

package info (click to toggle)
python-ruffus 2.6.3%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 20,828 kB
  • ctags: 2,843
  • sloc: python: 15,745; makefile: 180; sh: 14
file content (98 lines) | stat: -rwxr-xr-x 1,867 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python
import sys, os

# Work out this module's name and, when run as a script, extend the import
# search path so the script can be tested from inside the source tree.
if __name__ != '__main__':
    # Imported as a module: the import name is the module name.
    module_name = __name__
else:
    script_path = sys.argv[0]
    # Directory containing this script, as an absolute path.
    exe_path = os.path.dirname(os.path.abspath(script_path))
    # Script file name with its final extension removed.
    module_name = os.path.splitext(os.path.basename(script_path))[0]
    # Put the directory two levels above this script first on sys.path
    # (presumably where the in-tree ruffus package lives -- confirm layout).
    sys.path.insert(0, os.path.abspath(os.path.join(exe_path, "../..")))

from ruffus import *
import json



#
#    up_to_date_task1
#
@files(None, 'a.1')
def up_to_date_task1(infile, outfile):
    """Create (or truncate) *outfile* as an empty file.

    Declared via ``@files`` with no input (``None``) and output ``'a.1'``.
    *infile* is not used by the body.
    """
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(outfile, "w"):
        pass



#
#    up_to_date_task2
#
@transform(up_to_date_task1, suffix('.1'), '.2')
def up_to_date_task2(infile, outfile):
    """Create (or truncate) *outfile* as an empty file.

    Declared via ``@transform`` on ``up_to_date_task1`` with the ``'.1'``
    suffix mapped to ``'.2'``. *infile* is not read by the body.
    """
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(outfile, "w"):
        pass

#
#    task3
#
@transform(up_to_date_task2, suffix('.2'), '.3')
def task3(infile, outfile):
    """Create (or truncate) *outfile* as an empty file.

    Declared via ``@transform`` on ``up_to_date_task2`` with the ``'.2'``
    suffix mapped to ``'.3'``. *infile* is not read by the body.
    """
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(outfile, "w"):
        pass

#
#    up_to_date_task4
#
@files(None, 'a.4')
def up_to_date_task4(infile, outfile):
    """Create (or truncate) *outfile* as an empty file.

    Declared via ``@files`` with no input (``None``) and output ``'a.4'``.
    *infile* is not used by the body.
    """
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(outfile, "w"):
        pass

#
#    task5
#
@transform(up_to_date_task4, suffix('.4'), '.5')
def task5(infile, outfile):
    """Create (or truncate) *outfile* as an empty file.

    Declared via ``@transform`` on ``up_to_date_task4`` with the ``'.4'``
    suffix mapped to ``'.5'``. *infile* is not read by the body.
    """
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(outfile, "w"):
        pass

#
#    task6
#
@follows(task3)
@transform([task5], suffix('.5'), '.6')
def task6(infile, outfile):
    """Create (or truncate) *outfile* as an empty file.

    Declared via ``@transform`` on ``task5`` with the ``'.5'`` suffix mapped
    to ``'.6'``, plus an explicit ``@follows(task3)`` dependency. *infile*
    is not read by the body.
    """
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(outfile, "w"):
        pass

#
#    final_task (task7)
#
@transform(task6, suffix('.6'), '.7')
def final_task(infile, outfile):
    """Create (or truncate) *outfile* as an empty file.

    Declared via ``@transform`` on ``task6`` with the ``'.6'`` suffix mapped
    to ``'.7'``. This is the target passed to ``pipeline_printout_graph``
    further down the script. *infile* is not read by the body.
    """
    # Use a context manager so the handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(outfile, "w"):
        pass



import time

# The seven placeholder files the pipeline above reads and writes: a.1 ... a.7.
_FLOW_FILES = ["a.%d" % n for n in range(1, 8)]


def _remove_flow_files():
    """Delete whichever of a.1 ... a.7 exist in the current directory."""
    for fn in _FLOW_FILES:
        if os.path.exists(fn):
            os.unlink(fn)


def _touch(fn):
    """Create (or truncate) *fn* as an empty file and close it right away."""
    with open(fn, "w"):
        pass


# Start from a clean slate so leftovers from an earlier run cannot affect
# the file timestamps set up below.
_remove_flow_files()
try:
    # a.3, a.5 and a.7 are created first, so they end up older than
    # a.2, a.4 and a.6 respectively (presumably so ruffus treats task3,
    # task5 and final_task as out of date in the drawn graph -- confirm).
    for fn in ("a.3", "a.7", "a.5"):
        _touch(fn)
    time.sleep(1)

    # Remaining files are created one second apart, in this order, so each
    # gets a distinct, strictly later creation time than the previous one.
    for fn in ("a.1", "a.2", "a.4", "a.6"):
        _touch(fn)
        time.sleep(1)

    pipeline_printout_graph("manual_dependencies_flowchart_intro.png", "png", [final_task], dpi=72, size=(2, 2))
    # Debugging alternative: textual report instead of the image.
    #pipeline_printout (sys.stdout, [final_task], verbose = 5)
finally:
    # Always remove the placeholder files, even if rendering the graph fails.
    _remove_flow_files()