summaryrefslogtreecommitdiff
path: root/docs/examples/pilib/scheduler.scala
blob: fd8fd526009dc78e8725c58c7333a665df106cb5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package examples.pilib

import scala.concurrent.pilib._

object scheduler {

  /**
   * Random number generator.
   */
  val random = new util.Random()

  //***************** Scheduler ******************//

  /**
   * A cell of the scheduler whose attached agent is allowed to start.
   */
  def A(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]) {
    ///- ... complete here ...
    choice ( a * { x => C(a, b)(d, c) })
    ///+
  }

  /**
   * A cell of the scheduler in another intermediate state.
   */
  def C(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]) {
    ///- ... complete here ...
    choice (c * { x => B(a, b)(d, c) })
    ///+
  }

  /**
   * A cell of the scheduler whose attached agent is allowed to finish.
   */
  def B(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]) {
    ///- ... complete here ...
    //     choice (b * { x => D(a, b)(d, c) }) // incorrect naive solution
    choice (
      b * { x => choice ( d(()) * A(a, b)(d, c) ) }, // b.'d.A
      d(()) * (choice (b * { x => A(a, b)(d, c) }))  // 'd.b.A
    )
    ///+
   }

  /**
   * A cell of the scheduler whose attached agent is not yet allowed to start.
   */
  def D(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]) {
    ///- ... complete here ...
    choice (d(()) * A(a, b)(d, c))
    ///+
  }

  //***************** Agents ******************//

  def agent(i: Int)(a: Chan[Unit], b: Chan[Unit]) {
    // 50% chance that we sleep forever
    if (i == 0 && random.nextInt(10) < 5) {
      a.attach(x => println("Start and sleeps ----> " + i))
      Thread.sleep(random.nextInt(1000))
      a.write(())
    }
    else {
      a.attach(x => println("Start ----> " + i))
      b.attach(x => println("Stop -> " + i))
      Thread.sleep(random.nextInt(1000))
      a.write(())
      Thread.sleep(random.nextInt(1000))
      b.write(())
      agent(i)(a, b)
    }
  }

  //***************** Entry function ******************//

  /**
   * Creates a scheduler for five agents (programs).
   */

  def main(args: Array[String]) {
    val agentNb = 5
    val agents = List.range(0, agentNb) map agent
    scheduleAgents(agents)
  }

  //***************** Infrastructure *****************//

  /**
   * A cell is modelled as a function that takes as parameters
   * input and output channels and which returns nothing.  
   */
  type Cell = (Chan[Unit], Chan[Unit]) => Unit

  /**
   * Creates a cell composed of two cells linked together.
   */
  def join(cell1: Cell, cell2: Cell): Cell =
    (l: Chan[Unit], r: Chan[Unit]) => {
      val link = new Chan[Unit];
      spawn < cell1(l, link) | cell2(link, r) >
    };

  /**
   * Links the output of a cell to its input.
   */
  def close(cell: Cell) {
    val a = new Chan[Unit]
    cell(a, a)
  }

  /**
   * Creates a cell consisting of a chain of cells.
   */
  def chain(cells: List[Cell]): Cell =
    cells reduceLeft join

  /**
   * Creates a cell consisting of a chain of cells.
   */
  def makeRing(cells: List[Cell]): Unit =
    close(chain(cells))

  /**
   * An agent is modelled as a function that takes as parameters channels to
   * signal that it has started or finished.
   */
  type Agent = (Chan[Unit], Chan[Unit]) => Unit

  /**
   * Takes a list of agents and schedules them.
   */
  def scheduleAgents(agents: List[Agent]) {
    var firstAgent = true;
    val cells = agents map (ag => {
      val a = new Chan[Unit];
      val b = new Chan[Unit];
      spawn < ag(a, b) >;
      (d: Chan[Unit], c: Chan[Unit]) => if (firstAgent) {
        firstAgent = false;
        A(a, b)(d, c)
      }
      else
        D(a, b)(d, c)
    });
    makeRing(cells)
  }

}