1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
# A parallelized "find(1)" using the thread module.
# This demonstrates the use of a work queue and worker threads.
# It really does do more stats/sec when using multiple threads,
# although the improvement is only about 20-30 percent.
# (That was 8 years ago. In 2002, on Linux, I can't measure
# a speedup. :-( )
# I'm too lazy to write a command line parser for the full find(1)
# command line syntax, so the predicate it searches for is wired-in,
# see function selector() below. (It currently searches for files with
# world write permission.)
# Usage: parfind.py [-w nworkers] [directory] ...
# Default nworkers is 4
import sys
import getopt
import string
import time
import os
from stat import *
import thread
# Work queue class. Usage:
# wq = WorkQ()
# wq.addwork(func, (arg1, arg2, ...)) # one or more calls
# wq.run(nworkers)
# The work is done when wq.run() completes.
# The function calls executed by the workers may add more work.
# Don't use keyboard interrupts!
class WorkQ:
# Invariants:
# - busy and work are only modified when mutex is locked
# - len(work) is the number of jobs ready to be taken
# - busy is the number of jobs being done
# - todo is locked iff there is no work and somebody is busy
def __init__(self):
self.mutex = thread.allocate()
self.todo = thread.allocate()
self.todo.acquire()
self.work = []
self.busy = 0
def addwork(self, func, args):
job = (func, args)
self.mutex.acquire()
self.work.append(job)
self.mutex.release()
if len(self.work) == 1:
self.todo.release()
def _getwork(self):
self.todo.acquire()
self.mutex.acquire()
if self.busy == 0 and len(self.work) == 0:
self.mutex.release()
self.todo.release()
return None
job = self.work[0]
del self.work[0]
self.busy = self.busy + 1
self.mutex.release()
if len(self.work) > 0:
self.todo.release()
return job
def _donework(self):
self.mutex.acquire()
self.busy = self.busy - 1
if self.busy == 0 and len(self.work) == 0:
self.todo.release()
self.mutex.release()
def _worker(self):
time.sleep(0.00001) # Let other threads run
while 1:
job = self._getwork()
if not job:
break
func, args = job
apply(func, args)
self._donework()
def run(self, nworkers):
if not self.work:
return # Nothing to do
for i in range(nworkers-1):
thread.start_new(self._worker, ())
self._worker()
self.todo.acquire()
# Main program
def main():
nworkers = 4
opts, args = getopt.getopt(sys.argv[1:], '-w:')
for opt, arg in opts:
if opt == '-w':
nworkers = string.atoi(arg)
if not args:
args = [os.curdir]
wq = WorkQ()
for dir in args:
wq.addwork(find, (dir, selector, wq))
t1 = time.time()
wq.run(nworkers)
t2 = time.time()
sys.stderr.write('Total time %r sec.\n' % (t2-t1))
# The predicate -- defines what files we look for.
# Feel free to change this to suit your purpose
def selector(dir, name, fullname, stat):
# Look for world writable files that are not symlinks
return (stat[ST_MODE] & 0002) != 0 and not S_ISLNK(stat[ST_MODE])
# The find procedure -- calls wq.addwork() for subdirectories
def find(dir, pred, wq):
try:
names = os.listdir(dir)
except os.error, msg:
print repr(dir), ':', msg
return
for name in names:
if name not in (os.curdir, os.pardir):
fullname = os.path.join(dir, name)
try:
stat = os.lstat(fullname)
except os.error, msg:
print repr(fullname), ':', msg
continue
if pred(dir, name, fullname, stat):
print fullname
if S_ISDIR(stat[ST_MODE]):
if not os.path.ismount(fullname):
wq.addwork(find, (fullname, pred, wq))
# Call the main program
main()
|