object scheduler { import scala.concurrent.pilib._; /** * Random number generator. */ val random = new java.util.Random(); //***************** Scheduler ******************// /** * A cell of the scheduler whose attached agent is allowed to start. */ def A(a: Chan[unit], b: Chan[unit])(d: Chan[unit], c: Chan[unit]): unit = { a.read; C(a,b)(d,c) } /** * A cell of the scheduler in an intermediate state (incorrect) */ // def B(a: Chan[unit], b: Chan[unit])(d: Chan[unit], c: Chan[unit]): unit = { // b.read; // D(a,b)(d,c) // } /** * A cell of the scheduler in an intermediate state (correct). */ def B(a: Chan[unit], b: Chan[unit])(d: Chan[unit], c: Chan[unit]): unit = choice ( b * (x => D(a,b)(d,c)), d(()) * ({ b.read; A(a,b)(d,c)}) ); /** * A cell of the scheduler in another intermediate state. */ def C(a: Chan[unit], b: Chan[unit])(d: Chan[unit], c: Chan[unit]): unit = { c.read; B(a,b)(d,c) } /** * A cell of the scheduler whose attached agent is not yet allowed to start. */ def D(a: Chan[unit], b: Chan[unit])(d: Chan[unit], c: Chan[unit]): unit = { d.write(()); A(a,b)(d,c) } //***************** Agents ******************// def agent(i: Int)(a: Chan[unit], b: Chan[unit]): unit = { Thread.sleep(random.nextInt(1000) + 1); a.write(()); System.out.println("Starting agent " + i); Thread.sleep(random.nextInt(1000) + 1); // 10% chance that we sleep for a long while. if(random.nextInt(10) == 0) { System.out.println("Agent " + i + " sleeps"); Thread.sleep(20000); } b.write(()); System.out.println("Ending agent " + i); agent(i)(a, b) } //***************** Entry function ******************// /** * Creates a scheduler for five agents (programs). */ def main(args: Array[String]): unit = { val agentNb = 5; val agents = for(val i <- List.range(0, agentNb)) yield agent(i); scheduleAgents(agents); } //***************** Infrastructure *****************// /** * A cell is modelled as a function that takes as parameters * input and output channels and which returns nothing. */ type Cell = (Chan[unit], Chan[unit]) => unit; /** * Creates a cell composed of two cells linked together. */ def join(cell1: Cell, cell2: Cell): Cell = (l: Chan[unit], r: Chan[unit]) => { val link = new Chan[unit]; spawn < cell1(l, link) | cell2(link, r) > }; /** * Links the output of a cell to its input. */ def close(cell: Cell): unit = { val a = new Chan[unit]; cell(a, a) } /** * Creates a cell consisting of a chain of cells. */ def chain(cells: List[Cell]): Cell = cells reduceLeft join; /** * Creates a cell consisting of a chain of cells. */ def makeRing(cells: List[Cell]): unit = close(chain(cells)); /** * An agent is modelled as a function that takes as parameters channels to * signal that it has started or finished. */ type Agent = (Chan[unit], Chan[unit]) => unit; /** * Takes a list of agents and schedules them. */ def scheduleAgents(agents: List[Agent]): unit = { var firstAgent = true; val cells = agents map (ag => { val a = new Chan[unit]; val b = new Chan[unit]; spawn < ag(a, b) >; if (firstAgent) { firstAgent = false; A(a, b) } else D(a, b) }); makeRing(cells) } }