From 64342a3d920ea055739f94d1f10119bcfeabd12e Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 21 Jul 2009 12:21:56 +0000 Subject: Enabled future-type message sends for reactors. --- src/actors/scala/actors/Channel.scala | 13 ++++--- src/actors/scala/actors/OutputChannel.scala | 4 +-- src/actors/scala/actors/ReplyableReactor.scala | 4 +-- test/files/jvm/replyablereactor.check | 5 +++ test/files/jvm/replyablereactor.scala | 47 ++++++++++++++++++++++++++ 5 files changed, 64 insertions(+), 9 deletions(-) create mode 100644 test/files/jvm/replyablereactor.check create mode 100644 test/files/jvm/replyablereactor.scala diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index 93525dddbd..b8c0420372 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -38,9 +38,9 @@ case class ! [a](ch: Channel[a], msg: a) * @version 0.9.17 * @author Philipp Haller */ -class Channel[Msg](val receiver: Actor) extends InputChannel[Msg] with OutputChannel[Msg] { +class Channel[Msg](val receiver: Reactor) extends InputChannel[Msg] with OutputChannel[Msg] { - def this() = this(Actor.self) + def this() = this(Actor.rawSelf) /** * Sends a message to this Channel. @@ -78,7 +78,8 @@ class Channel[Msg](val receiver: Actor) extends InputChannel[Msg] with OutputCha */ def receive[R](f: PartialFunction[Msg, R]): R = { val C = this.asInstanceOf[Channel[Any]] - receiver.receive { + val recvActor = receiver.asInstanceOf[Actor] + recvActor.receive { case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => f(msg.asInstanceOf[Msg]) } } @@ -100,7 +101,8 @@ class Channel[Msg](val receiver: Actor) extends InputChannel[Msg] with OutputCha */ def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { val C = this.asInstanceOf[Channel[Any]] - receiver.receiveWithin(msec) { + val recvActor = receiver.asInstanceOf[Actor] + recvActor.receiveWithin(msec) { case C ! msg if (f.isDefinedAt(msg)) => f(msg) case TIMEOUT => f(TIMEOUT) } @@ -133,7 +135,8 @@ class Channel[Msg](val receiver: Actor) extends InputChannel[Msg] with OutputCha */ def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { val C = this.asInstanceOf[Channel[Any]] - receiver.reactWithin(msec) { + val recvActor = receiver.asInstanceOf[Actor] + recvActor.reactWithin(msec) { case C ! msg if (f.isDefinedAt(msg)) => f(msg) case TIMEOUT => f(TIMEOUT) } diff --git a/src/actors/scala/actors/OutputChannel.scala b/src/actors/scala/actors/OutputChannel.scala index 92c42282d9..261ef56359 100644 --- a/src/actors/scala/actors/OutputChannel.scala +++ b/src/actors/scala/actors/OutputChannel.scala @@ -42,8 +42,8 @@ trait OutputChannel[-Msg] { def forward(msg: Msg): Unit /** - * Returns the Actor that is + * Returns the Reactor that is * receiving from this OutputChannel. */ - def receiver: Actor + def receiver: Reactor } diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala index 221b424c2c..1f7c774f4a 100644 --- a/src/actors/scala/actors/ReplyableReactor.scala +++ b/src/actors/scala/actors/ReplyableReactor.scala @@ -58,7 +58,7 @@ trait ReplyableReactor extends Replyable[Any, Any] { * returns a future representing the reply value. */ def !!(msg: Any): Future[Any] = { - val ftch = new Channel[Any](Actor.self(thiz.scheduler)) + val ftch = new Channel[Any](Actor.rawSelf(thiz.scheduler)) thiz.send(msg, ftch) Futures.fromInputChannel(ftch) } @@ -71,7 +71,7 @@ trait ReplyableReactor extends Replyable[Any, Any] { * precise type for the reply value. */ def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { - val ftch = new Channel[A](Actor.self(thiz.scheduler)) + val ftch = new Channel[A](Actor.rawSelf(thiz.scheduler)) thiz.send(msg, new OutputChannel[Any] { def !(msg: Any) = ftch ! f(msg) diff --git a/test/files/jvm/replyablereactor.check b/test/files/jvm/replyablereactor.check new file mode 100644 index 0000000000..0944b17279 --- /dev/null +++ b/test/files/jvm/replyablereactor.check @@ -0,0 +1,5 @@ +'hello +'hello +'hello +'hello +'hello diff --git a/test/files/jvm/replyablereactor.scala b/test/files/jvm/replyablereactor.scala new file mode 100644 index 0000000000..5c45c7fc54 --- /dev/null +++ b/test/files/jvm/replyablereactor.scala @@ -0,0 +1,47 @@ +import scala.actors._ +import scala.actors.Actor._ + +class MyActor extends ReplyReactor with ReplyableReactor { + def act() { + loop { + react { + case 'hello => + sender ! 'hello + case 'stop => + exit() + } + } + } +} + +object Test { + def main(args: Array[String]) { + val a = new MyActor + a.start() + + val b = new Reactor { + def act() { + react { + case r: MyActor => + var i = 0 + loop { + i += 1 + val ft = r !! 'hello + react { + case ft.inputChannel ! msg => + if (i % 10000 == 0) + println(msg) + if (i >= 50000) { + r ! 'stop + exit() + } + } + } + } + } + } + b.start() + + b ! a + } +} -- cgit v1.2.3