summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-04-09 16:21:18 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-04-09 16:21:18 +0000
commitc3e29c28b0db043e1f090154c36ea3485809f5e3 (patch)
tree156b1a0668a0feb6f3a1de43ad2e07b0e1f2e64a /src/actors
parentb06edbc46d2b08752f6bd751f030a166c7628c98 (diff)
downloadscala-c3e29c28b0db043e1f090154c36ea3485809f5e3.tar.gz
scala-c3e29c28b0db043e1f090154c36ea3485809f5e3.tar.bz2
scala-c3e29c28b0db043e1f090154c36ea3485809f5e3.zip
Improved error-handling using futures.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala121
1 files changed, 105 insertions, 16 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 8deef11398..4f71a1ab8d 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -15,6 +15,8 @@ import scala.compat.Platform
import java.util.{Timer, TimerTask}
+import java.util.concurrent.ExecutionException
+
/**
* The <code>Actor</code> object provides functions for the definition of
* actors, as well as actor operations, such as
@@ -629,26 +631,113 @@ trait Actor extends AbstractActor {
*/
def !!(msg: Any): Future[Any] = {
val ftch = new Channel[Any](Actor.self(scheduler))
- send(msg, ftch)
- new Future[Any](ftch) {
- def apply() =
- if (isSet) value.get
- else ch.receive {
- case any => value = Some(any); any
+ val linkedChannel = new AbstractActor {
+ def !(msg: Any) =
+ ftch ! msg
+ def send(msg: Any, replyTo: OutputChannel[Any]) =
+ ftch.send(msg, replyTo)
+ def forward(msg: Any) =
+ ftch.forward(msg)
+ def receiver =
+ ftch.receiver
+ def linkTo(to: AbstractActor) { /* do nothing */ }
+ def unlinkFrom(from: AbstractActor) { /* do nothing */ }
+ def exit(from: AbstractActor, reason: AnyRef) {
+ ftch.send(Exit(from, reason), Actor.this)
+ }
+ // should never be invoked; return dummy value
+ def !?(msg: Any) = msg
+ // should never be invoked; return dummy value
+ def !?(msec: Long, msg: Any): Option[Any] = Some(msg)
+ // should never be invoked; return dummy value
+ def !!(msg: Any): Future[Any] = {
+ val someChan = new Channel[Any](Actor.self(scheduler))
+ new Future[Any](someChan) {
+ def apply() =
+ if (isSet) value.get
+ else ch.receive {
+ case any => value = Some(any); any
+ }
+ def respond(k: Any => Unit): Unit =
+ if (isSet) k(value.get)
+ else ch.react {
+ case any => value = Some(any); k(any)
+ }
+ def isSet = value match {
+ case None => ch.receiveWithin(0) {
+ case TIMEOUT => false
+ case any => value = Some(any); true
+ }
+ case Some(_) => true
+ }
}
- def respond(k: Any => Unit): Unit =
- if (isSet) k(value.get)
- else ch.react {
- case any => value = Some(any); k(any)
- }
- def isSet = value match {
- case None => ch.receiveWithin(0) {
- case TIMEOUT => false
- case any => value = Some(any); true
+ }
+ // should never be invoked; return dummy value
+ def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
+ val someChan = new Channel[A](Actor.self(scheduler))
+ new Future[A](someChan) {
+ def apply() =
+ if (isSet) value.get.asInstanceOf[A]
+ else ch.receive {
+ case any => value = Some(any); any
+ }
+ def respond(k: A => Unit): Unit =
+ if (isSet) k(value.get.asInstanceOf[A])
+ else ch.react {
+ case any => value = Some(any); k(any)
+ }
+ def isSet = value match {
+ case None => ch.receiveWithin(0) {
+ case TIMEOUT => false
+ case any => value = Some(any); true
+ }
+ case Some(_) => true
+ }
}
- case Some(_) => true
}
}
+ linkTo(linkedChannel)
+ send(msg, linkedChannel)
+ new Future[Any](ftch) {
+ var exitReason: Option[Any] = None
+ val handleReply: PartialFunction[Any, Unit] = {
+ case Exit(from, reason) =>
+ exitReason = Some(reason)
+ case any =>
+ value = Some(any)
+ }
+
+ def apply(): Any =
+ if (isSet) {
+ if (!value.isEmpty)
+ value.get
+ else if (!exitReason.isEmpty) {
+ val reason = exitReason.get
+ if (reason.isInstanceOf[Throwable])
+ throw new ExecutionException(reason.asInstanceOf[Throwable])
+ else
+ throw new ExecutionException(new Exception(reason.toString()))
+ }
+ } else ch.receive(handleReply andThen {(x: Unit) => apply()})
+
+ def respond(k: Any => Unit): Unit =
+ if (isSet)
+ apply()
+ else
+ ch.react(handleReply andThen {(x: Unit) => k(apply())})
+
+ def isSet = (value match {
+ case None =>
+ val handleTimeout: PartialFunction[Any, Boolean] = {
+ case TIMEOUT =>
+ false
+ }
+ val whatToDo =
+ handleTimeout orElse (handleReply andThen {(x: Unit) => true})
+ ch.receiveWithin(0)(whatToDo)
+ case Some(_) => true
+ }) || !exitReason.isEmpty
+ }
}
/**