1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
"""
Process communication example
Covers:
- Resources: Store
Scenario:
This example shows how to interconnect simulation model elements
together using :class:`~simpy.resources.store.Store` for one-to-one,
and many-to-one asynchronous processes. For one-to-many a simple
BroadCastPipe class is constructed from Store.
When Useful:
When a consumer process does not always wait on a generating process
and these processes run asynchronously. This example shows how to
create a buffer and also tell is the consumer process was late
yielding to the event from a generating process.
This is also useful when some information needs to be broadcast to
many receiving processes
Finally, using pipes can simplify how processes are interconnected to
each other in a simulation model.
Example By:
Keith Smith
"""
import random
import simpy
RANDOM_SEED = 42
SIM_TIME = 100
class BroadcastPipe(object):
"""A Broadcast pipe that allows one process to send messages to many.
This construct is useful when message consumers are running at
different rates than message generators and provides an event
buffering to the consuming processes.
The parameters are used to create a new
:class:`~simpy.resources.store.Store` instance each time
:meth:`get_output_conn()` is called.
"""
def __init__(self, env, capacity=simpy.core.Infinity):
self.env = env
self.capacity = capacity
self.pipes = []
def put(self, value):
"""Broadcast a *value* to all receivers."""
if not self.pipes:
raise RuntimeError('There are no output pipes.')
events = [store.put(value) for store in self.pipes]
return self.env.all_of(events) # Condition event for all "events"
def get_output_conn(self):
"""Get a new output connection for this broadcast pipe.
The return value is a :class:`~simpy.resources.store.Store`.
"""
pipe = simpy.Store(self.env, capacity=self.capacity)
self.pipes.append(pipe)
return pipe
def message_generator(name, env, out_pipe):
"""A process which randomly generates messages."""
while True:
# wait for next transmission
yield env.timeout(random.randint(6, 10))
# messages are time stamped to later check if the consumer was
# late getting them. Note, using event.triggered to do this may
# result in failure due to FIFO nature of simulation yields.
# (i.e. if at the same env.now, message_generator puts a message
# in the pipe first and then message_consumer gets from pipe,
# the event.triggered will be True in the other order it will be
# False
msg = (env.now, '%s says hello at %d' % (name, env.now))
out_pipe.put(msg)
def message_consumer(name, env, in_pipe):
"""A process which consumes messages."""
while True:
# Get event for message pipe
msg = yield in_pipe.get()
if msg[0] < env.now:
# if message was already put into pipe, then
# message_consumer was late getting to it. Depending on what
# is being modeled this, may, or may not have some
# significance
print('LATE Getting Message: at time %d: %s received message: %s' %
(env.now, name, msg[1]))
else:
# message_consumer is synchronized with message_generator
print('at time %d: %s received message: %s.' %
(env.now, name, msg[1]))
# Process does some other work, which may result in missing messages
yield env.timeout(random.randint(4, 8))
# Setup and start the simulation
print('Process communication')
random.seed(RANDOM_SEED)
env = simpy.Environment()
# For one-to-one or many-to-one type pipes, use Store
pipe = simpy.Store(env)
env.process(message_generator('Generator A', env, pipe))
env.process(message_consumer('Consumer A', env, pipe))
print('\nOne-to-one pipe communication\n')
env.run(until=SIM_TIME)
# For one-to many use BroadcastPipe
# (Note: could also be used for one-to-one,many-to-one or many-to-many)
env = simpy.Environment()
bc_pipe = BroadcastPipe(env)
env.process(message_generator('Generator A', env, bc_pipe))
env.process(message_consumer('Consumer A', env, bc_pipe.get_output_conn()))
env.process(message_consumer('Consumer B', env, bc_pipe.get_output_conn()))
print('\nOne-to-many pipe communication\n')
env.run(until=SIM_TIME)
|