diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-05-28 14:55:58 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-05-28 14:55:58 +0000 |
commit | 8434c271e5fb1f6130aaf87a209aa84605dd7919 (patch) | |
tree | c0f16d9b256cd38b1e85b76ffee002a299b32d21 /src/actors/scala/actors/Actor.scala | |
parent | f34e908054b82c70f1912e67ace2140c8e9af50c (diff) | |
download | scala-8434c271e5fb1f6130aaf87a209aa84605dd7919.tar.gz scala-8434c271e5fb1f6130aaf87a209aa84605dd7919.tar.bz2 scala-8434c271e5fb1f6130aaf87a209aa84605dd7919.zip |
Fixed #2010 by scheduling waitingFor check.
Diffstat (limited to 'src/actors/scala/actors/Actor.scala')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 69 |
1 files changed, 40 insertions, 29 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index ddeeca8c9f..1b0d258d1b 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -10,11 +10,8 @@ package scala.actors -import scala.collection.mutable.{HashSet, Queue} import scala.compat.Platform - import java.util.{Timer, TimerTask} - import java.util.concurrent.ExecutionException /** @@ -32,6 +29,8 @@ object Actor { // timer thread runs as daemon private[actors] val timer = new Timer(true) + private[actors] val suspendException = new SuspendActorException + /** * Returns the currently executing actor. Should be used instead * of <code>this</code> in all blocks of code executed by @@ -380,6 +379,7 @@ trait Actor extends OutputChannelActor with AbstractActor { * suspends by blocking its underlying thread, for example, * when waiting in a receive or synchronous send. */ + @volatile private var isSuspended = false /* This field is used to communicate the received message from @@ -402,22 +402,35 @@ trait Actor extends OutputChannelActor with AbstractActor { * @param replyTo the reply destination */ override def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { - if (waitingFor(msg)) { + if (waitingFor ne waitingForNone) { + val savedWaitingFor = waitingFor waitingFor = waitingForNone + scheduler execute { + if (synchronized { savedWaitingFor(msg) }) { + synchronized { + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None + } + } - if (!onTimeout.isEmpty) { - onTimeout.get.cancel() - onTimeout = None - } - - if (isSuspended) { - senders = replyTo :: senders - received = Some(msg) - resumeActor() - } else { - senders = List(replyTo) - // assert continuation != null - scheduler.execute(new Reaction(this, continuation, msg)) + if (isSuspended) { + synchronized { + senders = replyTo :: senders + received = Some(msg) + resumeActor() + } + } else { + synchronized { + senders = List(replyTo) + } + // assert continuation != null + (new Reaction(this, continuation, msg)).run() + } + } else synchronized { + waitingFor = savedWaitingFor + mailbox.append(msg, replyTo) + } } } else { mailbox.append(msg, replyTo) @@ -432,7 +445,7 @@ trait Actor extends OutputChannelActor with AbstractActor { */ def receive[R](f: PartialFunction[Any, R]): R = { assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") - this.synchronized { + synchronized { if (shouldExit) exit() // links val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { @@ -459,7 +472,7 @@ trait Actor extends OutputChannelActor with AbstractActor { */ def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") - this.synchronized { + synchronized { if (shouldExit) exit() // links // first, remove spurious TIMEOUT message from mailbox if any @@ -473,8 +486,7 @@ trait Actor extends OutputChannelActor with AbstractActor { senders = this :: senders } else error("unhandled timeout") - } - else { + } else { waitingFor = f.isDefinedAt received = None suspendActorFor(msec) @@ -511,7 +523,7 @@ trait Actor extends OutputChannelActor with AbstractActor { */ override def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") - this.synchronized { + synchronized { if (shouldExit) exit() // links val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { @@ -521,8 +533,8 @@ trait Actor extends OutputChannelActor with AbstractActor { senders = List(qel.session) scheduleActor(f, qel.msg) } - throw new SuspendActorException } + throw Actor.suspendException } /** @@ -537,7 +549,7 @@ trait Actor extends OutputChannelActor with AbstractActor { */ def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") - this.synchronized { + synchronized { if (shouldExit) exit() // links // first, remove spurious TIMEOUT message from mailbox if any @@ -552,8 +564,7 @@ trait Actor extends OutputChannelActor with AbstractActor { } else error("unhandled timeout") - } - else { + } else { waitingFor = f.isDefinedAt val thisActor = this onTimeout = Some(new TimerTask { @@ -566,8 +577,8 @@ trait Actor extends OutputChannelActor with AbstractActor { senders = List(qel.session) scheduleActor(f, qel.msg) } - throw new SuspendActorException } + throw Actor.suspendException } /** @@ -938,7 +949,7 @@ trait Actor extends OutputChannelActor with AbstractActor { if (!links.isEmpty) exitLinked() terminated() - throw new SuspendActorException + throw Actor.suspendException } // Assume !links.isEmpty @@ -966,7 +977,7 @@ trait Actor extends OutputChannelActor with AbstractActor { this ! Exit(from, reason) } else if (reason != 'normal) - this.synchronized { + synchronized { shouldExit = true exitReason = reason // resume this Actor in a way that |