diff options
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 8 | ||||
-rw-r--r-- | src/actors/scala/actors/Debug.scala | 9 | ||||
-rw-r--r-- | src/actors/scala/actors/Reactor.scala | 22 | ||||
-rw-r--r-- | src/actors/scala/actors/ReactorTask.scala | 1 | ||||
-rw-r--r-- | src/actors/scala/actors/ReplyReactor.scala | 8 | ||||
-rw-r--r-- | test/files/jvm/reactor-producer-consumer.check | 10 | ||||
-rw-r--r-- | test/files/jvm/reactor-producer-consumer.scala | 75 |
7 files changed, 129 insertions, 4 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index fb90cb9c46..907389b9f0 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -646,6 +646,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { } // guarded by lock of this + // never throws SuspendActorException private[actors] override def scheduleActor(f: Any =>? Unit, msg: Any) = if ((f eq null) && (continuation eq null)) { // do nothing (timeout is handled instead) @@ -825,6 +826,13 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { resumeActor() else if (waitingFor ne waitingForNone) { scheduleActor(continuation, null) + /* Here we should not throw a SuspendActorException, + since the current method is called from an actor that + is in the process of exiting. + + Therefore, the contract for scheduleActor is that + it never throws a SuspendActorException. + */ } } } diff --git a/src/actors/scala/actors/Debug.scala b/src/actors/scala/actors/Debug.scala index 481b68d7f4..bad19b8aeb 100644 --- a/src/actors/scala/actors/Debug.scala +++ b/src/actors/scala/actors/Debug.scala @@ -27,6 +27,15 @@ object Debug { def error(s: String) = if (lev > 0) System.err.println("Error: " + s) + + def doInfo(b: => Unit) = + if (lev > 2) b + + def doWarning(b: => Unit) = + if (lev > 1) b + + def doError(b: => Unit) = + if (lev > 0) b } class Debug(tag: String) { diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index d641f54eb6..8545b92d1e 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -108,8 +108,17 @@ trait Reactor extends OutputChannel[Any] { // assert continuation != null if (onSameThread) continuation(item._1) - else + else { scheduleActor(continuation, item._1) + /* Here, we throw a SuspendActorException to avoid + terminating this actor when the current ReactorTask + is finished. + + The SuspendActorException skips the termination code + in ReactorTask. + */ + throw Actor.suspendException + } } def !(msg: Any) { @@ -149,7 +158,14 @@ trait Reactor extends OutputChannel[Any] { // keep going } else { waitingFor = handlesMessage - done = true + /* Here, we throw a SuspendActorException to avoid + terminating this actor when the current ReactorTask + is finished. + + The SuspendActorException skips the termination code + in ReactorTask. + */ + throw Actor.suspendException } } } else { @@ -171,6 +187,8 @@ trait Reactor extends OutputChannel[Any] { * an actors act method. * * assume handler != null + * + * never throws SuspendActorException */ private[actors] def scheduleActor(handler: Any =>? Unit, msg: Any) = { val fun = () => handler(msg) diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala index 28e93bbbff..f6ec67e94c 100644 --- a/src/actors/scala/actors/ReactorTask.scala +++ b/src/actors/scala/actors/ReactorTask.scala @@ -46,6 +46,7 @@ private[actors] class ReactorTask[T >: Null <: Reactor](var reactor: T, var fun: case e: Exception => Debug.info(reactor+": caught "+e) + Debug.doInfo { e.printStackTrace() } reactor.terminated() afterExecuting(e) } finally { diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala index 4b31369db4..64860f4d38 100644 --- a/src/actors/scala/actors/ReplyReactor.scala +++ b/src/actors/scala/actors/ReplyReactor.scala @@ -57,8 +57,11 @@ trait ReplyReactor extends Reactor with ReplyableReactor { // assert continuation != null if (onSameThread) continuation(item._1) - else + else { scheduleActor(continuation, item._1) + // see Reactor.resumeReceiver + throw Actor.suspendException + } } // assume continuation != null @@ -83,7 +86,8 @@ trait ReplyReactor extends Reactor with ReplyableReactor { // keep going } else { waitingFor = handlesMessage - done = true + // see Reactor.searchMailbox + throw Actor.suspendException } } } else { diff --git a/test/files/jvm/reactor-producer-consumer.check b/test/files/jvm/reactor-producer-consumer.check new file mode 100644 index 0000000000..d971cea19e --- /dev/null +++ b/test/files/jvm/reactor-producer-consumer.check @@ -0,0 +1,10 @@ +42 +42 +42 +42 +42 +42 +42 +42 +42 +42 diff --git a/test/files/jvm/reactor-producer-consumer.scala b/test/files/jvm/reactor-producer-consumer.scala new file mode 100644 index 0000000000..946e1561ce --- /dev/null +++ b/test/files/jvm/reactor-producer-consumer.scala @@ -0,0 +1,75 @@ +import scala.actors.Reactor + +object Test { + case class Stop() + case class Get(from: Reactor) + case class Put(x: Int) + + class UnboundedBuffer extends Reactor { + def act() { + react { + case Stop() => + case Get(from) => + val consumer = from + react { + case msg @ Put(x) => + consumer ! x + act() + } + } + } + } + + class Producer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor) extends Reactor { + def act() { + var i = 0 + while (i < n) { + i += 1 + if (delay > 0) Thread.sleep(delay) + buf ! Put(42) + } + parent ! Stop() + } + } + + class Consumer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor) extends Reactor { + val step = n / 10 + var i = 0 + def act() { + if (i < n) { + i += 1 + if (delay > 0) Thread.sleep(delay) + buf ! Get(this) + react { + case res => + if (i % step == 0) + println(res) + act() + } + } else { + parent ! Stop() + } + } + } + + def main(args: Array[String]) { + val parent = new Reactor { + def act() { + val buffer = new UnboundedBuffer + buffer.start() + val producer = new Producer(buffer, 10000, 0, this) + producer.start() + val consumer = new Consumer(buffer, 10000, 0, this) + consumer.start() + react { + case Stop() => + react { + case Stop() => + buffer ! Stop() + } + } + } + } + parent.start() + } +} |