diff options
Diffstat (limited to 'docs/examples/pilib/scheduler.scala')
-rw-r--r-- | docs/examples/pilib/scheduler.scala | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/docs/examples/pilib/scheduler.scala b/docs/examples/pilib/scheduler.scala new file mode 100644 index 0000000000..3b08a9df66 --- /dev/null +++ b/docs/examples/pilib/scheduler.scala @@ -0,0 +1,149 @@ +package examples.pilib; + +import scala.concurrent.pilib._; + +object scheduler { + + /** + * Random number generator. + */ + val random = new java.util.Random(); + + //***************** Scheduler ******************// + + /** + * A cell of the scheduler whose attached agent is allowed to start. + */ + def A(a: Chan[unit], b: Chan[unit])(d: Chan[unit], c: Chan[unit]): unit = { + ///- ... complete here ... + choice ( a * { x => C(a, b)(d, c) }) + ///+ + } + + /** + * A cell of the scheduler in another intermediate state. + */ + def C(a: Chan[unit], b: Chan[unit])(d: Chan[unit], c: Chan[unit]): unit = { + ///- ... complete here ... + choice (c * { x => B(a, b)(d, c) }) + ///+ + } + + /** + * A cell of the scheduler whose attached agent is allowed to finish. + */ + def B(a: Chan[unit], b: Chan[unit])(d: Chan[unit], c: Chan[unit]): unit = { + ///- ... complete here ... + // choice (b * { x => D(a, b)(d, c) }) // incorrect naive solution + choice ( + b * { x => choice ( d(()) * A(a, b)(d, c) ) }, // b.'d.A + d(()) * (choice (b * { x => A(a, b)(d, c) })) // 'd.b.A + ) + ///+ + } + + /** + * A cell of the scheduler whose attached agent is not yet allowed to start. + */ + def D(a: Chan[unit], b: Chan[unit])(d: Chan[unit], c: Chan[unit]): unit = { + ///- ... complete here ... + choice (d(()) * A(a, b)(d, c)) + ///+ + } + + //***************** Agents ******************// + + def agent(i: Int)(a: Chan[unit], b: Chan[unit]): unit = { + // 50% chance that we sleep forever + if (i == 0 && random.nextInt(10) < 5) { + a.attach(x => System.out.println("Start and sleeps ----> " + i)); + Thread.sleep(random.nextInt(1000)); + a.write(()); + } + else { + a.attach(x => System.out.println("Start ----> " + i)); + b.attach(x => System.out.println("Stop -> " + i)); + Thread.sleep(random.nextInt(1000)); + a.write(()); + Thread.sleep(random.nextInt(1000)); + b.write(()); + agent(i)(a, b) + } + } + + //***************** Entry function ******************// + + /** + * Creates a scheduler for five agents (programs). + */ + + def main(args: Array[String]): unit = { + val agentNb = 5; + val agents = List.range(0, agentNb) map agent; + scheduleAgents(agents); + } + + //***************** Infrastructure *****************// + + /** + * A cell is modelled as a function that takes as parameters + * input and output channels and which returns nothing. + */ + type Cell = (Chan[unit], Chan[unit]) => unit; + + /** + * Creates a cell composed of two cells linked together. + */ + def join(cell1: Cell, cell2: Cell): Cell = + (l: Chan[unit], r: Chan[unit]) => { + val link = new Chan[unit]; + spawn < cell1(l, link) | cell2(link, r) > + }; + + /** + * Links the output of a cell to its input. + */ + def close(cell: Cell): unit = { + val a = new Chan[unit]; + cell(a, a) + } + + /** + * Creates a cell consisting of a chain of cells. + */ + def chain(cells: List[Cell]): Cell = + cells reduceLeft join; + + /** + * Creates a cell consisting of a chain of cells. + */ + def makeRing(cells: List[Cell]): unit = + close(chain(cells)); + + /** + * An agent is modelled as a function that takes as parameters channels to + * signal that it has started or finished. + */ + type Agent = (Chan[unit], Chan[unit]) => unit; + + /** + * Takes a list of agents and schedules them. + */ + def scheduleAgents(agents: List[Agent]): unit = { + var firstAgent = true; + val cells = agents map (ag => { + val a = new Chan[unit]; + val b = new Chan[unit]; + spawn < ag(a, b) >; + if (firstAgent) { + firstAgent = false; + A(a, b) + } + else + D(a, b) + }); + makeRing(cells) + } +} + + |