summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/actors/scala/actors/ReplyableActor.scala55
-rw-r--r--src/actors/scala/actors/ReplyableReactor.scala78
-rw-r--r--src/library/scala/concurrent/SyncVar.scala14
-rw-r--r--test/files/jvm/replyablereactor2.check5
-rw-r--r--test/files/jvm/replyablereactor2.scala45
-rw-r--r--test/files/jvm/replyablereactor3.check5
-rw-r--r--test/files/jvm/replyablereactor3.scala44
-rw-r--r--test/files/jvm/replyablereactor4.check5
-rw-r--r--test/files/jvm/replyablereactor4.scala44
9 files changed, 269 insertions, 26 deletions
diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala
index 709bb0ef98..1e1487bf39 100644
--- a/src/actors/scala/actors/ReplyableActor.scala
+++ b/src/actors/scala/actors/ReplyableActor.scala
@@ -23,6 +23,61 @@ trait ReplyableActor extends ReplyableReactor {
thiz: AbstractActor with ReplyReactor =>
/**
+ * Sends <code>msg</code> to this actor and awaits reply
+ * (synchronous).
+ *
+ * @param msg the message to be sent
+ * @return the reply
+ */
+ override def !?(msg: Any): Any = {
+ val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
+ thiz.send(msg, replyCh)
+ replyCh.receive {
+ case x => x
+ }
+ }
+
+ /**
+ * Sends <code>msg</code> to this actor and awaits reply
+ * (synchronous) within <code>msec</code> milliseconds.
+ *
+ * @param msec the time span before timeout
+ * @param msg the message to be sent
+ * @return <code>None</code> in case of timeout, otherwise
+ * <code>Some(x)</code> where <code>x</code> is the reply
+ */
+ override def !?(msec: Long, msg: Any): Option[Any] = {
+ val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
+ thiz.send(msg, replyCh)
+ replyCh.receiveWithin(msec) {
+ case TIMEOUT => None
+ case x => Some(x)
+ }
+ }
+
+ /**
+ * Sends <code>msg</code> to this actor and immediately
+ * returns a future representing the reply value.
+ * The reply is post-processed using the partial function
+ * <code>f</code>. This also allows to recover a more
+ * precise type for the reply value.
+ */
+ override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
+ val ftch = new Channel[A](Actor.rawSelf(thiz.scheduler))
+ thiz.send(msg, new OutputChannel[Any] {
+ def !(msg: Any) =
+ ftch ! f(msg)
+ def send(msg: Any, replyTo: OutputChannel[Any]) =
+ ftch.send(f(msg), replyTo)
+ def forward(msg: Any) =
+ ftch.forward(f(msg))
+ def receiver =
+ ftch.receiver
+ })
+ Futures.fromInputChannel(ftch)
+ }
+
+ /**
* Sends <code>msg</code> to this actor and immediately
* returns a future representing the reply value.
*/
diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala
index cc2519e3ee..84168abe0a 100644
--- a/src/actors/scala/actors/ReplyableReactor.scala
+++ b/src/actors/scala/actors/ReplyableReactor.scala
@@ -27,13 +27,8 @@ trait ReplyableReactor extends Replyable[Any, Any] {
* @param msg the message to be sent
* @return the reply
*/
- def !?(msg: Any): Any = {
- val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
- thiz.send(msg, replyCh)
- replyCh.receive {
- case x => x
- }
- }
+ def !?(msg: Any): Any =
+ (this !! msg)()
/**
* Sends <code>msg</code> to this actor and awaits reply
@@ -45,23 +40,28 @@ trait ReplyableReactor extends Replyable[Any, Any] {
* <code>Some(x)</code> where <code>x</code> is the reply
*/
def !?(msec: Long, msg: Any): Option[Any] = {
- val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
- thiz.send(msg, replyCh)
- replyCh.receiveWithin(msec) {
- case TIMEOUT => None
- case x => Some(x)
+ val myself = Actor.rawSelf(thiz.scheduler)
+ val res = new scala.concurrent.SyncVar[Any]
+ val out = new OutputChannel[Any] {
+ def !(msg: Any) =
+ res set msg
+ def send(msg: Any, replyTo: OutputChannel[Any]) =
+ res set msg
+ def forward(msg: Any) =
+ res set msg
+ def receiver =
+ myself
}
+ thiz.send(msg, out)
+ res.get(msec)
}
/**
* Sends <code>msg</code> to this actor and immediately
* returns a future representing the reply value.
*/
- override def !!(msg: Any): Future[Any] = {
- val ftch = new Channel[Any](Actor.rawSelf(thiz.scheduler))
- thiz.send(msg, ftch)
- Futures.fromInputChannel(ftch)
- }
+ override def !!(msg: Any): Future[Any] =
+ this !! (msg, { case x => x })
/**
* Sends <code>msg</code> to this actor and immediately
@@ -71,18 +71,44 @@ trait ReplyableReactor extends Replyable[Any, Any] {
* precise type for the reply value.
*/
override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
- val ftch = new Channel[A](Actor.rawSelf(thiz.scheduler))
- thiz.send(msg, new OutputChannel[Any] {
- def !(msg: Any) =
+ val myself = Actor.rawSelf(thiz.scheduler)
+ val ftch = new Channel[A](myself)
+ val res = new scala.concurrent.SyncVar[A]
+
+ val out = new OutputChannel[Any] {
+ def !(msg: Any) = {
ftch ! f(msg)
- def send(msg: Any, replyTo: OutputChannel[Any]) =
+ res set f(msg)
+ }
+ def send(msg: Any, replyTo: OutputChannel[Any]) = {
ftch.send(f(msg), replyTo)
- def forward(msg: Any) =
- ftch.forward(f(msg))
+ res set f(msg)
+ }
+ def forward(msg: Any) = {
+ ftch forward f(msg)
+ res set f(msg)
+ }
def receiver =
- ftch.receiver
- })
- Futures.fromInputChannel(ftch)
+ myself
+ }
+
+ thiz.send(msg, out)
+
+ new Future[A](ftch) {
+ def apply() =
+ if (isSet) value.get.asInstanceOf[A]
+ else {
+ value = Some(res.get)
+ value.get.asInstanceOf[A]
+ }
+ def respond(k: A => Unit): Unit =
+ if (isSet) k(value.get.asInstanceOf[A])
+ else inputChannel.react {
+ case any => value = Some(any); k(value.get.asInstanceOf[A])
+ }
+ def isSet =
+ !value.isEmpty
+ }
}
}
diff --git a/src/library/scala/concurrent/SyncVar.scala b/src/library/scala/concurrent/SyncVar.scala
index 3b35c9ef10..d64c2fa51c 100644
--- a/src/library/scala/concurrent/SyncVar.scala
+++ b/src/library/scala/concurrent/SyncVar.scala
@@ -28,6 +28,20 @@ class SyncVar[A] {
else throw exception.get
}
+ def get(timeout: Long): Option[A] = synchronized {
+ if (!isDefined) {
+ try {
+ wait(timeout)
+ } catch {
+ case _: InterruptedException =>
+ }
+ }
+ if (exception.isEmpty) {
+ if (isDefined) Some(value) else None
+ } else
+ throw exception.get
+ }
+
def take() = synchronized {
try {
get
diff --git a/test/files/jvm/replyablereactor2.check b/test/files/jvm/replyablereactor2.check
new file mode 100644
index 0000000000..0944b17279
--- /dev/null
+++ b/test/files/jvm/replyablereactor2.check
@@ -0,0 +1,5 @@
+'hello
+'hello
+'hello
+'hello
+'hello
diff --git a/test/files/jvm/replyablereactor2.scala b/test/files/jvm/replyablereactor2.scala
new file mode 100644
index 0000000000..6f0b43175d
--- /dev/null
+++ b/test/files/jvm/replyablereactor2.scala
@@ -0,0 +1,45 @@
+import scala.actors._
+import scala.actors.Actor._
+
+class MyActor extends ReplyReactor with ReplyableReactor {
+ def act() {
+ loop {
+ react {
+ case 'hello =>
+ sender ! 'hello
+ case 'stop =>
+ exit()
+ }
+ }
+ }
+}
+
+object Test {
+ def main(args: Array[String]) {
+ val a = new MyActor
+ a.start()
+
+ val b = new Reactor {
+ def act() {
+ react {
+ case r: MyActor =>
+ var i = 0
+ loop {
+ i += 1
+ val ft = r !! 'hello
+ val msg = ft()
+ if (i % 10000 == 0)
+ println(msg)
+ if (i >= 50000) {
+ r ! 'stop
+ exit()
+ }
+ }
+ }
+ }
+ }
+ b.start()
+
+ b ! a
+ }
+}
diff --git a/test/files/jvm/replyablereactor3.check b/test/files/jvm/replyablereactor3.check
new file mode 100644
index 0000000000..0944b17279
--- /dev/null
+++ b/test/files/jvm/replyablereactor3.check
@@ -0,0 +1,5 @@
+'hello
+'hello
+'hello
+'hello
+'hello
diff --git a/test/files/jvm/replyablereactor3.scala b/test/files/jvm/replyablereactor3.scala
new file mode 100644
index 0000000000..6a646731d8
--- /dev/null
+++ b/test/files/jvm/replyablereactor3.scala
@@ -0,0 +1,44 @@
+import scala.actors._
+import scala.actors.Actor._
+
+class MyActor extends ReplyReactor with ReplyableReactor {
+ def act() {
+ loop {
+ react {
+ case 'hello =>
+ sender ! 'hello
+ case 'stop =>
+ exit()
+ }
+ }
+ }
+}
+
+object Test {
+ def main(args: Array[String]) {
+ val a = new MyActor
+ a.start()
+
+ val b = new Reactor {
+ def act() {
+ react {
+ case r: MyActor =>
+ var i = 0
+ loop {
+ i += 1
+ val msg = r !? 'hello
+ if (i % 10000 == 0)
+ println(msg)
+ if (i >= 50000) {
+ r ! 'stop
+ exit()
+ }
+ }
+ }
+ }
+ }
+ b.start()
+
+ b ! a
+ }
+}
diff --git a/test/files/jvm/replyablereactor4.check b/test/files/jvm/replyablereactor4.check
new file mode 100644
index 0000000000..cac0fffe3b
--- /dev/null
+++ b/test/files/jvm/replyablereactor4.check
@@ -0,0 +1,5 @@
+Some('hello)
+Some('hello)
+Some('hello)
+Some('hello)
+Some('hello)
diff --git a/test/files/jvm/replyablereactor4.scala b/test/files/jvm/replyablereactor4.scala
new file mode 100644
index 0000000000..f09e32e356
--- /dev/null
+++ b/test/files/jvm/replyablereactor4.scala
@@ -0,0 +1,44 @@
+import scala.actors._
+import scala.actors.Actor._
+
+class MyActor extends ReplyReactor with ReplyableReactor {
+ def act() {
+ loop {
+ react {
+ case 'hello =>
+ sender ! 'hello
+ case 'stop =>
+ exit()
+ }
+ }
+ }
+}
+
+object Test {
+ def main(args: Array[String]) {
+ val a = new MyActor
+ a.start()
+
+ val b = new Reactor {
+ def act() {
+ react {
+ case r: MyActor =>
+ var i = 0
+ loop {
+ i += 1
+ val msg = r !? (500, 'hello)
+ if (i % 10000 == 0)
+ println(msg)
+ if (i >= 50000) {
+ r ! 'stop
+ exit()
+ }
+ }
+ }
+ }
+ }
+ b.start()
+
+ b ! a
+ }
+}