summaryrefslogblamecommitdiff
path: root/sources/examples/pilib/elasticBuffer.scala
blob: 50e61c291a748640052ca6fb4cd36f386cf88293 (plain) (tree)










































































                                                                              
object elasticBuffer {

  import scala.concurrent.pilib._;

  /**
  * Recursive type for channels that carry a "String" channel and
  * an object of the type we define.
  */
  class MetaChan extends Chan[Pair[Chan[String], MetaChan]];

  def Buffer(put: Chan[String], get: Chan[String]): unit =  {

    /**
    * An empty buffer cell, ready to pass on (o,r) to the left.
    */
    def Bl(i:Chan[String], l: MetaChan,
	   o: Chan[String], r: MetaChan): unit =
	     choice (
	       l(Pair(o,r)) * (System.out.println("Removed one cell.")),
	       i * (inp => Cl(i, l, o, r, inp))
	     );

    /**
    * A buffer cell containing a value, ready to receive (o,r) from the right.
    */
    def Cl(i: Chan[String], l: MetaChan,
	  o: Chan[String], r: MetaChan, content: String): unit =
	    choice (
	      o(content) * (Bl(i,l,o,r)),
	      i * (inp => Dl(i,l,o,r,content, inp)),
	      r * ( { case Pair(newo, newr) => Cl(i,l,newo,newr,content) })
	    );

    /**
    * Two joined buffer cells, of type Cl.
    */
    def Dl(i: Chan[String], l: MetaChan,
	  o: Chan[String], r: MetaChan,
	  content: String, inp: String): unit = {
      val newlr = new MetaChan;
      val newio = new Chan[String];
      spawn < Cl(i, l, newio, newlr, inp) | Cl(newio, newlr, o, r,content) >;
    }

    // l and r channels for the leftmost and rightmost cell, respectively.
    val unused1 = new MetaChan;
    val unused2 = new MetaChan;

    Bl(put, unused1, get, unused2);
  }

  val random = new java.util.Random();

  def Producer(n: int, put: Chan[String]): unit = {
    Thread.sleep(1 + random.nextInt(1000));
    val msg = "object " + n;
    put.write(msg);
    System.out.println("Producer gave " + msg);
    Producer(n + 1, put)
  }

  def Consumer(get: Chan[String]): unit = {
    Thread.sleep(1 + random.nextInt(1000));
    val msg = get.read;
    System.out.println("Consummer took " + msg);
    Consumer(get)
  }

  def main(args: Array[String]): unit = {
    val put = new Chan[String];
    val get = new Chan[String];
    spawn < Producer(0, put) | Consumer(get) | Buffer(put, get) >
  }

}