#!/usr/bin/env python
#############################################################################
# Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
# All Rights Reserved.
#
# The software contained on this media is the property of the
# DSTC Pty Ltd.  Use of this software is strictly in accordance
# with the license agreement in the accompanying LICENSE.DOC file.
# If your distribution of this software does not contain a
# LICENSE.DOC file then you have no rights to use this software
# in any manner and should contact DSTC at the address below
# to determine an appropriate licensing arrangement.
# 
#      DSTC Pty Ltd
#      Level 7, Gehrmann Labs
#      University of Queensland
#      St Lucia, 4072
#      Australia
#      Tel: +61 7 3365 4310
#      Fax: +61 7 3365 4311
#      Email: enquiries@dstc.edu.au
# 
# This software is being provided "AS IS" without warranty of
# any kind.  In no event shall DSTC Pty Ltd be liable for
# damage of any kind arising out of or in connection with
# the use or performance of this software.
#
# Project:      Distributed Environment
# File:         $Source: /cvsroot/fnorb/fnorb/orb/ThreadPoolQueue.py,v $
#
#############################################################################
""" A queue serviced by a thread pool. """


# Standard/built-in modules.
import thread

# Fnorb modules.
import condvar


class ThreadPoolQueue:
    """ A queue serviced by a thread pool. """

    def __init__(self, size, function):
	""" Constructor. """

	self.__size = size
	self.__function = function
	self.__stopped = 0
	self.__data = []
	self.__cv = condvar.condvar()

	return

    def start(self):
	""" Start servicing the queue. """

	# Start the appropriate number of worker threads.
	for i in range(self.__size):
	    thread.start_new_thread(self.worker_thread, (i,))

	return

    def stop(self):
	""" Stop servicing the queue. """

	self.__cv.acquire()
	self.__stopped = 1
	self.__cv.broadcast()

	return

    def wait(self):
	""" Wait until all of the worker threads have finished. """

	self.__cv.acquire()
	while self.__size > 0:
	    self.__cv.wait()
	self.__cv.release()

	return
	
    def add_item(self, item):
	""" Add a single item to the queue. """

	self.__cv.acquire()
	self.__data.append(item)
	self.__cv.signal()

	return

    def add_items(self, items):
	""" Add a list of items to the queue. """

	self.__cv.acquire()
	self.__data[len(self.__data):] = items
	self.__cv.broadcast()

	return
		
    def worker_thread(self, i):
	""" The worker!"""

	self.__cv.acquire()
	while not self.__stopped:
	    # Is there an item on the queue for me to deal with?
	    if len(self.__data) > 0:
		item = self.__data[0]
		del self.__data[0]
		self.__cv.release()

		# Do the work!
		apply(self.__function, item)

		# Acquire the lock so that I can check to see if I am stopped
		# or if there is some more work for me to do.
		self.__cv.acquire()

	    # Otherwise, we are not stopped, and there is nothing for me to
	    # do, so I'll just wait around a while...
	    else:
		self.__cv.wait()

	# The thread pool has been stopped so let's get outta here.
	self.__size = self.__size - 1
	self.__cv.signal()

	# Explicitly exit the thread!
	thread.exit()

#############################################################################
