// Scheduler example built on scala.concurrent.pilib.
package examples.pilib
import scala.concurrent.pilib._
object scheduler {
/**
 * Shared pseudo-random number generator.
 *
 * The agents draw their sleep durations from it, and agent 0 also uses it
 * to decide whether it stalls.
 */
val random: scala.util.Random = new scala.util.Random()
//***************** Scheduler ******************//
/**
 * Scheduler cell, state A: the attached agent is allowed to start.
 *
 * Offers a single input on `a`; once that communication happens the cell
 * continues as state C with the same four channels.
 *
 * @param a channel on which the agent signals that it has started
 * @param b channel on which the agent signals that it has finished
 * @param d first link channel of the cell (only passed on in this state)
 * @param c second link channel of the cell (only passed on in this state)
 */
def A(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]): Unit = {
  ///- ... complete here ...
  choice(a * { _ => C(a, b)(d, c) })
  ///+
}
/**
 * Scheduler cell, intermediate state C (reached from state A once the
 * agent has signalled on `a`).
 *
 * Offers a single input on the link channel `c`; once that communication
 * happens the cell continues as state B with the same four channels.
 *
 * @param a channel on which the agent signals that it has started
 * @param b channel on which the agent signals that it has finished
 * @param d first link channel of the cell (only passed on in this state)
 * @param c second link channel of the cell; the one this state listens on
 */
def C(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]): Unit = {
  ///- ... complete here ...
  choice(c * { _ => B(a, b)(d, c) })
  ///+
}
/**
 * Scheduler cell, state B: the attached agent is allowed to finish.
 *
 * Two things have to happen before the cell is back in state A, and they
 * may happen in either order: an input on `b` and an output on `d`. The
 * cell therefore offers a choice between both orderings.
 *
 * @param a channel on which the agent signals that it has started
 * @param b channel on which the agent signals that it has finished
 * @param d first link channel of the cell; this state emits on it
 * @param c second link channel of the cell (only passed on in this state)
 */
def B(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]): Unit = {
  ///- ... complete here ...
  // choice (b * { x => D(a, b)(d, c) }) // incorrect naive solution

  // b.'d.A : input on b first, then output on d
  val finishThenEmit = b * { _ => choice(d(()) * A(a, b)(d, c)) }
  // 'd.b.A : output on d first, then input on b
  val emitThenFinish = d(()) * choice(b * { _ => A(a, b)(d, c) })

  choice(finishThenEmit, emitThenFinish)
  ///+
}
/**
 * Scheduler cell, state D: the attached agent is not yet allowed to start.
 *
 * Offers a single output on the link channel `d`; once that communication
 * happens the cell continues as state A with the same four channels.
 *
 * @param a channel on which the agent signals that it has started
 * @param b channel on which the agent signals that it has finished
 * @param d first link channel of the cell; this state emits on it
 * @param c second link channel of the cell (only passed on in this state)
 */
def D(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]): Unit = {
  ///- ... complete here ...
  val emit = d(()) * A(a, b)(d, c)
  choice(emit)
  ///+
}
//***************** Agents ******************//
/**
 * Behaviour of agent number `i`.
 *
 * Normal round: attach printing callbacks to `a` and `b`, pause, write on
 * `a` (start), pause again, write on `b` (stop), then begin the next round.
 *
 * Stalling round: only agent 0 can take it, with probability 1/2 at the
 * beginning of each round. The agent pauses, writes on `a` and then returns
 * without ever writing on `b`, i.e. it starts but never reports a stop.
 *
 * @param i number of the agent, used in the printed messages
 * @param a channel on which the agent signals that it has started
 * @param b channel on which the agent signals that it has finished
 */
