#!/usr/bin/env python
# Copyright (c) 2010- The University of Notre Dame.
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.
# This program is a very simple example of how to use Work Queue.
# It accepts a list of files on the command line.
# Each file is compressed with gzip and returned to the user.
from work_queue import *
import os
import sys
# Debug configuration: select the "all" debug flag set and name "wq.log" as
# the debug file. (Presumably this enables every debug subsystem and sends the
# debug output to wq.log -- confirm against the cctools documentation.)
# Both calls execute at module level, before the __main__ guard.
cctools_debug_flags_set("all")
cctools_debug_config_file("wq.log")
# Main program
if __name__ == '__main__':
    # Script entry point: compress every file named on the command line by
    # submitting one gzip task per file to a Work Queue master, then wait for
    # all tasks to come back. Exits 1 on a usage/setup error or if any task
    # reports a non-zero return status, and 0 otherwise.
    port = WORK_QUEUE_DEFAULT_PORT

    if len(sys.argv) < 2:
        print("work_queue_example <file1> [file2] [file3] ...")
        print("Each file given on the command line will be compressed using a remote worker.")
        sys.exit(1)

    # Usually, we can execute the gzip utility by simply typing its name at a
    # terminal. However, this is not enough for work queue; we have to
    # specify precisely which files need to be transmitted to the workers. We
    # record the location of gzip in 'gzip_path', which is usually found in
    # /bin/gzip or /usr/bin/gzip.
    gzip_path = "/bin/gzip"
    if not os.path.exists(gzip_path):
        gzip_path = "/usr/bin/gzip"
        if not os.path.exists(gzip_path):
            print("gzip was not found. Please modify the gzip_path variable accordingly. To determine the location of gzip, from the terminal type: which gzip (usual locations are /bin/gzip and /usr/bin/gzip)")
            sys.exit(1)

    # We create the tasks queue using the default port. If this port is
    # already being used by another program, you can try setting port = 0 to
    # use an available port.
    try:
        q = WorkQueue(port)
    except Exception as e:
        # Catch Exception rather than everything, so that Ctrl-C and
        # SystemExit still propagate, and show the underlying cause.
        print("Instantiation of Work Queue failed!")
        print(e)
        sys.exit(1)

    # We create and dispatch a task for each filename given in the argument list
    for infile in sys.argv[1:]:
        outfile = "%s.gz" % infile

        # Note that we write ./gzip here, to guarantee that the gzip version we
        # are using is the one being sent to the workers.
        # NOTE(review): the filenames are interpolated into the command string
        # unquoted, so names containing spaces or shell metacharacters will
        # not behave as intended -- quote them if such names must be supported.
        command = "./gzip < %s > %s" % (infile, outfile)

        t = Task(command)

        # gzip is the same across all tasks, so we can cache it in the workers.
        # Note that when specifying a file, we have to name its local name
        # (e.g. gzip_path), and its remote name (e.g. "gzip"). Unlike the
        # following line, more often than not these are the same.
        t.specify_file(gzip_path, "gzip", WORK_QUEUE_INPUT, cache=True)

        # Files to be compressed are different across all tasks, so we do not
        # cache them. This is, of course, application specific. Sometimes you
        # may want to cache an output file if it is the input of a later task.
        t.specify_file(infile, infile, WORK_QUEUE_INPUT, cache=False)
        t.specify_file(outfile, outfile, WORK_QUEUE_OUTPUT, cache=False)

        # Once all files have been specified, we are ready to submit the task
        # to the queue.
        q.submit(t)

    # Wait for every submitted task. q.wait(5) is called with a 5 second
    # timeout; the result is checked for truthiness because a call may come
    # back without a finished task.
    failed = 0
    while not q.empty():
        t = q.wait(5)
        if t and t.return_status != 0:
            # Surface failures instead of silently discarding the result.
            print("task (id# %d) failed: %s (return code %d)" % (t.id, t.command, t.return_status))
            failed += 1

    # The work queue object will be garbage collected by Python automatically
    # when it goes out of scope.
    sys.exit(1 if failed else 0)