diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-04-09 16:21:18 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-04-09 16:21:18 +0000 |
commit | c3e29c28b0db043e1f090154c36ea3485809f5e3 (patch) | |
tree | 156b1a0668a0feb6f3a1de43ad2e07b0e1f2e64a /src | |
parent | b06edbc46d2b08752f6bd751f030a166c7628c98 (diff) | |
download | scala-c3e29c28b0db043e1f090154c36ea3485809f5e3.tar.gz scala-c3e29c28b0db043e1f090154c36ea3485809f5e3.tar.bz2 scala-c3e29c28b0db043e1f090154c36ea3485809f5e3.zip |
Improved error-handling using futures.
Diffstat (limited to 'src')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 121 |
1 files changed, 105 insertions, 16 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 8deef11398..4f71a1ab8d 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -15,6 +15,8 @@ import scala.compat.Platform import java.util.{Timer, TimerTask} +import java.util.concurrent.ExecutionException + /** * The <code>Actor</code> object provides functions for the definition of * actors, as well as actor operations, such as @@ -629,26 +631,113 @@ trait Actor extends AbstractActor { */ def !!(msg: Any): Future[Any] = { val ftch = new Channel[Any](Actor.self(scheduler)) - send(msg, ftch) - new Future[Any](ftch) { - def apply() = - if (isSet) value.get - else ch.receive { - case any => value = Some(any); any + val linkedChannel = new AbstractActor { + def !(msg: Any) = + ftch ! msg + def send(msg: Any, replyTo: OutputChannel[Any]) = + ftch.send(msg, replyTo) + def forward(msg: Any) = + ftch.forward(msg) + def receiver = + ftch.receiver + def linkTo(to: AbstractActor) { /* do nothing */ } + def unlinkFrom(from: AbstractActor) { /* do nothing */ } + def exit(from: AbstractActor, reason: AnyRef) { + ftch.send(Exit(from, reason), Actor.this) + } + // should never be invoked; return dummy value + def !?(msg: Any) = msg + // should never be invoked; return dummy value + def !?(msec: Long, msg: Any): Option[Any] = Some(msg) + // should never be invoked; return dummy value + def !!(msg: Any): Future[Any] = { + val someChan = new Channel[Any](Actor.self(scheduler)) + new Future[Any](someChan) { + def apply() = + if (isSet) value.get + else ch.receive { + case any => value = Some(any); any + } + def respond(k: Any => Unit): Unit = + if (isSet) k(value.get) + else ch.react { + case any => value = Some(any); k(any) + } + def isSet = value match { + case None => ch.receiveWithin(0) { + case TIMEOUT => false + case any => value = Some(any); true + } + case Some(_) => true + } } - def respond(k: Any => Unit): Unit = - if (isSet) k(value.get) - else ch.react { - case any => value = Some(any); k(any) - } - def isSet = value match { - case None => ch.receiveWithin(0) { - case TIMEOUT => false - case any => value = Some(any); true + } + // should never be invoked; return dummy value + def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { + val someChan = new Channel[A](Actor.self(scheduler)) + new Future[A](someChan) { + def apply() = + if (isSet) value.get.asInstanceOf[A] + else ch.receive { + case any => value = Some(any); any + } + def respond(k: A => Unit): Unit = + if (isSet) k(value.get.asInstanceOf[A]) + else ch.react { + case any => value = Some(any); k(any) + } + def isSet = value match { + case None => ch.receiveWithin(0) { + case TIMEOUT => false + case any => value = Some(any); true + } + case Some(_) => true + } } - case Some(_) => true } } + linkTo(linkedChannel) + send(msg, linkedChannel) + new Future[Any](ftch) { + var exitReason: Option[Any] = None + val handleReply: PartialFunction[Any, Unit] = { + case Exit(from, reason) => + exitReason = Some(reason) + case any => + value = Some(any) + } + + def apply(): Any = + if (isSet) { + if (!value.isEmpty) + value.get + else if (!exitReason.isEmpty) { + val reason = exitReason.get + if (reason.isInstanceOf[Throwable]) + throw new ExecutionException(reason.asInstanceOf[Throwable]) + else + throw new ExecutionException(new Exception(reason.toString())) + } + } else ch.receive(handleReply andThen {(x: Unit) => apply()}) + + def respond(k: Any => Unit): Unit = + if (isSet) + apply() + else + ch.react(handleReply andThen {(x: Unit) => k(apply())}) + + def isSet = (value match { + case None => + val handleTimeout: PartialFunction[Any, Boolean] = { + case TIMEOUT => + false + } + val whatToDo = + handleTimeout orElse (handleReply andThen {(x: Unit) => true}) + ch.receiveWithin(0)(whatToDo) + case Some(_) => true + }) || !exitReason.isEmpty + } } /** |