def agent(i: Int)(a: Chan[Unit], b: Chan[Unit]): Unit = {
  // Random delay of less than one second.
  def pause(): Unit = Thread.sleep(random.nextInt(1000))

  // The random draw only happens for agent 0 (short-circuit &&).
  val stalls = i == 0 && random.nextInt(10) < 5

  if (!stalls) {
    a.attach(_ => println("Start ----> " + i))
    b.attach(_ => println("Stop -> " + i))
    pause()
    a.write(())
    pause()
    b.write(())
    // Next round; the stall decision is taken afresh.
    agent(i)(a, b)
  } else {
    a.attach(_ => println("Start and sleeps ----> " + i))
    pause()
    a.write(())
  }
}
//***************** Entry function ******************//
/**
 * Entry point: builds five agents, numbered 0 to 4, and hands them to
 * `scheduleAgents`.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  val agentNb = 5
  // Fix the agent number now; the channels are supplied later by the scheduler.
  val agents = (0 until agentNb).toList map (i => agent(i) _)
  scheduleAgents(agents)
}
//***************** Infrastructure *****************//
/**
 * A cell is modelled as a function that takes as parameters
 * input and output channels and which returns nothing.
 *
 * The scheduler states (A, B, C, D), once applied to an agent's `a` and `b`
 * channels, have this shape: the first argument is their `d` channel and
 * the second argument is their `c` channel.
 */
type Cell = (Chan[Unit], Chan[Unit]) => Unit
/**
 * Creates a cell composed of two cells linked together.
 *
 * When the resulting cell is applied to two channels, a fresh channel is
 * created and used as the second argument of `cell1` and as the first
 * argument of `cell2`; both cells are then started through `spawn`.
 *
 * @param cell1 cell placed on the first-argument side of the composite
 * @param cell2 cell placed on the second-argument side of the composite
 * @return the composite cell
 */
def join(cell1: Cell, cell2: Cell): Cell = {
  (left: Chan[Unit], right: Chan[Unit]) =>
    val middle = new Chan[Unit]
    spawn < cell1(left, middle) | cell2(middle, right) >
}
/**
 * Links the output of a cell to its input by applying the cell to one
 * freshly created channel used for both of its arguments.
 *
 * @param cell the cell to close onto itself
 */
def close(cell: Cell): Unit = {
  val loop = new Chan[Unit]
  cell(loop, loop)
}
/**
 * Creates a cell consisting of a chain of cells, obtained by joining the
 * cells of the list pairwise from left to right.
 *
 * @param cells the cells to chain; must not be empty, because `reduceLeft`
 *              fails on an empty list
 * @return a single cell representing the whole chain
 */
def chain(cells: List[Cell]): Cell =
  cells.reduceLeft((joined, next) => join(joined, next))
/**
 * Arranges the given cells in a ring: the cells are first chained, then
 * the resulting chain is closed onto itself.
 *
 * @param cells the cells that make up the ring
 */
def makeRing(cells: List[Cell]): Unit = {
  val chained = chain(cells)
  close(chained)
}
/**
 * An agent is modelled as a function that takes as parameters channels to
 * signal that it has started or finished.
 *
 * The first channel is the "started" signal and the second the "finished"
 * signal; the scheduler cells refer to them as `a` and `b`.
 */
type Agent = (Chan[Unit], Chan[Unit]) => Unit
/**
 * Takes a list of agents and schedules them.
 *
 * For every agent a fresh pair of channels (`a` for "started", `b` for
 * "finished") is created, the agent is spawned on that pair, and a scheduler
 * cell bound to the same pair is built. The cell of the first agent in the
 * list begins in state A (allowed to start); all other cells begin in
 * state D. Finally the cells are linked into a ring.
 *
 * @param agents the agents to schedule, in ring order; for an empty list
 *               there is nothing to schedule and no ring is built
 */
def scheduleAgents(agents: List[Agent]): Unit = {
  val cells = agents.zipWithIndex map { case (ag, index) =>
    val a = new Chan[Unit]
    val b = new Chan[Unit]
    // The explicit ';' keeps the trailing '>' of the spawn notation from
    // being read as an infix operator applied to the next line.
    spawn < ag(a, b) >;
    // The initial state is chosen from the immutable list position. The
    // cell function itself is only invoked later, from inside `spawn` (see
    // `join`), so deciding it there through a shared mutable flag would be
    // an unsynchronised check-then-act: which cell ends up in state A would
    // depend on timing, and more than one cell could end up in state A.
    (d: Chan[Unit], c: Chan[Unit]) =>
      if (index == 0) A(a, b)(d, c)
      else D(a, b)(d, c)
  }
  // Chaining needs at least one cell.
  if (!cells.isEmpty) makeRing(cells)
}
}
// End of the scheduler example.