summaryrefslogblamecommitdiff
path: root/sources/examples/pilib/rwlock.scala
blob: 65578ea51bee8534a3d5e72908e8a41928dbedfd (plain) (tree)







































































































































































































































































































































                                                                        
/**
* From Pi to Scala: Semaphores, monitors, read/write locks.
* Readers/writers locks.
*/
object rwlock {

  import scala.concurrent.pilib._;

  class Signal extends Chan[unit] {
    def send = write(());
    def receive = read;
  }

  class CountLock {
    private val busy = new Signal;
    def get = busy.send;
    def release = busy.receive;
    spawn < release >
  }

  /** A binary semaphore
  */
  class Lock {
    private val busy = new Signal;
    private val free = new Signal;
    def get = busy.send;
    def release = free.send;
    spawn < (while (true) {
      choice (
        busy * (x => free.receive),
	free * (x => ())
      )
    })
    >
  }

  /** A monitor a la Java
  */
  class JavaMonitor {

    private val lock = new Lock;

    private var waiting: List[Signal] = Nil;

    def synchronized[a](def s: a): a = {
      lock.get; val result = s; lock.release; result
    }

    def Wait = {
      val s = new Signal;
      waiting = s :: waiting;
      lock.release;
      s.receive;
      lock.get;
    }

    def Notify = {
      if (!waiting.isEmpty) {
	waiting.head.send;
	waiting = waiting.tail;
      }
    }

    def NotifyAll = {
      while (!waiting.isEmpty) {
	waiting.head.send;
	waiting = waiting.tail;
      }
    }

    def await(def cond: boolean): unit = while (false == cond) (Wait)
  }

  /*
  class Buffer[a](size: Int) extends JavaMonitor with {
    var in = 0, out = 0, n = 0;
    val elems = new Array[a](size);
    def put(x: a) = synchronized {
      await(n < size);
      elems(out) = x;
      out = (out + 1) % size;
    }
    def get: a = synchronized {
      await(n > 0);
      val x = elems(in);
      in = (in + 1) % size;
      x
    }
  }
  */

  /** A readers/writers lock. */
  trait ReadWriteLock {
    def startRead: unit;
    def startWrite: unit;
    def endRead: unit;
    def endWrite: unit;
  }

  /**
  * A readers/writers lock, using monitor abstractions.
  */
  class ReadWriteLock1 extends JavaMonitor with ReadWriteLock {

    private var nactive: int = 0;
    private var nwriters: int = 0;

    def status =
      System.out.println(nactive + " active, " + nwriters + " writers");

    def startRead = synchronized {
      await(nwriters == 0);
      nactive = nactive + 1;
      status
    }

    def startWrite = synchronized {
      nwriters = nwriters + 1;
      await(nactive == 0);
      nactive = 1;
      status
    }

    def endRead = synchronized {
      nactive = nactive - 1;
      if (nactive == 0) NotifyAll;
      status
    }

    def endWrite = synchronized {
      nwriters = nwriters - 1;
      nactive = 0;
      NotifyAll;
      status
    }
  }

  /** A readers/writers lock, using semaphores
  */
  class ReadWriteLock2 extends ReadWriteLock {

    private var rc: int = 0;  // reading readers
    private var wc: int = 0;  // writing writers
    private var rwc: int = 0; // waiting readers
    private var wwc: int = 0; // waiting writers
    private val mutex = new Lock;
    private val rsem = new Lock;
    private val wsem = new Lock;

    def startRead = {
      mutex.get;
      if (wwc > 0 || wc > 0) {
	rwc = rwc + 1;
	mutex.release;
	rsem.get;
	rwc = rwc - 1;
      }
      rc = rc + 1;
      if (rwc > 0) rsem.release;
      mutex.release
    }

    def startWrite = {
      mutex.get;
      if (rc > 0 || wc > 0) {
	wwc = wwc + 1;
	mutex.release;
	wsem.get;
	wwc = wwc - 1;
      }
      wc = wc + 1;
      mutex.release;
    }

    def endRead = {
      mutex.get;
      rc = rc - 1;
      if (rc == 0 && wwc > 0) wsem.release;
      mutex.release
    }

