From 85e0fb8ca8c46aa7bec19bbdf95112c1b47dc43f Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Thu, 17 Apr 2008 16:04:55 +0000 Subject: Applied bug fix for t595 --- src/library/scala/concurrent/SyncChannel.scala | 83 ++++++++++++++++++-------- 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/src/library/scala/concurrent/SyncChannel.scala b/src/library/scala/concurrent/SyncChannel.scala index afbf067881..3816eff3aa 100644 --- a/src/library/scala/concurrent/SyncChannel.scala +++ b/src/library/scala/concurrent/SyncChannel.scala @@ -11,34 +11,67 @@ package scala.concurrent -/** The class SyncChannel ... +/** A SyncChannel allows one to exchange data + * synchronously between a reader and a writer thread. + * The writer thread is blocked until the data to be written + * has been read by a corresponding reader thread. * - * @author Martin Odersky - * @version 1.0, 10/03/2003 + * @author Philipp Haller + * @version 2.0, 04/17/2008 */ -class SyncChannel[a] { - private var data: a = _ - private var reading = false - private var writing = false - - def await(cond: => Boolean) = while (!cond) wait() - - def write(x: a) = synchronized { - await(!writing) - data = x - writing = true - if (reading) notifyAll() - else await(reading) +class SyncChannel[A] { + + private var pendingWrites = List[(A, SyncVar[Boolean])]() + private var pendingReads = List[SyncVar[A]]() + + def write(data: A) { + // create write request + val writeReq = new SyncVar[Boolean] + + this.synchronized { + // check whether there is a reader waiting + if (!pendingReads.isEmpty) { + val readReq = pendingReads.head + pendingReads = pendingReads.tail + + // let reader continue + readReq set data + + // resolve write request + writeReq set true + } + else { + // enqueue write request + pendingWrites = pendingWrites ::: List((data, writeReq)) + } + } + + writeReq.get } - def read: a = synchronized { - await(!reading) - reading = true - await(writing) - val x = data - writing = false - reading = false - notifyAll() - x + def read: A = { + // create read request + val readReq = new SyncVar[A] + + this.synchronized { + // check whether there is a writer waiting + if (!pendingWrites.isEmpty) { + // read data + val (data, writeReq) = pendingWrites.head + pendingWrites = pendingWrites.tail + + // let writer continue + writeReq set true + + // resolve read request + readReq set data + } + else { + // enqueue read request + pendingReads = pendingReads ::: List(readReq) + } + } + + readReq.get } } -- cgit v1.2.3