diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-07-21 17:35:01 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-07-21 17:35:01 +0000 |
commit | d063a9fa5162bf6f59b34d653b46534830310a50 (patch) | |
tree | fd9dfd389c834d3091fe37b86e720ede940d6850 | |
parent | 9e896451708d850d5bc3b3d27d169f09aece912b (diff) | |
download | scala-d063a9fa5162bf6f59b34d653b46534830310a50.tar.gz scala-d063a9fa5162bf6f59b34d653b46534830310a50.tar.bz2 scala-d063a9fa5162bf6f59b34d653b46534830310a50.zip |
Enabled synchronous message sends for Replyable...
Enabled synchronous message sends for ReplyableReactor. Added
get(timeout: Long) method to SyncVar.
-rw-r--r-- | src/actors/scala/actors/ReplyableActor.scala | 55 | ||||
-rw-r--r-- | src/actors/scala/actors/ReplyableReactor.scala | 78 | ||||
-rw-r--r-- | src/library/scala/concurrent/SyncVar.scala | 14 | ||||
-rw-r--r-- | test/files/jvm/replyablereactor2.check | 5 | ||||
-rw-r--r-- | test/files/jvm/replyablereactor2.scala | 45 | ||||
-rw-r--r-- | test/files/jvm/replyablereactor3.check | 5 | ||||
-rw-r--r-- | test/files/jvm/replyablereactor3.scala | 44 | ||||
-rw-r--r-- | test/files/jvm/replyablereactor4.check | 5 | ||||
-rw-r--r-- | test/files/jvm/replyablereactor4.scala | 44 |
9 files changed, 269 insertions, 26 deletions
diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala index 709bb0ef98..1e1487bf39 100644 --- a/src/actors/scala/actors/ReplyableActor.scala +++ b/src/actors/scala/actors/ReplyableActor.scala @@ -23,6 +23,61 @@ trait ReplyableActor extends ReplyableReactor { thiz: AbstractActor with ReplyReactor => /** + * Sends <code>msg</code> to this actor and awaits reply + * (synchronous). + * + * @param msg the message to be sent + * @return the reply + */ + override def !?(msg: Any): Any = { + val replyCh = new Channel[Any](Actor.self(thiz.scheduler)) + thiz.send(msg, replyCh) + replyCh.receive { + case x => x + } + } + + /** + * Sends <code>msg</code> to this actor and awaits reply + * (synchronous) within <code>msec</code> milliseconds. + * + * @param msec the time span before timeout + * @param msg the message to be sent + * @return <code>None</code> in case of timeout, otherwise + * <code>Some(x)</code> where <code>x</code> is the reply + */ + override def !?(msec: Long, msg: Any): Option[Any] = { + val replyCh = new Channel[Any](Actor.self(thiz.scheduler)) + thiz.send(msg, replyCh) + replyCh.receiveWithin(msec) { + case TIMEOUT => None + case x => Some(x) + } + } + + /** + * Sends <code>msg</code> to this actor and immediately + * returns a future representing the reply value. + * The reply is post-processed using the partial function + * <code>f</code>. This also allows to recover a more + * precise type for the reply value. + */ + override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { + val ftch = new Channel[A](Actor.rawSelf(thiz.scheduler)) + thiz.send(msg, new OutputChannel[Any] { + def !(msg: Any) = + ftch ! f(msg) + def send(msg: Any, replyTo: OutputChannel[Any]) = + ftch.send(f(msg), replyTo) + def forward(msg: Any) = + ftch.forward(f(msg)) + def receiver = + ftch.receiver + }) + Futures.fromInputChannel(ftch) + } + + /** * Sends <code>msg</code> to this actor and immediately * returns a future representing the reply value. */ diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala index cc2519e3ee..84168abe0a 100644 --- a/src/actors/scala/actors/ReplyableReactor.scala +++ b/src/actors/scala/actors/ReplyableReactor.scala @@ -27,13 +27,8 @@ trait ReplyableReactor extends Replyable[Any, Any] { * @param msg the message to be sent * @return the reply */ - def !?(msg: Any): Any = { - val replyCh = new Channel[Any](Actor.self(thiz.scheduler)) - thiz.send(msg, replyCh) - replyCh.receive { - case x => x - } - } + def !?(msg: Any): Any = + (this !! msg)() /** * Sends <code>msg</code> to this actor and awaits reply @@ -45,23 +40,28 @@ trait ReplyableReactor extends Replyable[Any, Any] { * <code>Some(x)</code> where <code>x</code> is the reply */ def !?(msec: Long, msg: Any): Option[Any] = { - val replyCh = new Channel[Any](Actor.self(thiz.scheduler)) - thiz.send(msg, replyCh) - replyCh.receiveWithin(msec) { - case TIMEOUT => None - case x => Some(x) + val myself = Actor.rawSelf(thiz.scheduler) + val res = new scala.concurrent.SyncVar[Any] + val out = new OutputChannel[Any] { + def !(msg: Any) = + res set msg + def send(msg: Any, replyTo: OutputChannel[Any]) = + res set msg + def forward(msg: Any) = + res set msg + def receiver = + myself } + thiz.send(msg, out) + res.get(msec) } /** * Sends <code>msg</code> to this actor and immediately * returns a future representing the reply value. */ - override def !!(msg: Any): Future[Any] = { - val ftch = new Channel[Any](Actor.rawSelf(thiz.scheduler)) - thiz.send(msg, ftch) - Futures.fromInputChannel(ftch) - } + override def !!(msg: Any): Future[Any] = + this !! (msg, { case x => x }) /** * Sends <code>msg</code> to this actor and immediately @@ -71,18 +71,44 @@ trait ReplyableReactor extends Replyable[Any, Any] { * precise type for the reply value. */ override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { - val ftch = new Channel[A](Actor.rawSelf(thiz.scheduler)) - thiz.send(msg, new OutputChannel[Any] { - def !(msg: Any) = + val myself = Actor.rawSelf(thiz.scheduler) + val ftch = new Channel[A](myself) + val res = new scala.concurrent.SyncVar[A] + + val out = new OutputChannel[Any] { + def !(msg: Any) = { ftch ! f(msg) - def send(msg: Any, replyTo: OutputChannel[Any]) = + res set f(msg) + } + def send(msg: Any, replyTo: OutputChannel[Any]) = { ftch.send(f(msg), replyTo) - def forward(msg: Any) = - ftch.forward(f(msg)) + res set f(msg) + } + def forward(msg: Any) = { + ftch forward f(msg) + res set f(msg) + } def receiver = - ftch.receiver - }) - Futures.fromInputChannel(ftch) + myself + } + + thiz.send(msg, out) + + new Future[A](ftch) { + def apply() = + if (isSet) value.get.asInstanceOf[A] + else { + value = Some(res.get) + value.get.asInstanceOf[A] + } + def respond(k: A => Unit): Unit = + if (isSet) k(value.get.asInstanceOf[A]) + else inputChannel.react { + case any => value = Some(any); k(value.get.asInstanceOf[A]) + } + def isSet = + !value.isEmpty + } } } diff --git a/src/library/scala/concurrent/SyncVar.scala b/src/library/scala/concurrent/SyncVar.scala index 3b35c9ef10..d64c2fa51c 100644 --- a/src/library/scala/concurrent/SyncVar.scala +++ b/src/library/scala/concurrent/SyncVar.scala @@ -28,6 +28,20 @@ class SyncVar[A] { else throw exception.get } + def get(timeout: Long): Option[A] = synchronized { + if (!isDefined) { + try { + wait(timeout) + } catch { + case _: InterruptedException => + } + } + if (exception.isEmpty) { + if (isDefined) Some(value) else None + } else + throw exception.get + } + def take() = synchronized { try { get diff --git a/test/files/jvm/replyablereactor2.check b/test/files/jvm/replyablereactor2.check new file mode 100644 index 0000000000..0944b17279 --- /dev/null +++ b/test/files/jvm/replyablereactor2.check @@ -0,0 +1,5 @@ +'hello +'hello +'hello +'hello +'hello diff --git a/test/files/jvm/replyablereactor2.scala b/test/files/jvm/replyablereactor2.scala new file mode 100644 index 0000000000..6f0b43175d --- /dev/null +++ b/test/files/jvm/replyablereactor2.scala @@ -0,0 +1,45 @@ +import scala.actors._ +import scala.actors.Actor._ + +class MyActor extends ReplyReactor with ReplyableReactor { + def act() { + loop { + react { + case 'hello => + sender ! 'hello + case 'stop => + exit() + } + } + } +} + +object Test { + def main(args: Array[String]) { + val a = new MyActor + a.start() + + val b = new Reactor { + def act() { + react { + case r: MyActor => + var i = 0 + loop { + i += 1 + val ft = r !! 'hello + val msg = ft() + if (i % 10000 == 0) + println(msg) + if (i >= 50000) { + r ! 'stop + exit() + } + } + } + } + } + b.start() + + b ! a + } +} diff --git a/test/files/jvm/replyablereactor3.check b/test/files/jvm/replyablereactor3.check new file mode 100644 index 0000000000..0944b17279 --- /dev/null +++ b/test/files/jvm/replyablereactor3.check @@ -0,0 +1,5 @@ +'hello +'hello +'hello +'hello +'hello diff --git a/test/files/jvm/replyablereactor3.scala b/test/files/jvm/replyablereactor3.scala new file mode 100644 index 0000000000..6a646731d8 --- /dev/null +++ b/test/files/jvm/replyablereactor3.scala @@ -0,0 +1,44 @@ +import scala.actors._ +import scala.actors.Actor._ + +class MyActor extends ReplyReactor with ReplyableReactor { + def act() { + loop { + react { + case 'hello => + sender ! 'hello + case 'stop => + exit() + } + } + } +} + +object Test { + def main(args: Array[String]) { + val a = new MyActor + a.start() + + val b = new Reactor { + def act() { + react { + case r: MyActor => + var i = 0 + loop { + i += 1 + val msg = r !? 'hello + if (i % 10000 == 0) + println(msg) + if (i >= 50000) { + r ! 'stop + exit() + } + } + } + } + } + b.start() + + b ! a + } +} diff --git a/test/files/jvm/replyablereactor4.check b/test/files/jvm/replyablereactor4.check new file mode 100644 index 0000000000..cac0fffe3b --- /dev/null +++ b/test/files/jvm/replyablereactor4.check @@ -0,0 +1,5 @@ +Some('hello) +Some('hello) +Some('hello) +Some('hello) +Some('hello) diff --git a/test/files/jvm/replyablereactor4.scala b/test/files/jvm/replyablereactor4.scala new file mode 100644 index 0000000000..f09e32e356 --- /dev/null +++ b/test/files/jvm/replyablereactor4.scala @@ -0,0 +1,44 @@ +import scala.actors._ +import scala.actors.Actor._ + +class MyActor extends ReplyReactor with ReplyableReactor { + def act() { + loop { + react { + case 'hello => + sender ! 'hello + case 'stop => + exit() + } + } + } +} + +object Test { + def main(args: Array[String]) { + val a = new MyActor + a.start() + + val b = new Reactor { + def act() { + react { + case r: MyActor => + var i = 0 + loop { + i += 1 + val msg = r !? (500, 'hello) + if (i % 10000 == 0) + println(msg) + if (i >= 50000) { + r ! 'stop + exit() + } + } + } + } + } + b.start() + + b ! a + } +} |