    def endWrite = {
      mutex.get;
      wc = wc - 1;
      if (rwc > 0)
	rsem.release
      else if (wwc > 0) wsem.release;
      mutex.release
    }
  }

  /** A readers/writers lock, using channels, without priortities
  */
  class ReadWriteLock3 extends ReadWriteLock {

    private val sr = new Signal;
    private val er = new Signal;
    private val sw = new Signal;
    private val ew = new Signal;

    def startRead = sr.send;
    def startWrite = sw.send;
    def endRead = er.send;
    def endWrite = ew.send;

    private def rwlock: unit = choice (
      sr * (x => reading(1)),
      sw * (x => { ew.receive; rwlock })
    );

    private def reading(n: int): unit = choice (
      sr * (x => reading(n+1)),
      er * (x => if (n == 1) rwlock else reading(n-1))
    );

    spawn < rwlock >;
  }

  /** Same, with sequencing
  */
  class ReadWriteLock4 extends ReadWriteLock {

    private val rwlock = new ReadWriteLock3;

    private val sr = new Signal;
    private val ww = new Signal;
    private val sw = new Signal;

    def startRead = sr.send;
    def startWrite = { ww.send; sw.send; }
    def endRead = rwlock.endRead;
    def endWrite = rwlock.endWrite;

    private def queue: unit = choice (
      sr * (x => { rwlock.startRead ; queue }),
      ww * (x => { rwlock.startWrite; sw.receive; queue })
    );

    spawn < queue >;
  }

  /** Readwritelock where writers always have priority over readers
  */
  class ReadWriteLock5 extends ReadWriteLock {

    private val sr = new Signal;
    private val er = new Signal;
    private val ww = new Signal;
    private val sw = new Signal;
    private val ew = new Signal;

    def startRead = sr.send;
    def startWrite = { ww.send; sw.send; }
    def endRead = er.send;
    def endWrite = ew.send;

    private def Reading(nr: int, nw: int): unit =
      if (nr == 0 && nw == 0)
        choice (
          sr * (x => Reading(1, 0)),
          ww * (x => Reading(0, 1))
        )
      else if (nr == 0 && nw != 0) {
        sw.receive;
	Writing(nw);
      }
      else if (nr != 0 && nw == 0)
        choice (
          sr * (x => Reading(nr + 1, 0)),
	  er * (x => Reading(nr - 1, 0)),
          ww * (x => Reading(nr, 1))
        )
      else if (nr != 0 && nw != 0)
        choice (
          ww * (x => Reading(nr, nw + 1)),
          er * (x => Reading(nr - 1, nw))
        );

    private def Writing(nw: int): unit  = choice (
      ew * (x => Reading(0, nw - 1)),
      ww * (x => Writing(nw + 1))
    );

    spawn < Reading(0, 0) >;

  }

  /**
  * Main function.
  */
  def main(args: Array[String]): unit = {
    val random = new java.util.Random();

    def reader(i: int, rwlock: ReadWriteLock): unit = {
      Thread.sleep(1 + random.nextInt(100));
      System.err.println("Reader " + i + " wants to read.");
      rwlock.startRead;
      System.err.println("Reader " + i + " is reading.");
      Thread.sleep(1 + random.nextInt(100));
      rwlock.endRead;
      System.err.println("Reader " + i + " has read.");
      reader(i, rwlock)
    }

    def writer(i: int, rwlock: ReadWriteLock): unit = {
      Thread.sleep(1 + random.nextInt(100));
      System.err.println("Writer " + i + " wants to write.");
      rwlock.startWrite;
      System.err.println("Writer " + i + " is writing.");
      Thread.sleep(1 + random.nextInt(100));
      rwlock.endWrite;
      System.err.println("Writer " + i + " has written.");
      writer(i, rwlock)
    }

    val rwlock = args(0) match {
      case "1" => new ReadWriteLock1
      case "2" => new ReadWriteLock2
      case "3" => new ReadWriteLock3
      case "4" => new ReadWriteLock4
      case "5" => new ReadWriteLock5
    }
    List.range(0, 5) foreach (i => spawn < reader(i, rwlock) >);
    List.range(0, 5) foreach (i => spawn < writer(i, rwlock) >);
  }

}