summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/actors/scala/actors/Actor.scala41
-rw-r--r--src/actors/scala/actors/ActorCanReply.scala101
-rw-r--r--src/actors/scala/actors/ActorTask.scala7
-rw-r--r--test/files/jvm/t3356.check1
-rw-r--r--test/files/jvm/t3356.scala59
5 files changed, 102 insertions, 107 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index ccd60f666c..c78447b92e 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -12,7 +12,6 @@ package scala.actors
import scala.util.control.ControlThrowable
import java.util.{Timer, TimerTask}
-import java.util.concurrent.{ExecutionException, Callable}
/**
* The <code>Actor</code> object provides functions for the definition of
@@ -695,37 +694,51 @@ trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with Inpu
* <code>reason != 'normal</code>.
* </p>
*/
- protected[actors] def exit(reason: AnyRef): Nothing = synchronized {
- exitReason = reason
+ protected[actors] def exit(reason: AnyRef): Nothing = {
+ synchronized {
+ exitReason = reason
+ }
exit()
}
/**
* Terminates with exit reason <code>'normal</code>.
*/
- protected[actors] override def exit(): Nothing = synchronized {
- if (!links.isEmpty)
- exitLinked()
+ protected[actors] override def exit(): Nothing = {
+ val todo = synchronized {
+ if (!links.isEmpty)
+ exitLinked()
+ else
+ () => {}
+ }
+ todo()
super.exit()
}
// Assume !links.isEmpty
// guarded by this
- private[actors] def exitLinked() {
+ private[actors] def exitLinked(): () => Unit = {
_state = Actor.State.Terminated
+ // reset waitingFor, otherwise getState returns Suspended
+ waitingFor = Reactor.waitingForNone
// remove this from links
val mylinks = links.filterNot(this.==)
- // exit linked processes
- mylinks.foreach((linked: AbstractActor) => {
- unlink(linked)
- if (!linked.exiting)
- linked.exit(this, exitReason)
- })
+ // unlink actors
+ mylinks.foreach(unlinkFrom(_))
+ // return closure that locks linked actors
+ () => {
+ mylinks.foreach((linked: AbstractActor) => {
+ linked.synchronized {
+ if (!linked.exiting)
+ linked.exit(this, exitReason)
+ }
+ })
+ }
}
// Assume !links.isEmpty
// guarded by this
- private[actors] def exitLinked(reason: AnyRef) {
+ private[actors] def exitLinked(reason: AnyRef): () => Unit = {
exitReason = reason
exitLinked()
}
diff --git a/src/actors/scala/actors/ActorCanReply.scala b/src/actors/scala/actors/ActorCanReply.scala
index fdc3833ec4..7b8ac27405 100644
--- a/src/actors/scala/actors/ActorCanReply.scala
+++ b/src/actors/scala/actors/ActorCanReply.scala
@@ -10,8 +10,6 @@
package scala.actors
-import java.util.concurrent.ExecutionException
-
/**
* The `ActorCanReply` trait provides message send operations that
* may result in a response from the receiver.
@@ -19,19 +17,17 @@ import java.util.concurrent.ExecutionException
* @author Philipp Haller
*/
private[actors] trait ActorCanReply extends ReactorCanReply {
- thiz: AbstractActor with ReplyReactor =>
+ this: AbstractActor with ReplyReactor =>
override def !?(msg: Any): Any = {
- val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
- thiz.send(msg, replyCh)
- replyCh.receive {
- case x => x
- }
+ val replyCh = new Channel[Any](Actor.self(scheduler))
+ send(msg, replyCh)
+ replyCh.?
}
override def !?(msec: Long, msg: Any): Option[Any] = {
- val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
- thiz.send(msg, replyCh)
+ val replyCh = new Channel[Any](Actor.self(scheduler))
+ send(msg, replyCh)
replyCh.receiveWithin(msec) {
case TIMEOUT => None
case x => Some(x)
@@ -39,8 +35,8 @@ private[actors] trait ActorCanReply extends ReactorCanReply {
}
override def !![A](msg: Any, handler: PartialFunction[Any, A]): Future[A] = {
- val ftch = new Channel[A](Actor.self(thiz.scheduler))
- thiz.send(msg, new OutputChannel[Any] {
+ val ftch = new Channel[A](Actor.self(scheduler))
+ send(msg, new OutputChannel[Any] {
def !(msg: Any) =
ftch ! handler(msg)
def send(msg: Any, replyTo: OutputChannel[Any]) =
@@ -54,85 +50,8 @@ private[actors] trait ActorCanReply extends ReactorCanReply {
}
override def !!(msg: Any): Future[Any] = {
- val ftch = new Channel[Any](Actor.self(thiz.scheduler))
- val linkedChannel = new AbstractActor {
- def !(msg: Any) = {
- ftch ! msg
- thiz unlinkFrom this
- }
- def send(msg: Any, replyTo: OutputChannel[Any]) = {
- ftch.send(msg, replyTo)
- thiz unlinkFrom this
- }
- def forward(msg: Any) = {
- ftch.forward(msg)
- thiz unlinkFrom this
- }
- def receiver =
- ftch.receiver
- def linkTo(to: AbstractActor) { /* do nothing */ }
- def unlinkFrom(from: AbstractActor) { /* do nothing */ }
- def exit(from: AbstractActor, reason: AnyRef) {
- ftch.send(Exit(from, reason), thiz)
- thiz unlinkFrom this
- }
- // should never be invoked; return dummy value
- def !?(msg: Any) = msg
- // should never be invoked; return dummy value
- def !?(msec: Long, msg: Any): Option[Any] = Some(msg)
- // should never be invoked; return dummy value
- override def !!(msg: Any): Future[Any] = {
- val someChan = new Channel[Any](Actor.self(thiz.scheduler))
- Futures.fromInputChannel(someChan)
- }
- // should never be invoked; return dummy value
- override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
- val someChan = new Channel[A](Actor.self(thiz.scheduler))
- Futures.fromInputChannel(someChan)
- }
- }
- thiz linkTo linkedChannel
- thiz.send(msg, linkedChannel)
- new Future[Any](ftch) {
- var exitReason: Option[Any] = None
- val handleReply: PartialFunction[Any, Unit] = {
- case Exit(from, reason) =>
- exitReason = Some(reason)
- case any =>
- fvalue = Some(any)
- }
-
- def apply(): Any =
- if (isSet) {
- if (!fvalue.isEmpty)
- fvalue.get
- else if (!exitReason.isEmpty) {
- val reason = exitReason.get
- if (reason.isInstanceOf[Throwable])
- throw new ExecutionException(reason.asInstanceOf[Throwable])
- else
- throw new ExecutionException(new Exception(reason.toString()))
- }
- } else inputChannel.receive(handleReply andThen { _ => apply() })
-
- def respond(k: Any => Unit): Unit =
- if (isSet)
- apply()
- else
- inputChannel.react(handleReply andThen { _ => k(apply()) })
-
- def isSet = (fvalue match {
- case None =>
- val handleTimeout: PartialFunction[Any, Boolean] = {
- case TIMEOUT =>
- false
- }
- val whatToDo =
- handleTimeout orElse (handleReply andThen { _ => true })
- inputChannel.receiveWithin(0)(whatToDo)
- case Some(_) => true
- }) || !exitReason.isEmpty
- }
+ val noTransform: PartialFunction[Any, Any] = { case x => x}
+ this !! (msg, noTransform)
}
}
diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala
index ea8624e426..2c48725e8e 100644
--- a/src/actors/scala/actors/ActorTask.scala
+++ b/src/actors/scala/actors/ActorTask.scala
@@ -41,12 +41,15 @@ private[actors] class ActorTask(actor: Actor,
currentThread,
e)
- actor.synchronized {
+ val todo = actor.synchronized {
if (!actor.links.isEmpty)
actor.exitLinked(uncaught)
- else
+ else {
super.terminateExecution(e)
+ () => {}
+ }
}
+ todo()
}
}
diff --git a/test/files/jvm/t3356.check b/test/files/jvm/t3356.check
new file mode 100644
index 0000000000..6a9284d0aa
--- /dev/null
+++ b/test/files/jvm/t3356.check
@@ -0,0 +1 @@
+sending download requests
diff --git a/test/files/jvm/t3356.scala b/test/files/jvm/t3356.scala
new file mode 100644
index 0000000000..8a82677b5f
--- /dev/null
+++ b/test/files/jvm/t3356.scala
@@ -0,0 +1,59 @@
+import scala.actors.{Actor, Exit, !, UncaughtException}
+import Actor._
+
+case class ImageInfo(text: String) {
+ def downloadImage(): ImageData = {
+ ImageData(text)
+ }
+}
+
+case class ImageData(text: String)
+case class Download(info: ImageInfo)
+
+object Test {
+
+ def scanForImageInfo(url: String): List[ImageInfo] =
+ List(ImageInfo("A"), ImageInfo("B"))
+
+ def renderImage(data: ImageData) {
+ println("rendering image "+data.text)
+ }
+
+ def renderImages(url: String) {
+ val imageInfos = scanForImageInfo(url)
+ println("sending download requests")
+ val dataFutures = for (info <- imageInfos) yield {
+ val loader = link {
+ react { case Download(info) =>
+ throw new Exception("no connection")
+ reply(info.downloadImage())
+ }; {}
+ }
+ loader !! Download(info)
+ }
+ var i = 0
+ loopWhile (i < imageInfos.size) {
+ i += 1
+ val FutureInput = dataFutures(i-1).inputChannel
+ react {
+ case FutureInput ! (data @ ImageData(_)) =>
+ renderImage(data)
+ case Exit(from, ue: UncaughtException[_]) =>
+ ue.message match {
+ case Some(Download(info)) =>
+ println("Couldn't download image "+info+" because of "+ue.getCause())
+ case _ =>
+ println("Couldn't download image because of "+ue.getCause())
+ }
+ }
+ }
+ println("OK, all images rendered.")
+ }
+
+ def main(args: Array[String]) {
+ actor {
+ renderImages("panorama.epfl.ch")
+ }
+ }
+
+}