1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
|
"""
Monitoring a dynamic number of threads
Shows a simple user interface being updated by a dynamic number of threads.
When the *Create Threads* button is pressed, the *count* method is
dispatched on a new thread. It then creates a new *Counter* object and
adds it to the *counters* list (which causes the *Counter* to appear
in the user interface. It then counts by incrementing the *Counter*
object's *count* trait (which again causes a user interface update each
time the counter is incremented). After it reaches its maximum count, it
removes the *Counter* from the *counter* list (causing the counter
to be removed from the user interface) and exits (terminating the thread).
Note that repeated clicking of the *Create Thread* button will create
additional threads.
"""
from time import sleep
from traits.api import HasTraits, Int, Button, List
from traitsui.api import View, Item, ListEditor
#-- The Counter objects used to keep track of the current count ----------------
class Counter(HasTraits):
# The current count:
count = Int
view = View(Item('count', style = 'readonly'))
#-- The main 'ThreadDemo' class ------------------------------------------------
class ThreadDemo(HasTraits):
# The button used to start a new thread running:
create = Button('Create Thread')
# The set of counter objects currently running:
counters = List(Counter)
view = View(
Item('create', show_label = False, width = -100),
'_',
Item('counters', style = 'custom',
show_label = False,
editor = ListEditor(use_notebook = True,
dock_style = 'tab')),
resizable = True,
width = 300,
height = 150,
title = 'Dynamic threads'
)
def __init__(self, **traits):
super(HasTraits, self).__init__(**traits)
# Set up the notification handler for the 'Create Thread' button so
# that it is run on a new thread:
self.on_trait_change(self.count, 'create', dispatch = 'new')
def count(self):
""" This method is dispatched on a new thread each time the
'Create Thread' button is pressed.
"""
counter = Counter()
self.counters.append(counter)
for i in range(1000):
counter.count += 1
sleep(0.030)
self.counters.remove(counter)
# Create the demo:
demo = ThreadDemo()
# Run the demo (if invoked from the command line):
if __name__ == '__main__':
demo.configure_traits()
|