summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-09-24 17:04:29 +0200
committerVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-09-25 11:14:46 +0200
commitb92becd757d8319129fa8bd0a93af8c6fd2b23b7 (patch)
treeab1e7597e268f1a2d6f740926383ded221e8d36a
parentd69483662a596beb13cf6e128450a1c51881a6f6 (diff)
downloadscala-b92becd757d8319129fa8bd0a93af8c6fd2b23b7.tar.gz
scala-b92becd757d8319129fa8bd0a93af8c6fd2b23b7.tar.bz2
scala-b92becd757d8319129fa8bd0a93af8c6fd2b23b7.zip
Support for scala.concurrent for the ActorRef.
Review by @phaller
-rw-r--r--src/actors-migration/scala/actors/Pattern.scala6
-rw-r--r--src/actors/scala/actors/ActorRef.scala23
-rw-r--r--test/files/jvm/actmig-PinS.scala4
-rw-r--r--test/files/jvm/actmig-PinS_1.scala4
-rw-r--r--test/files/jvm/actmig-PinS_2.scala4
-rw-r--r--test/files/jvm/actmig-PinS_3.scala4
-rw-r--r--test/files/jvm/actmig-public-methods.check4
-rw-r--r--test/files/jvm/actmig-public-methods_1.check4
-rw-r--r--test/files/jvm/actmig-public-methods_1.scala85
-rw-r--r--test/files/jvm/actmig-react-receive.scala4
10 files changed, 92 insertions, 50 deletions
diff --git a/src/actors-migration/scala/actors/Pattern.scala b/src/actors-migration/scala/actors/Pattern.scala
index 297bcc7263..25ba191ce7 100644
--- a/src/actors-migration/scala/actors/Pattern.scala
+++ b/src/actors-migration/scala/actors/Pattern.scala
@@ -6,7 +6,7 @@ import language.implicitConversions
object pattern {
- implicit def askSupport(ar: ActorRef): AskableActorRef =
+ implicit def ask(ar: ActorRef): AskableActorRef =
new AskableActorRef(ar)
}
@@ -17,9 +17,9 @@ class AskableActorRef(val ar: ActorRef) extends ActorRef {
def !(message: Any)(implicit sender: ActorRef = null): Unit = ar.!(message)(sender)
- def ?(message: Any)(timeout: Timeout): Future[Any] = ar.?(message, timeout.duration)
+ def ?(message: Any)(implicit timeout: Timeout): scala.concurrent.Future[Any] = ar.?(message, timeout.duration)
- private[actors] def ?(message: Any, timeout: Duration): Future[Any] = ar.?(message, timeout)
+ private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = ar.?(message, timeout)
def forward(message: Any) = ar.forward(message)
diff --git a/src/actors/scala/actors/ActorRef.scala b/src/actors/scala/actors/ActorRef.scala
index 7768f04c2b..5e0ca1554a 100644
--- a/src/actors/scala/actors/ActorRef.scala
+++ b/src/actors/scala/actors/ActorRef.scala
@@ -2,6 +2,8 @@ package scala.actors
import java.util.concurrent.TimeoutException
import scala.concurrent.duration.Duration
+import scala.concurrent.Promise
+import scala.concurrent.ExecutionContext.Implicits.global
/**
* Trait used for migration of Scala actors to Akka.
@@ -28,7 +30,7 @@ trait ActorRef {
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
*/
- private[actors] def ?(message: Any, timeout: Duration): Future[Any]
+ private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any]
/**
* Forwards the message and passes the original sender actor as the sender.
@@ -43,7 +45,7 @@ trait ActorRef {
private[actors] class OutputChannelRef(val actor: OutputChannel[Any]) extends ActorRef {
- override private[actors] def ?(message: Any, timeout: Duration): Future[Any] =
+ override private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] =
throw new UnsupportedOperationException("Output channel does not support ?")
/**
@@ -88,14 +90,19 @@ private[actors] final class InternalActorRef(override val actor: InternalActor)
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
*/
- override private[actors] def ?(message: Any, timeout: Duration): Future[Any] =
- Futures.future {
- val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2)
- actor !? (dur, message) match {
- case Some(x) => x
- case None => new AskTimeoutException("? operation timed out.")
+ override private[actors] def ?(message: Any, timeout: Duration): scala.concurrent.Future[Any] = {
+ val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2)
+ val replyPromise = Promise[Any]
+ scala.concurrent.future {
+ scala.concurrent.blocking {
+ actor !? (dur, message)
+ } match {
+ case Some(x) => replyPromise success x
+ case None => replyPromise failure new AskTimeoutException("? operation timed out.")
}
}
+ replyPromise.future
+ }
override def !(message: Any)(implicit sender: ActorRef = null): Unit =
if (message == PoisonPill)
diff --git a/test/files/jvm/actmig-PinS.scala b/test/files/jvm/actmig-PinS.scala
index 30307f3737..3f07fab12e 100644
--- a/test/files/jvm/actmig-PinS.scala
+++ b/test/files/jvm/actmig-PinS.scala
@@ -1,3 +1,7 @@
+/**
+ * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change
+ * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov.
+ */
import scala.actors._
import scala.concurrent.duration._
import scala.concurrent.{ Promise, Await }
diff --git a/test/files/jvm/actmig-PinS_1.scala b/test/files/jvm/actmig-PinS_1.scala
index db3ced3f0e..876688ca75 100644
--- a/test/files/jvm/actmig-PinS_1.scala
+++ b/test/files/jvm/actmig-PinS_1.scala
@@ -1,3 +1,7 @@
+/**
+ * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change
+ * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov.
+ */
import scala.actors._
import scala.actors.migration._
import scala.concurrent.duration._
diff --git a/test/files/jvm/actmig-PinS_2.scala b/test/files/jvm/actmig-PinS_2.scala
index b2627b4628..7d12578f71 100644
--- a/test/files/jvm/actmig-PinS_2.scala
+++ b/test/files/jvm/actmig-PinS_2.scala
@@ -1,3 +1,7 @@
+/**
+ * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change
+ * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov.
+ */
import scala.actors._
import scala.actors.migration._
import scala.concurrent.duration._
diff --git a/test/files/jvm/actmig-PinS_3.scala b/test/files/jvm/actmig-PinS_3.scala
index 8a5eded443..c2943008b0 100644
--- a/test/files/jvm/actmig-PinS_3.scala
+++ b/test/files/jvm/actmig-PinS_3.scala
@@ -1,3 +1,7 @@
+/**
+ * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change
+ * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov.
+ */
import scala.actors._
import scala.actors.migration._
import scala.concurrent.duration._
diff --git a/test/files/jvm/actmig-public-methods.check b/test/files/jvm/actmig-public-methods.check
index bb6530c926..c861c90e63 100644
--- a/test/files/jvm/actmig-public-methods.check
+++ b/test/files/jvm/actmig-public-methods.check
@@ -1,6 +1,6 @@
None
Some(bang qmark after 1)
bang
+bang bang in the future after 0
bang qmark after 0
-bang qmark in future after 0
-typed bang qmark in future after 0
+typed bang bang in the future after 0
diff --git a/test/files/jvm/actmig-public-methods_1.check b/test/files/jvm/actmig-public-methods_1.check
index bb6530c926..c861c90e63 100644
--- a/test/files/jvm/actmig-public-methods_1.check
+++ b/test/files/jvm/actmig-public-methods_1.check
@@ -1,6 +1,6 @@
None
Some(bang qmark after 1)
bang
+bang bang in the future after 0
bang qmark after 0
-bang qmark in future after 0
-typed bang qmark in future after 0
+typed bang bang in the future after 0
diff --git a/test/files/jvm/actmig-public-methods_1.scala b/test/files/jvm/actmig-public-methods_1.scala
index 74fdee947b..15516a5d51 100644
--- a/test/files/jvm/actmig-public-methods_1.scala
+++ b/test/files/jvm/actmig-public-methods_1.scala
@@ -1,11 +1,18 @@
+/**
+ * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change
+ * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov.
+ */
import scala.collection.mutable.ArrayBuffer
import scala.actors.Actor._
import scala.actors._
import scala.actors.migration._
import scala.util._
+import scala.concurrent._
+import scala.concurrent.duration._
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import scala.concurrent.duration._
import scala.actors.migration.pattern._
+import scala.concurrent.ExecutionContext.Implicits.global
object Test {
val NUMBER_OF_TESTS = 6
@@ -40,45 +47,53 @@ object Test {
respActor ! "bang"
- implicit val timeout = Timeout(Duration(500, TimeUnit.MILLISECONDS))
- val msg = ("bang qmark", 0L)
- val res1 = respActor.?(msg)(Timeout(Duration.Inf))
- append(res1().toString)
- latch.countDown()
-
- val msg1 = ("bang qmark", 1L)
- val res2 = respActor.?(msg1)(Timeout(Duration(500, TimeUnit.MILLISECONDS)))
- append((res2() match {
- case x: AskTimeoutException => None
- case v => Some(v)
- }).toString)
- latch.countDown()
-
- // this one should time out
- val msg11 = ("bang qmark", 500L)
- val res21 = respActor.?(msg11)(Timeout(Duration(1, TimeUnit.MILLISECONDS)))
- append((res21() match {
- case x: AskTimeoutException => None
- case v => Some(v)
- }).toString)
- latch.countDown()
-
- val msg2 = ("bang qmark in future", 0L)
- val fut1 = respActor.?(msg2)(Duration.Inf)
- append(fut1().toString())
- latch.countDown()
-
- val handler: PartialFunction[Any, String] = {
- case x: String => x.toString
+ {
+ val msg = ("bang qmark", 0L)
+ val res = respActor.?(msg)(Timeout(Duration.Inf))
+ append(Await.result(res, Duration.Inf).toString)
+ latch.countDown()
+ }
+
+ {
+ val msg = ("bang qmark", 1L)
+ val res = respActor.?(msg)(Timeout(5 seconds))
+
+ val promise = Promise[Option[Any]]()
+ res.onComplete(v => promise.success(v.toOption))
+ append(Await.result(promise.future, Duration.Inf).toString)
+
+ latch.countDown()
+ }
+
+ {
+ val msg = ("bang qmark", 5000L)
+ val res = respActor.?(msg)(Timeout(1 millisecond))
+ val promise = Promise[Option[Any]]()
+ res.onComplete(v => promise.success(v.toOption))
+ append(Await.result(promise.future, Duration.Inf).toString)
+ latch.countDown()
+ }
+
+ {
+ val msg = ("bang bang in the future", 0L)
+ val fut = respActor.?(msg)(Timeout(Duration.Inf))
+ append(Await.result(fut, Duration.Inf).toString)
+ latch.countDown()
}
- val msg3 = ("typed bang qmark in future", 0L)
- val fut2 = (respActor.?(msg3)(Duration.Inf))
- append(Futures.future { handler.apply(fut2()) }().toString)
- latch.countDown()
+ {
+ val handler: PartialFunction[Any, String] = {
+ case x: String => x
+ }
+
+ val msg = ("typed bang bang in the future", 0L)
+ val fut = (respActor.?(msg)(Timeout(Duration.Inf)))
+ append((Await.result(fut.map(handler), Duration.Inf)).toString)
+ latch.countDown()
+ }
// output
- latch.await(200, TimeUnit.MILLISECONDS)
+ latch.await(10, TimeUnit.SECONDS)
if (latch.getCount() > 0) {
println("Error: Tasks have not finished!!!")
}
diff --git a/test/files/jvm/actmig-react-receive.scala b/test/files/jvm/actmig-react-receive.scala
index af36c2ac70..6adeac8b52 100644
--- a/test/files/jvm/actmig-react-receive.scala
+++ b/test/files/jvm/actmig-react-receive.scala
@@ -1,3 +1,7 @@
+/**
+ * NOTE: Code snippets from this test are included in the Actor Migration Guide. In case you change
+ * code in these tests prior to the 2.10.0 release please send the notification to @vjovanov.
+ */
import scala.actors.migration.MigrationSystem._
import scala.actors.Actor._
import scala.actors._