summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2010-04-25 16:50:01 +0000
committerPhilipp Haller <hallerp@gmail.com>2010-04-25 16:50:01 +0000
commit1148683005899ec86f7b1028cd36671a1045e650 (patch)
tree399784d6ad72368bc942ce4a33868594ec99784f /src
parenta29eafaf4bbb439ef4893bf1442782d5d50dd5f5 (diff)
downloadscala-1148683005899ec86f7b1028cd36671a1045e650.tar.gz
scala-1148683005899ec86f7b1028cd36671a1045e650.tar.bz2
scala-1148683005899ec86f7b1028cd36671a1045e650.zip
Closes #3356.
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/Actor.scala41
-rw-r--r--src/actors/scala/actors/ActorCanReply.scala101
-rw-r--r--src/actors/scala/actors/ActorTask.scala7
3 files changed, 42 insertions, 107 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index ccd60f666c..c78447b92e 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -12,7 +12,6 @@ package scala.actors
import scala.util.control.ControlThrowable
import java.util.{Timer, TimerTask}
-import java.util.concurrent.{ExecutionException, Callable}
/**
* The <code>Actor</code> object provides functions for the definition of
@@ -695,37 +694,51 @@ trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with Inpu
* <code>reason != 'normal</code>.
* </p>
*/
- protected[actors] def exit(reason: AnyRef): Nothing = synchronized {
- exitReason = reason
+ protected[actors] def exit(reason: AnyRef): Nothing = {
+ synchronized {
+ exitReason = reason
+ }
exit()
}
/**
* Terminates with exit reason <code>'normal</code>.
*/
- protected[actors] override def exit(): Nothing = synchronized {
- if (!links.isEmpty)
- exitLinked()
+ protected[actors] override def exit(): Nothing = {
+ val todo = synchronized {
+ if (!links.isEmpty)
+ exitLinked()
+ else
+ () => {}
+ }
+ todo()
super.exit()
}
// Assume !links.isEmpty
// guarded by this
- private[actors] def exitLinked() {
+ private[actors] def exitLinked(): () => Unit = {
_state = Actor.State.Terminated
+ // reset waitingFor, otherwise getState returns Suspended
+ waitingFor = Reactor.waitingForNone
// remove this from links
val mylinks = links.filterNot(this.==)
- // exit linked processes
- mylinks.foreach((linked: AbstractActor) => {
- unlink(linked)
- if (!linked.exiting)
- linked.exit(this, exitReason)
- })
+ // unlink actors
+ mylinks.foreach(unlinkFrom(_))
+ // return closure that locks linked actors
+ () => {
+ mylinks.foreach((linked: AbstractActor) => {
+ linked.synchronized {
+ if (!linked.exiting)
+ linked.exit(this, exitReason)
+ }
+ })
+ }
}
// Assume !links.isEmpty
// guarded by this
- private[actors] def exitLinked(reason: AnyRef) {
+ private[actors] def exitLinked(reason: AnyRef): () => Unit = {
exitReason = reason
exitLinked()
}
diff --git a/src/actors/scala/actors/ActorCanReply.scala b/src/actors/scala/actors/ActorCanReply.scala
index fdc3833ec4..7b8ac27405 100644
--- a/src/actors/scala/actors/ActorCanReply.scala
+++ b/src/actors/scala/actors/ActorCanReply.scala
@@ -10,8 +10,6 @@
package scala.actors
-import java.util.concurrent.ExecutionException
-
/**
* The `ActorCanReply` trait provides message send operations that
* may result in a response from the receiver.
@@ -19,19 +17,17 @@ import java.util.concurrent.ExecutionException
* @author Philipp Haller
*/
private[actors] trait ActorCanReply extends ReactorCanReply {
- thiz: AbstractActor with ReplyReactor =>
+ this: AbstractActor with ReplyReactor =>
override def !?(msg: Any): Any = {
- val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
- thiz.send(msg, replyCh)
- replyCh.receive {
- case x => x
- }
+ val replyCh = new Channel[Any](Actor.self(scheduler))
+ send(msg, replyCh)
+ replyCh.?
}
override def !?(msec: Long, msg: Any): Option[Any] = {
- val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
- thiz.send(msg, replyCh)
+ val replyCh = new Channel[Any](Actor.self(scheduler))
+ send(msg, replyCh)
replyCh.receiveWithin(msec) {
case TIMEOUT => None
case x => Some(x)
@@ -39,8 +35,8 @@ private[actors] trait ActorCanReply extends ReactorCanReply {
}
override def !![A](msg: Any, handler: PartialFunction[Any, A]): Future[A] = {
- val ftch = new Channel[A](Actor.self(thiz.scheduler))
- thiz.send(msg, new OutputChannel[Any] {
+ val ftch = new Channel[A](Actor.self(scheduler))
+ send(msg, new OutputChannel[Any] {
def !(msg: Any) =
ftch ! handler(msg)
def send(msg: Any, replyTo: OutputChannel[Any]) =
@@ -54,85 +50,8 @@ private[actors] trait ActorCanReply extends ReactorCanReply {
}
override def !!(msg: Any): Future[Any] = {
- val ftch = new Channel[Any](Actor.self(thiz.scheduler))
- val linkedChannel = new AbstractActor {
- def !(msg: Any) = {
- ftch ! msg
- thiz unlinkFrom this
- }
- def send(msg: Any, replyTo: OutputChannel[Any]) = {
- ftch.send(msg, replyTo)
- thiz unlinkFrom this
- }
- def forward(msg: Any) = {
- ftch.forward(msg)
- thiz unlinkFrom this
- }
- def receiver =
- ftch.receiver
- def linkTo(to: AbstractActor) { /* do nothing */ }
- def unlinkFrom(from: AbstractActor) { /* do nothing */ }
- def exit(from: AbstractActor, reason: AnyRef) {
- ftch.send(Exit(from, reason), thiz)
- thiz unlinkFrom this
- }
- // should never be invoked; return dummy value
- def !?(msg: Any) = msg
- // should never be invoked; return dummy value
- def !?(msec: Long, msg: Any): Option[Any] = Some(msg)
- // should never be invoked; return dummy value
- override def !!(msg: Any): Future[Any] = {
- val someChan = new Channel[Any](Actor.self(thiz.scheduler))
- Futures.fromInputChannel(someChan)
- }
- // should never be invoked; return dummy value
- override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
- val someChan = new Channel[A](Actor.self(thiz.scheduler))
- Futures.fromInputChannel(someChan)
- }
- }
- thiz linkTo linkedChannel
- thiz.send(msg, linkedChannel)
- new Future[Any](ftch) {
- var exitReason: Option[Any] = None
- val handleReply: PartialFunction[Any, Unit] = {
- case Exit(from, reason) =>
- exitReason = Some(reason)
- case any =>
- fvalue = Some(any)
- }
-
- def apply(): Any =
- if (isSet) {
- if (!fvalue.isEmpty)
- fvalue.get
- else if (!exitReason.isEmpty) {
- val reason = exitReason.get
- if (reason.isInstanceOf[Throwable])
- throw new ExecutionException(reason.asInstanceOf[Throwable])
- else
- throw new ExecutionException(new Exception(reason.toString()))
- }
- } else inputChannel.receive(handleReply andThen { _ => apply() })
-
- def respond(k: Any => Unit): Unit =
- if (isSet)
- apply()
- else
- inputChannel.react(handleReply andThen { _ => k(apply()) })
-
- def isSet = (fvalue match {
- case None =>
- val handleTimeout: PartialFunction[Any, Boolean] = {
- case TIMEOUT =>
- false
- }
- val whatToDo =
- handleTimeout orElse (handleReply andThen { _ => true })
- inputChannel.receiveWithin(0)(whatToDo)
- case Some(_) => true
- }) || !exitReason.isEmpty
- }
+ val noTransform: PartialFunction[Any, Any] = { case x => x}
+ this !! (msg, noTransform)
}
}
diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala
index ea8624e426..2c48725e8e 100644
--- a/src/actors/scala/actors/ActorTask.scala
+++ b/src/actors/scala/actors/ActorTask.scala
@@ -41,12 +41,15 @@ private[actors] class ActorTask(actor: Actor,
currentThread,
e)
- actor.synchronized {
+ val todo = actor.synchronized {
if (!actor.links.isEmpty)
actor.exitLinked(uncaught)
- else
+ else {
super.terminateExecution(e)
+ () => {}
+ }
}
+ todo()
}
}