File: work_queue_example.py

package info (click to toggle)
cctools 9.9-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 44,624 kB
  • sloc: ansic: 192,539; python: 20,827; cpp: 20,199; sh: 11,719; perl: 4,106; xml: 3,688; makefile: 1,224
file content (79 lines) | stat: -rwxr-xr-x 3,107 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python

# Copyright (c) 2010- The University of Notre Dame.
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.

# This program is a very simple example of how to use Work Queue.
# It accepts a list of files on the command line.
# Each file is compressed with gzip and returned to the user.

from work_queue import *

import os
import sys

def find_gzip(candidates=("/bin/gzip", "/usr/bin/gzip")):
    """Return the first path in *candidates* that exists, or None.

    Typing ``gzip`` at a terminal is enough locally. Work Queue, however, must
    be told exactly which files to ship to the workers, so we need the precise
    local location of the gzip executable.
    """
    for path in candidates:
        if os.path.exists(path):
            return path
    return None


def build_gzip_command(infile, outfile):
    """Return the shell command that compresses *infile* into *outfile*.

    Both names are shell-quoted so that file names containing spaces or shell
    metacharacters reach gzip literally instead of being interpreted by the
    worker's shell. The command runs ``./gzip`` so that the gzip binary
    shipped with the task is used, not whichever gzip the worker may have.
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2 fallback
        from pipes import quote
    return "./gzip < %s > %s" % (quote(infile), quote(outfile))


def main(argv=None):
    """Compress every file named on the command line using remote workers.

    argv: argument vector (defaults to ``sys.argv``). ``argv[1:]`` are the
    files to compress; each ``<file>`` comes back as ``<file>.gz`` next to it.

    Returns the process exit status: 0 when every task succeeded, 1 on a
    usage error, a setup error, or if any task failed.
    """
    if argv is None:
        argv = sys.argv

    if len(argv) < 2:
        print("work_queue_example <file1> [file2] [file3] ...")
        print("Each file given on the command line will be compressed using a remote worker.")
        return 1

    gzip_path = find_gzip()
    if gzip_path is None:
        print("gzip was not found. Please modify the gzip_path variable accordingly. To determine the location of gzip, from the terminal type: which gzip (usual locations are /bin/gzip and /usr/bin/gzip)")
        return 1

    infiles = argv[1:]
    # Fail early with a clear message instead of submitting tasks whose
    # input can never be transferred.
    missing = [name for name in infiles if not os.path.isfile(name)]
    if missing:
        for name in missing:
            print("Input file not found: %s" % name)
        return 1

    cctools_debug_flags_set("all")
    cctools_debug_config_file("wq.log")

    # Create the task queue on the default port. If that port is already in
    # use by another program, try setting port = 0 to pick any free port.
    port = WORK_QUEUE_DEFAULT_PORT
    try:
        q = WorkQueue(port)
    except Exception as e:
        print("Instantiation of Work Queue failed! (%s)" % e)
        return 1

    # Create and submit one task per input file.
    for infile in infiles:
        outfile = infile + ".gz"

        # Remote names live inside each task's sandbox on the worker, and
        # Work Queue does not accept absolute remote names. Use only the base
        # name remotely; the local names keep the full path so the result
        # lands next to the input.
        remote_in = os.path.basename(infile)
        remote_out = remote_in + ".gz"

        command = build_gzip_command(remote_in, remote_out)
        t = Task(command)

        # gzip is identical for every task, so the workers may cache it.
        # Its local name (gzip_path) differs from its remote name ("gzip").
        t.specify_file(gzip_path, "gzip", WORK_QUEUE_INPUT, cache=True)

        # Inputs and outputs differ per task, so they are not cached. (Caching
        # an output can make sense when it is the input of a later task.)
        t.specify_file(infile, remote_in, WORK_QUEUE_INPUT, cache=False)
        t.specify_file(outfile, remote_out, WORK_QUEUE_OUTPUT, cache=False)

        taskid = q.submit(t)
        print("submitted task (id# %d): %s" % (taskid, command))

    # Wait for all tasks. q.wait returns None when the timeout expires
    # without a task finishing, so only inspect real results.
    failures = 0
    while not q.empty():
        t = q.wait(5)
        if not t:
            continue
        if t.result == WORK_QUEUE_RESULT_SUCCESS and t.return_status == 0:
            print("task (id# %d) complete" % t.id)
        else:
            failures += 1
            print("task (id# %d) failed: result %d, return status %d"
                  % (t.id, t.result, t.return_status))

    # The WorkQueue object is released by Python's garbage collector once it
    # goes out of scope at the end of main().
    return 1 if failures else 0


if __name__ == '__main__':
    sys.exit(main())