package examples.pilib /** * From Pi to Scala: Semaphores, monitors, read/write locks. * Readers/writers locks. */ object rwlock { import scala.concurrent.pilib._ class Signal extends Chan[unit] { def send = write(()) def receive = read } class CountLock { private val busy = new Signal def get = busy.send def release = busy.receive spawn < release > } /** A binary semaphore */ class Lock { private val busy = new Signal; private val free = new Signal; def get = busy.send; def release = free.send; spawn < (while (true) { choice ( busy * (x => free.receive), free * (x => ()) ) }) > } /** A monitor a la Java */ class JavaMonitor { private val lock = new Lock private var waiting: List[Signal] = Nil def Wait = { val s = new Signal waiting = s :: waiting lock.release s.receive lock.get } def Notify = if (!waiting.isEmpty) { waiting.head.send waiting = waiting.tail } def NotifyAll = while (!waiting.isEmpty) { waiting.head.send waiting = waiting.tail } def await(cond: => boolean): unit = while (false == cond) (Wait) } /* class Buffer[a](size: Int) extends JavaMonitor with { var in = 0, out = 0, n = 0; val elems = new Array[a](size); def put(x: a) = synchronized { await(n < size); elems(out) = x; out = (out + 1) % size; } def get: a = synchronized { await(n > 0); val x = elems(in); in = (in + 1) % size; x } } */ /** A readers/writers lock. */ trait ReadWriteLock { def startRead: unit def startWrite: unit def endRead: unit def endWrite: unit } /** * A readers/writers lock, using monitor abstractions. */ class ReadWriteLock1 extends JavaMonitor with ReadWriteLock { private var nactive: int = 0 private var nwriters: int = 0 def status = System.out.println(nactive + " active, " + nwriters + " writers"); def startRead = synchronized { await(nwriters == 0) nactive = nactive + 1 status } def startWrite = synchronized { nwriters = nwriters + 1 await(nactive == 0) nactive = 1 status } def endRead = synchronized { nactive = nactive - 1 if (nactive == 0) NotifyAll status } def endWrite = synchronized { nwriters = nwriters - 1 nactive = 0 NotifyAll status } } /** A readers/writers lock, using semaphores */ class ReadWriteLock2 extends ReadWriteLock { private var rc: int = 0 // reading readers private var wc: int = 0 // writing writers private var rwc: int = 0 // waiting readers private var wwc: int = 0 // waiting writers private val mutex = new Lock private val rsem = new Lock private val wsem = new Lock def startRead = { mutex.get; if (wwc > 0 || wc > 0) { rwc = rwc + 1; mutex.release; rsem.get; rwc = rwc - 1 } rc = rc + 1; if (rwc > 0) rsem.release; mutex.release } def startWrite = { mutex.get; if (rc > 0 || wc > 0) { wwc = wwc + 1; mutex.release; wsem.get; wwc = wwc - 1 } wc = wc + 1; mutex.release } def endRead = { mutex.get; rc = rc - 1; if (rc == 0 && wwc > 0) wsem.release; mutex.release } def endWrite = { mutex.get; wc = wc - 1; if (rwc > 0) rsem.release else if (wwc > 0) wsem.release; mutex.release } } /** A readers/writers lock, using channels, without priortities */ class ReadWriteLock3 extends ReadWriteLock { private val sr = new Signal private val er = new Signal private val sw = new Signal private val ew = new Signal def startRead = sr.send def startWrite = sw.send def endRead = er.send def endWrite = ew.send private def rwlock: unit = choice ( sr * (x => reading(1)), sw * (x => { ew.receive; rwlock }) ) private def reading(n: int): unit = choice ( sr * (x => reading(n+1)), er * (x => if (n == 1) rwlock else reading(n-1)) ) spawn < rwlock > } /** Same, with sequencing */ class ReadWriteLock4 extends ReadWriteLock { private val rwlock = new ReadWriteLock3 private val sr = new Signal private val ww = new Signal private val sw = new Signal def startRead = sr.send def startWrite = { ww.send; sw.send } def endRead = rwlock.endRead def endWrite = rwlock.endWrite private def queue: unit = choice ( sr * (x => { rwlock.startRead ; queue }), ww * (x => { rwlock.startWrite; sw.receive; queue }) ) spawn < queue >; } /** Readwritelock where writers always have priority over readers */ class ReadWriteLock5 extends ReadWriteLock { private val sr = new Signal private val er = new Signal private val ww = new Signal private val sw = new Signal private val ew = new Signal def startRead = sr.send def startWrite = { ww.send; sw.send } def endRead = er.send def endWrite = ew.send private def Reading(nr: int, nw: int): unit = if (nr == 0 && nw == 0) choice ( sr * (x => Reading(1, 0)), ww * (x => Reading(0, 1)) ) else if (nr == 0 && nw != 0) { sw.receive; Writing(nw); } else if (nr != 0 && nw == 0) choice ( sr * (x => Reading(nr + 1, 0)), er * (x => Reading(nr - 1, 0)), ww * (x => Reading(nr, 1)) ) else if (nr != 0 && nw != 0) choice ( ww * (x => Reading(nr, nw + 1)), er * (x => Reading(nr - 1, nw)) ); private def Writing(nw: int): unit = choice ( ew * (x => Reading(0, nw - 1)), ww * (x => Writing(nw + 1)) ); spawn < Reading(0, 0) >; } /** * Main function. */ def main(args: Array[String]): unit = { val random = new java.util.Random() def reader(i: int, rwlock: ReadWriteLock): unit = { Thread.sleep(1 + random.nextInt(100)) System.err.println("Reader " + i + " wants to read.") rwlock.startRead System.err.println("Reader " + i + " is reading.") Thread.sleep(1 + random.nextInt(100)) rwlock.endRead System.err.println("Reader " + i + " has read.") reader(i, rwlock) } def writer(i: int, rwlock: ReadWriteLock): unit = { Thread.sleep(1 + random.nextInt(100)) System.err.println("Writer " + i + " wants to write.") rwlock.startWrite System.err.println("Writer " + i + " is writing.") Thread.sleep(1 + random.nextInt(100)) rwlock.endWrite System.err.println("Writer " + i + " has written.") writer(i, rwlock) } val n = try { Integer.parseInt(args(0)) } catch { case _ => 0 } if (n < 1 || 5 < n) { Console.println("Usage: scala examples.pilib.rwlock (n=1..5)") exit } val rwlock = n match { case 1 => new ReadWriteLock1 case 2 => new ReadWriteLock2 case 3 => new ReadWriteLock3 case 4 => new ReadWriteLock4 case 5 => new ReadWriteLock5 } List.range(0, 5) foreach (i => spawn < reader(i, rwlock) >) List.range(0, 5) foreach (i => spawn < writer(i, rwlock) >) } }