diff options
author | Vojin Jovanovic <vojin.jovanovic@epfl.ch> | 2012-09-24 17:04:29 +0200 |
---|---|---|
committer | Vojin Jovanovic <vojin.jovanovic@epfl.ch> | 2012-09-25 11:14:46 +0200 |
commit | b92becd757d8319129fa8bd0a93af8c6fd2b23b7 (patch) | |
tree | ab1e7597e268f1a2d6f740926383ded221e8d36a /src/actors | |
parent | d69483662a596beb13cf6e128450a1c51881a6f6 (diff) | |
download | scala-b92becd757d8319129fa8bd0a93af8c6fd2b23b7.tar.gz scala-b92becd757d8319129fa8bd0a93af8c6fd2b23b7.tar.bz2 scala-b92becd757d8319129fa8bd0a93af8c6fd2b23b7.zip |
Support for scala.concurrent for the ActorRef.
Review by @phaller
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/ActorRef.scala | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/src/actors/scala/actors/ActorRef.scala b/src/actors/scala/actors/ActorRef.scala index 7768f04c2b..5e0ca1554a 100644 --- a/src/actors/scala/actors/ActorRef.scala +++ b/src/actors/scala/actors/ActorRef.scala @@ -2,6 +2,8 @@ package scala.actors import java.util.concurrent.TimeoutException import scala.concurrent.duration.Duration +import scala.concurrent.Promise +import scala.concurrent.ExecutionContext.Implicits.global /** * Trait used for migration of Scala actors to Akka. @@ -28,7 +30,7 @@ trait ActorRef { /** * Sends a message asynchronously, returning a future which may eventually hold the reply. */ - private[actors] def ?(message: Any, timeout: Duration): Future[Any] + private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] /** * Forwards the message and passes the original sender actor as the sender. @@ -43,7 +45,7 @@ trait ActorRef { private[actors] class OutputChannelRef(val actor: OutputChannel[Any]) extends ActorRef { - override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = + override private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = throw new UnsupportedOperationException("Output channel does not support ?") /** @@ -88,14 +90,19 @@ private[actors] final class InternalActorRef(override val actor: InternalActor) /** * Sends a message asynchronously, returning a future which may eventually hold the reply. */ - override private[actors] def ?(message: Any, timeout: Duration): Future[Any] = - Futures.future { - val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2) - actor !? (dur, message) match { - case Some(x) => x - case None => new AskTimeoutException("? operation timed out.") + override private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = { + val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2) + val replyPromise = Promise[Any] + scala.concurrent.future { + scala.concurrent.blocking { + actor !? (dur, message) + } match { + case Some(x) => replyPromise success x + case None => replyPromise failure new AskTimeoutException("? operation timed out.") } } + replyPromise.future + } override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (message == PoisonPill) |