summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/actors/scala/actors/Actor.scala8
-rw-r--r--src/actors/scala/actors/Debug.scala9
-rw-r--r--src/actors/scala/actors/Reactor.scala22
-rw-r--r--src/actors/scala/actors/ReactorTask.scala1
-rw-r--r--src/actors/scala/actors/ReplyReactor.scala8
-rw-r--r--test/files/jvm/reactor-producer-consumer.check10
-rw-r--r--test/files/jvm/reactor-producer-consumer.scala75
7 files changed, 129 insertions, 4 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index fb90cb9c46..907389b9f0 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -646,6 +646,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
}
// guarded by lock of this
+ // never throws SuspendActorException
private[actors] override def scheduleActor(f: Any =>? Unit, msg: Any) =
if ((f eq null) && (continuation eq null)) {
// do nothing (timeout is handled instead)
@@ -825,6 +826,13 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
resumeActor()
else if (waitingFor ne waitingForNone) {
scheduleActor(continuation, null)
+ /* Here we should not throw a SuspendActorException,
+ since the current method is called from an actor that
+ is in the process of exiting.
+
+ Therefore, the contract for scheduleActor is that
+ it never throws a SuspendActorException.
+ */
}
}
}
diff --git a/src/actors/scala/actors/Debug.scala b/src/actors/scala/actors/Debug.scala
index 481b68d7f4..bad19b8aeb 100644
--- a/src/actors/scala/actors/Debug.scala
+++ b/src/actors/scala/actors/Debug.scala
@@ -27,6 +27,15 @@ object Debug {
def error(s: String) =
if (lev > 0) System.err.println("Error: " + s)
+
+ def doInfo(b: => Unit) =
+ if (lev > 2) b
+
+ def doWarning(b: => Unit) =
+ if (lev > 1) b
+
+ def doError(b: => Unit) =
+ if (lev > 0) b
}
class Debug(tag: String) {
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index d641f54eb6..8545b92d1e 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -108,8 +108,17 @@ trait Reactor extends OutputChannel[Any] {
// assert continuation != null
if (onSameThread)
continuation(item._1)
- else
+ else {
scheduleActor(continuation, item._1)
+ /* Here, we throw a SuspendActorException to avoid
+ terminating this actor when the current ReactorTask
+ is finished.
+
+ The SuspendActorException skips the termination code
+ in ReactorTask.
+ */
+ throw Actor.suspendException
+ }
}
def !(msg: Any) {
@@ -149,7 +158,14 @@ trait Reactor extends OutputChannel[Any] {
// keep going
} else {
waitingFor = handlesMessage
- done = true
+ /* Here, we throw a SuspendActorException to avoid
+ terminating this actor when the current ReactorTask
+ is finished.
+
+ The SuspendActorException skips the termination code
+ in ReactorTask.
+ */
+ throw Actor.suspendException
}
}
} else {
@@ -171,6 +187,8 @@ trait Reactor extends OutputChannel[Any] {
* an actors act method.
*
* assume handler != null
+ *
+ * never throws SuspendActorException
*/
private[actors] def scheduleActor(handler: Any =>? Unit, msg: Any) = {
val fun = () => handler(msg)
diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala
index 28e93bbbff..f6ec67e94c 100644
--- a/src/actors/scala/actors/ReactorTask.scala
+++ b/src/actors/scala/actors/ReactorTask.scala
@@ -46,6 +46,7 @@ private[actors] class ReactorTask[T >: Null <: Reactor](var reactor: T, var fun:
case e: Exception =>
Debug.info(reactor+": caught "+e)
+ Debug.doInfo { e.printStackTrace() }
reactor.terminated()
afterExecuting(e)
} finally {
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala
index 4b31369db4..64860f4d38 100644
--- a/src/actors/scala/actors/ReplyReactor.scala
+++ b/src/actors/scala/actors/ReplyReactor.scala
@@ -57,8 +57,11 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
// assert continuation != null
if (onSameThread)
continuation(item._1)
- else
+ else {
scheduleActor(continuation, item._1)
+ // see Reactor.resumeReceiver
+ throw Actor.suspendException
+ }
}
// assume continuation != null
@@ -83,7 +86,8 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
// keep going
} else {
waitingFor = handlesMessage
- done = true
+ // see Reactor.searchMailbox
+ throw Actor.suspendException
}
}
} else {
diff --git a/test/files/jvm/reactor-producer-consumer.check b/test/files/jvm/reactor-producer-consumer.check
new file mode 100644
index 0000000000..d971cea19e
--- /dev/null
+++ b/test/files/jvm/reactor-producer-consumer.check
@@ -0,0 +1,10 @@
+42
+42
+42
+42
+42
+42
+42
+42
+42
+42
diff --git a/test/files/jvm/reactor-producer-consumer.scala b/test/files/jvm/reactor-producer-consumer.scala
new file mode 100644
index 0000000000..946e1561ce
--- /dev/null
+++ b/test/files/jvm/reactor-producer-consumer.scala
@@ -0,0 +1,75 @@
+import scala.actors.Reactor
+
+object Test {
+ case class Stop()
+ case class Get(from: Reactor)
+ case class Put(x: Int)
+
+ class UnboundedBuffer extends Reactor {
+ def act() {
+ react {
+ case Stop() =>
+ case Get(from) =>
+ val consumer = from
+ react {
+ case msg @ Put(x) =>
+ consumer ! x
+ act()
+ }
+ }
+ }
+ }
+
+ class Producer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor) extends Reactor {
+ def act() {
+ var i = 0
+ while (i < n) {
+ i += 1
+ if (delay > 0) Thread.sleep(delay)
+ buf ! Put(42)
+ }
+ parent ! Stop()
+ }
+ }
+
+ class Consumer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor) extends Reactor {
+ val step = n / 10
+ var i = 0
+ def act() {
+ if (i < n) {
+ i += 1
+ if (delay > 0) Thread.sleep(delay)
+ buf ! Get(this)
+ react {
+ case res =>
+ if (i % step == 0)
+ println(res)
+ act()
+ }
+ } else {
+ parent ! Stop()
+ }
+ }
+ }
+
+ def main(args: Array[String]) {
+ val parent = new Reactor {
+ def act() {
+ val buffer = new UnboundedBuffer
+ buffer.start()
+ val producer = new Producer(buffer, 10000, 0, this)
+ producer.start()
+ val consumer = new Consumer(buffer, 10000, 0, this)
+ consumer.start()
+ react {
+ case Stop() =>
+ react {
+ case Stop() =>
+ buffer ! Stop()
+ }
+ }
+ }
+ }
+ parent.start()
+ }
+}