diff options
author | Philipp Haller <hallerp@gmail.com> | 2010-04-25 16:50:01 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2010-04-25 16:50:01 +0000 |
commit | 1148683005899ec86f7b1028cd36671a1045e650 (patch) | |
tree | 399784d6ad72368bc942ce4a33868594ec99784f /src/actors | |
parent | a29eafaf4bbb439ef4893bf1442782d5d50dd5f5 (diff) | |
download | scala-1148683005899ec86f7b1028cd36671a1045e650.tar.gz scala-1148683005899ec86f7b1028cd36671a1045e650.tar.bz2 scala-1148683005899ec86f7b1028cd36671a1045e650.zip |
Closes #3356.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 41 | ||||
-rw-r--r-- | src/actors/scala/actors/ActorCanReply.scala | 101 | ||||
-rw-r--r-- | src/actors/scala/actors/ActorTask.scala | 7 |
3 files changed, 42 insertions, 107 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index ccd60f666c..c78447b92e 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -12,7 +12,6 @@ package scala.actors import scala.util.control.ControlThrowable import java.util.{Timer, TimerTask} -import java.util.concurrent.{ExecutionException, Callable} /** * The <code>Actor</code> object provides functions for the definition of @@ -695,37 +694,51 @@ trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with Inpu * <code>reason != 'normal</code>. * </p> */ - protected[actors] def exit(reason: AnyRef): Nothing = synchronized { - exitReason = reason + protected[actors] def exit(reason: AnyRef): Nothing = { + synchronized { + exitReason = reason + } exit() } /** * Terminates with exit reason <code>'normal</code>. */ - protected[actors] override def exit(): Nothing = synchronized { - if (!links.isEmpty) - exitLinked() + protected[actors] override def exit(): Nothing = { + val todo = synchronized { + if (!links.isEmpty) + exitLinked() + else + () => {} + } + todo() super.exit() } // Assume !links.isEmpty // guarded by this - private[actors] def exitLinked() { + private[actors] def exitLinked(): () => Unit = { _state = Actor.State.Terminated + // reset waitingFor, otherwise getState returns Suspended + waitingFor = Reactor.waitingForNone // remove this from links val mylinks = links.filterNot(this.==) - // exit linked processes - mylinks.foreach((linked: AbstractActor) => { - unlink(linked) - if (!linked.exiting) - linked.exit(this, exitReason) - }) + // unlink actors + mylinks.foreach(unlinkFrom(_)) + // return closure that locks linked actors + () => { + mylinks.foreach((linked: AbstractActor) => { + linked.synchronized { + if (!linked.exiting) + linked.exit(this, exitReason) + } + }) + } } // Assume !links.isEmpty // guarded by this - private[actors] def exitLinked(reason: AnyRef) { + private[actors] def exitLinked(reason: AnyRef): () => Unit = { exitReason = reason exitLinked() } diff --git a/src/actors/scala/actors/ActorCanReply.scala b/src/actors/scala/actors/ActorCanReply.scala index fdc3833ec4..7b8ac27405 100644 --- a/src/actors/scala/actors/ActorCanReply.scala +++ b/src/actors/scala/actors/ActorCanReply.scala @@ -10,8 +10,6 @@ package scala.actors -import java.util.concurrent.ExecutionException - /** * The `ActorCanReply` trait provides message send operations that * may result in a response from the receiver. @@ -19,19 +17,17 @@ import java.util.concurrent.ExecutionException * @author Philipp Haller */ private[actors] trait ActorCanReply extends ReactorCanReply { - thiz: AbstractActor with ReplyReactor => + this: AbstractActor with ReplyReactor => override def !?(msg: Any): Any = { - val replyCh = new Channel[Any](Actor.self(thiz.scheduler)) - thiz.send(msg, replyCh) - replyCh.receive { - case x => x - } + val replyCh = new Channel[Any](Actor.self(scheduler)) + send(msg, replyCh) + replyCh.? } override def !?(msec: Long, msg: Any): Option[Any] = { - val replyCh = new Channel[Any](Actor.self(thiz.scheduler)) - thiz.send(msg, replyCh) + val replyCh = new Channel[Any](Actor.self(scheduler)) + send(msg, replyCh) replyCh.receiveWithin(msec) { case TIMEOUT => None case x => Some(x) @@ -39,8 +35,8 @@ private[actors] trait ActorCanReply extends ReactorCanReply { } override def !![A](msg: Any, handler: PartialFunction[Any, A]): Future[A] = { - val ftch = new Channel[A](Actor.self(thiz.scheduler)) - thiz.send(msg, new OutputChannel[Any] { + val ftch = new Channel[A](Actor.self(scheduler)) + send(msg, new OutputChannel[Any] { def !(msg: Any) = ftch ! handler(msg) def send(msg: Any, replyTo: OutputChannel[Any]) = @@ -54,85 +50,8 @@ private[actors] trait ActorCanReply extends ReactorCanReply { } override def !!(msg: Any): Future[Any] = { - val ftch = new Channel[Any](Actor.self(thiz.scheduler)) - val linkedChannel = new AbstractActor { - def !(msg: Any) = { - ftch ! msg - thiz unlinkFrom this - } - def send(msg: Any, replyTo: OutputChannel[Any]) = { - ftch.send(msg, replyTo) - thiz unlinkFrom this - } - def forward(msg: Any) = { - ftch.forward(msg) - thiz unlinkFrom this - } - def receiver = - ftch.receiver - def linkTo(to: AbstractActor) { /* do nothing */ } - def unlinkFrom(from: AbstractActor) { /* do nothing */ } - def exit(from: AbstractActor, reason: AnyRef) { - ftch.send(Exit(from, reason), thiz) - thiz unlinkFrom this - } - // should never be invoked; return dummy value - def !?(msg: Any) = msg - // should never be invoked; return dummy value - def !?(msec: Long, msg: Any): Option[Any] = Some(msg) - // should never be invoked; return dummy value - override def !!(msg: Any): Future[Any] = { - val someChan = new Channel[Any](Actor.self(thiz.scheduler)) - Futures.fromInputChannel(someChan) - } - // should never be invoked; return dummy value - override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { - val someChan = new Channel[A](Actor.self(thiz.scheduler)) - Futures.fromInputChannel(someChan) - } - } - thiz linkTo linkedChannel - thiz.send(msg, linkedChannel) - new Future[Any](ftch) { - var exitReason: Option[Any] = None - val handleReply: PartialFunction[Any, Unit] = { - case Exit(from, reason) => - exitReason = Some(reason) - case any => - fvalue = Some(any) - } - - def apply(): Any = - if (isSet) { - if (!fvalue.isEmpty) - fvalue.get - else if (!exitReason.isEmpty) { - val reason = exitReason.get - if (reason.isInstanceOf[Throwable]) - throw new ExecutionException(reason.asInstanceOf[Throwable]) - else - throw new ExecutionException(new Exception(reason.toString())) - } - } else inputChannel.receive(handleReply andThen { _ => apply() }) - - def respond(k: Any => Unit): Unit = - if (isSet) - apply() - else - inputChannel.react(handleReply andThen { _ => k(apply()) }) - - def isSet = (fvalue match { - case None => - val handleTimeout: PartialFunction[Any, Boolean] = { - case TIMEOUT => - false - } - val whatToDo = - handleTimeout orElse (handleReply andThen { _ => true }) - inputChannel.receiveWithin(0)(whatToDo) - case Some(_) => true - }) || !exitReason.isEmpty - } + val noTransform: PartialFunction[Any, Any] = { case x => x} + this !! (msg, noTransform) } } diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala index ea8624e426..2c48725e8e 100644 --- a/src/actors/scala/actors/ActorTask.scala +++ b/src/actors/scala/actors/ActorTask.scala @@ -41,12 +41,15 @@ private[actors] class ActorTask(actor: Actor, currentThread, e) - actor.synchronized { + val todo = actor.synchronized { if (!actor.links.isEmpty) actor.exitLinked(uncaught) - else + else { super.terminateExecution(e) + () => {} + } } + todo() } } |