summaryrefslogblamecommitdiff
path: root/docs/examples/pilib/scheduler.scala
blob: fd8fd526009dc78e8725c58c7333a665df106cb5 (plain) (tree)
1
2
3
4
5
6
7
8
9
                      
 
                               
 
                  

     

                             
                                



                                                    

                                                                      
                                                                     


                                        


     

                                                           
                                                                     



                                       

     

                                                                       
                                                                     

                                                                          
            




                                                              

     

                                                                              
                                                                     


                                  



                                                 
                                                   

                                           
                                                           

                                        

          

                                                




                                        
     




                                                         

                                                    
 
                                 


                                                 




                                                        
                                                              
                                                           
     
                                              

     

                                                          
                                            

                                       



                                               

                                             

                          



              

                                                   
                                      
                         

     

                                                   
                                         
                       

     


                                                                            
                                               

     

                                               
                                           

                                   

                             
                         
                                                         

                           

          
                     


                   
 


 
package examples.pilib

import scala.concurrent.pilib._

object scheduler {

  /**
   * Random number generator.
   */
  val random = new util.Random()

  //***************** Scheduler ******************//

  /**
   * A cell of the scheduler whose attached agent is allowed to start.
   */
  def A(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]) {
    ///- ... complete here ...
    choice ( a * { x => C(a, b)(d, c) })
    ///+
  }

  /**
   * A cell of the scheduler in another intermediate state.
   */
  def C(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]) {
    ///- ... complete here ...
    choice (c * { x => B(a, b)(d, c) })
    ///+
  }

  /**
   * A cell of the scheduler whose attached agent is allowed to finish.
   */
  def B(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]) {
    ///- ... complete here ...
    //     choice (b * { x => D(a, b)(d, c) }) // incorrect naive solution
    choice (
      b * { x => choice ( d(()) * A(a, b)(d, c) ) }, // b.'d.A
      d(()) * (choice (b * { x => A(a, b)(d, c) }))  // 'd.b.A
    )
    ///+
   }

  /**
   * A cell of the scheduler whose attached agent is not yet allowed to start.
   */
  def D(a: Chan[Unit], b: Chan[Unit])(d: Chan[Unit], c: Chan[Unit]) {
    ///- ... complete here ...
    choice (d(()) * A(a, b)(d, c))
    ///+
  }

  //***************** Agents ******************//

  def agent(i: Int)(a: Chan[Unit], b: Chan[Unit]) {
    // 50% chance that we sleep forever
    if (i == 0 && random.nextInt(10) < 5) {
      a.attach(x => println("Start and sleeps ----> " + i))
      Thread.sleep(random.nextInt(1000))
      a.write(())
    }
    else {
      a.attach(x => println("Start ----> " + i))
      b.attach(x => println("Stop -> " + i))
      Thread.sleep(random.nextInt(1000))
      a.write(())
      Thread.sleep(random.nextInt(1000))
      b.write(())
      agent(i)(a, b)
    }
  }

  //***************** Entry function ******************//

  /**
   * Creates a scheduler for five agents (programs).
   */

  def main(args: Array[String]) {
    val agentNb = 5
    val agents = List.range(0, agentNb) map agent
    scheduleAgents(agents)
  }

  //***************** Infrastructure *****************//

  /**
   * A cell is modelled as a function that takes as parameters
   * input and output channels and which returns nothing.  
   */
  type Cell = (Chan[Unit], Chan[Unit]) => Unit

  /**
   * Creates a cell composed of two cells linked together.
   */
  def join(cell1: Cell, cell2: Cell): Cell =
    (l: Chan[Unit], r: Chan[Unit]) => {
      val link = new Chan[Unit];
      spawn < cell1(l, link) | cell2(link, r) >
    };

  /**
   * Links the output of a cell to its input.
   */
  def close(cell: Cell) {
    val a = new Chan[Unit]
    cell(a, a)
  }

  /**
   * Creates a cell consisting of a chain of cells.
   */
  def chain(cells: List[Cell]): Cell =
    cells reduceLeft join

  /**
   * Creates a cell consisting of a chain of cells.
   */
  def makeRing(cells: List[Cell]): Unit =
    close(chain(cells))

  /**
   * An agent is modelled as a function that takes as parameters channels to
   * signal that it has started or finished.
   */
  type Agent = (Chan[Unit], Chan[Unit]) => Unit

  /**
   * Takes a list of agents and schedules them.
   */
  def scheduleAgents(agents: List[Agent]) {
    var firstAgent = true;
    val cells = agents map (ag => {
      val a = new Chan[Unit];
      val b = new Chan[Unit];
      spawn < ag(a, b) >;
      (d: Chan[Unit], c: Chan[Unit]) => if (firstAgent) {
        firstAgent = false;
        A(a, b)(d, c)
      }
      else
        D(a, b)(d, c)
    });
    makeRing(cells)
  }

}