From 8befdb8b05b097c7861dc959f4ec29e5b8a758e6 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 6 Oct 2009 17:43:02 +0000 Subject: Restored type of receiver in OutputChannel and ... Restored type of receiver in OutputChannel and Channel to Actor. ReplyReactor inherits from ReplyableReactor. ReplyableReactor is now private. --- src/actors/scala/actors/Channel.scala | 4 +- src/actors/scala/actors/OutputChannel.scala | 2 +- src/actors/scala/actors/ReactChannel.scala | 126 +++++++++++++++++++++++++ src/actors/scala/actors/ReplyReactor.scala | 2 +- src/actors/scala/actors/ReplyableActor.scala | 2 +- src/actors/scala/actors/ReplyableReactor.scala | 33 ++++--- 6 files changed, 149 insertions(+), 20 deletions(-) create mode 100644 src/actors/scala/actors/ReactChannel.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index b8c0420372..0c9beacca6 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -38,9 +38,9 @@ case class ! [a](ch: Channel[a], msg: a) * @version 0.9.17 * @author Philipp Haller */ -class Channel[Msg](val receiver: Reactor) extends InputChannel[Msg] with OutputChannel[Msg] { +class Channel[Msg](val receiver: Actor) extends InputChannel[Msg] with OutputChannel[Msg] { - def this() = this(Actor.rawSelf) + def this() = this(Actor.self) /** * Sends a message to this Channel. diff --git a/src/actors/scala/actors/OutputChannel.scala b/src/actors/scala/actors/OutputChannel.scala index 92bd12a55b..aba8327970 100644 --- a/src/actors/scala/actors/OutputChannel.scala +++ b/src/actors/scala/actors/OutputChannel.scala @@ -45,5 +45,5 @@ trait OutputChannel[-Msg] extends AbstractReactor[Msg] { * Returns the Reactor that is * receiving from this OutputChannel. */ - def receiver: Reactor + def receiver: Actor } diff --git a/src/actors/scala/actors/ReactChannel.scala b/src/actors/scala/actors/ReactChannel.scala new file mode 100644 index 0000000000..ba0828a807 --- /dev/null +++ b/src/actors/scala/actors/ReactChannel.scala @@ -0,0 +1,126 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: InputChannel.scala 18844 2009-09-30 20:28:49Z phaller $ + +package scala.actors + +/** + * The ReactChannel trait. + * + * @author Philipp Haller + */ +private[actors] class ReactChannel[Msg](receiver: Reactor) extends InputChannel[Msg] { + + private case class SendToReactor(channel: ReactChannel[Msg], msg: Msg) + + /** + * Sends a message to this ReactChannel. + * + * @param msg the message to be sent + */ + def !(msg: Msg) { + receiver ! SendToReactor(this, msg) + } + + /** + * Sends a message to this ReactChannel + * (asynchronous) supplying explicit reply destination. + * + * @param msg the message to send + * @param replyTo the reply destination + */ + def send(msg: Msg, replyTo: OutputChannel[Any]) { + receiver.send(SendToReactor(this, msg), replyTo) + } + + /** + * Forwards msg to this keeping the + * last sender as sender instead of self. + */ + def forward(msg: Msg) { + receiver forward SendToReactor(this, msg) + } + + /** + * Receives a message from this ReactChannel. + *

+ * This method never returns. Therefore, the rest of the computation + * has to be contained in the actions of the partial function. + * + * @param f a partial function with message patterns and actions + */ + def react(f: PartialFunction[Msg, Unit]): Nothing = { + val C = this + receiver.react { + case SendToReactor(C, msg) if (f.isDefinedAt(msg.asInstanceOf[Msg])) => + f(msg.asInstanceOf[Msg]) + } + } + + /** + * Receives a message from this ReactChannel within + * a certain time span. + *

+ * This method never returns. Therefore, the rest of the computation + * has to be contained in the actions of the partial function. + * + * @param msec the time span before timeout + * @param f a partial function with message patterns and actions + */ + def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { + val C = this + val recvActor = receiver.asInstanceOf[Actor] + recvActor.reactWithin(msec) { + case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => + f(msg.asInstanceOf[Msg]) + case TIMEOUT => f(TIMEOUT) + } + } + + /** + * Receives a message from this ReactChannel. + * + * @param f a partial function with message patterns and actions + * @return result of processing the received value + */ + def receive[R](f: PartialFunction[Msg, R]): R = { + val C = this + val recvActor = receiver.asInstanceOf[Actor] + recvActor.receive { + case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => + f(msg.asInstanceOf[Msg]) + } + } + + /** + * Receives a message from this ReactChannel within a certain + * time span. + * + * @param msec the time span before timeout + * @param f a partial function with message patterns and actions + * @return result of processing the received value + */ + def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { + val C = this + val recvActor = receiver.asInstanceOf[Actor] + recvActor.receiveWithin(msec) { + case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => + f(msg.asInstanceOf[Msg]) + case TIMEOUT => f(TIMEOUT) + } + } + + /** + * Receives the next message from this ReactChannel. + */ + def ? : Msg = receive { + case x => x + } + +} diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala index db7a74aba5..762ceef4d6 100644 --- a/src/actors/scala/actors/ReplyReactor.scala +++ b/src/actors/scala/actors/ReplyReactor.scala @@ -19,7 +19,7 @@ package scala.actors * * @author Philipp Haller */ -trait ReplyReactor extends Reactor { +trait ReplyReactor extends Reactor with ReplyableReactor { /* A list of the current senders. The head of the list is * the sender of the message that was received last. diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala index a7fb44dd3c..bf9703a629 100644 --- a/src/actors/scala/actors/ReplyableActor.scala +++ b/src/actors/scala/actors/ReplyableActor.scala @@ -63,7 +63,7 @@ private[actors] trait ReplyableActor extends ReplyableReactor { * precise type for the reply value. */ override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { - val ftch = new Channel[A](Actor.rawSelf(thiz.scheduler)) + val ftch = new Channel[A](Actor.self(thiz.scheduler)) thiz.send(msg, new OutputChannel[Any] { def !(msg: Any) = ftch ! f(msg) diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala index 84168abe0a..6ec1d31d1f 100644 --- a/src/actors/scala/actors/ReplyableReactor.scala +++ b/src/actors/scala/actors/ReplyableReactor.scala @@ -17,8 +17,8 @@ package scala.actors * * @author Philipp Haller */ -trait ReplyableReactor extends Replyable[Any, Any] { - thiz: ReplyReactor => +private[actors] trait ReplyableReactor extends Replyable[Any, Any] { + _: ReplyReactor => /** * Sends msg to this actor and awaits reply @@ -40,7 +40,7 @@ trait ReplyableReactor extends Replyable[Any, Any] { * Some(x) where x is the reply */ def !?(msec: Long, msg: Any): Option[Any] = { - val myself = Actor.rawSelf(thiz.scheduler) + val myself = Actor.rawSelf(this.scheduler) val res = new scala.concurrent.SyncVar[Any] val out = new OutputChannel[Any] { def !(msg: Any) = @@ -50,9 +50,9 @@ trait ReplyableReactor extends Replyable[Any, Any] { def forward(msg: Any) = res set msg def receiver = - myself + myself.asInstanceOf[Actor] } - thiz.send(msg, out) + this.send(msg, out) res.get(msec) } @@ -71,28 +71,31 @@ trait ReplyableReactor extends Replyable[Any, Any] { * precise type for the reply value. */ override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { - val myself = Actor.rawSelf(thiz.scheduler) - val ftch = new Channel[A](myself) + val myself = Actor.rawSelf(this.scheduler) + val ftch = new ReactChannel[A](myself) val res = new scala.concurrent.SyncVar[A] val out = new OutputChannel[Any] { def !(msg: Any) = { - ftch ! f(msg) - res set f(msg) + val msg1 = f(msg) + ftch ! msg1 + res set msg1 } def send(msg: Any, replyTo: OutputChannel[Any]) = { - ftch.send(f(msg), replyTo) - res set f(msg) + val msg1 = f(msg) + ftch.send(msg1, replyTo) + res set msg1 } def forward(msg: Any) = { - ftch forward f(msg) - res set f(msg) + val msg1 = f(msg) + ftch forward msg1 + res set msg1 } def receiver = - myself + myself.asInstanceOf[Actor] } - thiz.send(msg, out) + this.send(msg, out) new Future[A](ftch) { def apply() = -- cgit v1.2.3