summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2010-01-07 11:35:41 +0000
committerPhilipp Haller <hallerp@gmail.com>2010-01-07 11:35:41 +0000
commitb5c141b4ed5e2db673a306a2ba0c23a1fbe2d9ee (patch)
treef9916464bb630df0e57a7ad1c8e5616cf0155b6a
parent57f14277dad0427b90abc1e6f70e2f8ae4dcbb51 (diff)
downloadscala-b5c141b4ed5e2db673a306a2ba0c23a1fbe2d9ee.tar.gz
scala-b5c141b4ed5e2db673a306a2ba0c23a1fbe2d9ee.tar.bz2
scala-b5c141b4ed5e2db673a306a2ba0c23a1fbe2d9ee.zip
Fixed issue in Reactor/Actor that could lead to...
Fixed issue in Reactor/Actor that could lead to premature termination of actors. Added test that could reproduce it (occurred more often on larger inputs, but test should not take too much time). The issue also caused the reactor-exceptionOnSend test to timeout sometimes. Review by plocinic.
-rw-r--r--src/actors/scala/actors/Actor.scala8
-rw-r--r--src/actors/scala/actors/Debug.scala9
-rw-r--r--src/actors/scala/actors/Reactor.scala22
-rw-r--r--src/actors/scala/actors/ReactorTask.scala1
-rw-r--r--src/actors/scala/actors/ReplyReactor.scala8
-rw-r--r--test/files/jvm/reactor-producer-consumer.check10
-rw-r--r--test/files/jvm/reactor-producer-consumer.scala75
7 files changed, 129 insertions, 4 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index fb90cb9c46..907389b9f0 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -646,6 +646,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
}
// guarded by lock of this
+ // never throws SuspendActorException
private[actors] override def scheduleActor(f: Any =>? Unit, msg: Any) =
if ((f eq null) && (continuation eq null)) {
// do nothing (timeout is handled instead)
@@ -825,6 +826,13 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
resumeActor()
else if (waitingFor ne waitingForNone) {
scheduleActor(continuation, null)
+ /* Here we should not throw a SuspendActorException,
+ since the current method is called from an actor that
+ is in the process of exiting.
+
+ Therefore, the contract for scheduleActor is that
+ it never throws a SuspendActorException.
+ */
}
}
}
diff --git a/src/actors/scala/actors/Debug.scala b/src/actors/scala/actors/Debug.scala
index 481b68d7f4..bad19b8aeb 100644
--- a/src/actors/scala/actors/Debug.scala
+++ b/src/actors/scala/actors/Debug.scala
@@ -27,6 +27,15 @@ object Debug {
def error(s: String) =
if (lev > 0) System.err.println("Error: " + s)
+
+ def doInfo(b: => Unit) =
+ if (lev > 2) b
+
+ def doWarning(b: => Unit) =
+ if (lev > 1) b
+
+ def doError(b: => Unit) =
+ if (lev > 0) b
}
class Debug(tag: String) {
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index d641f54eb6..8545b92d1e 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -108,8 +108,17 @@ trait Reactor extends OutputChannel[Any] {
// assert continuation != null
if (onSameThread)
continuation(item._1)
- else
+ else {
scheduleActor(continuation, item._1)
+ /* Here, we throw a SuspendActorException to avoid
+ terminating this actor when the current ReactorTask
+ is finished.
+
+ The SuspendActorException skips the termination code
+ in ReactorTask.
+ */
+ throw Actor.suspendException
+ }
}
def !(msg: Any) {
@@ -149,7 +158,14 @@ trait Reactor extends OutputChannel[Any] {
// keep going
} else {
waitingFor = handlesMessage
- done = true
+ /* Here, we throw a SuspendActorException to avoid
+ terminating this actor when the current ReactorTask
+ is finished.
+
+ The SuspendActorException skips the termination code
+ in ReactorTask.
+ */
+ throw Actor.suspendException
}
}
} else {
@@ -171,6 +187,8 @@ trait Reactor extends OutputChannel[Any] {
* an actors act method.
*
* assume handler != null
+ *
+ * never throws SuspendActorException
*/
private[actors] def scheduleActor(handler: Any =>? Unit, msg: Any) = {
val fun = () => handler(msg)
diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala
index 28e93bbbff..f6ec67e94c 100644
--- a/src/actors/scala/actors/ReactorTask.scala
+++ b/src/actors/scala/actors/ReactorTask.scala
@@ -46,6 +46,7 @@ private[actors] class ReactorTask[T >: Null <: Reactor](var reactor: T, var fun:
case e: Exception =>
Debug.info(reactor+": caught "+e)
+ Debug.doInfo { e.printStackTrace() }
reactor.terminated()
afterExecuting(e)
} finally {
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala
index 4b31369db4..64860f4d38 100644
--- a/src/actors/scala/actors/ReplyReactor.scala
+++ b/src/actors/scala/actors/ReplyReactor.scala
@@ -57,8 +57,11 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
// assert continuation != null
if (onSameThread)
continuation(item._1)
- else
+ else {
scheduleActor(continuation, item._1)
+ // see Reactor.resumeReceiver
+ throw Actor.suspendException
+ }
}
// assume continuation != null
@@ -83,7 +86,8 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
// keep going
} else {
waitingFor = handlesMessage
- done = true
+ // see Reactor.searchMailbox
+ throw Actor.suspendException
}
}
} else {
diff --git a/test/files/jvm/reactor-producer-consumer.check b/test/files/jvm/reactor-producer-consumer.check
new file mode 100644
index 0000000000..d971cea19e
--- /dev/null
+++ b/test/files/jvm/reactor-producer-consumer.check
@@ -0,0 +1,10 @@
+42
+42
+42
+42
+42
+42
+42
+42
+42
+42
diff --git a/test/files/jvm/reactor-producer-consumer.scala b/test/files/jvm/reactor-producer-consumer.scala
new file mode 100644
index 0000000000..946e1561ce
--- /dev/null
+++ b/test/files/jvm/reactor-producer-consumer.scala
@@ -0,0 +1,75 @@
+import scala.actors.Reactor
+
+object Test {
+ case class Stop()
+ case class Get(from: Reactor)
+ case class Put(x: Int)
+
+ class UnboundedBuffer extends Reactor {
+ def act() {
+ react {
+ case Stop() =>
+ case Get(from) =>
+ val consumer = from
+ react {
+ case msg @ Put(x) =>
+ consumer ! x
+ act()
+ }
+ }
+ }
+ }
+
+ class Producer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor) extends Reactor {
+ def act() {
+ var i = 0
+ while (i < n) {
+ i += 1
+ if (delay > 0) Thread.sleep(delay)
+ buf ! Put(42)
+ }
+ parent ! Stop()
+ }
+ }
+
+ class Consumer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor) extends Reactor {
+ val step = n / 10
+ var i = 0
+ def act() {
+ if (i < n) {
+ i += 1
+ if (delay > 0) Thread.sleep(delay)
+ buf ! Get(this)
+ react {
+ case res =>
+ if (i % step == 0)
+ println(res)
+ act()
+ }
+ } else {
+ parent ! Stop()
+ }
+ }
+ }
+
+ def main(args: Array[String]) {
+ val parent = new Reactor {
+ def act() {
+ val buffer = new UnboundedBuffer
+ buffer.start()
+ val producer = new Producer(buffer, 10000, 0, this)
+ producer.start()
+ val consumer = new Consumer(buffer, 10000, 0, this)
+ consumer.start()
+ react {
+ case Stop() =>
+ react {
+ case Stop() =>
+ buffer ! Stop()
+ }
+ }
+ }
+ }
+ parent.start()
+ }
+}