From e32113307c381e8bc8558208a6076f60666784a2 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Sun, 28 Jun 2009 16:07:14 +0000 Subject: Refactoring of sender/reply, as well as !!, !? ... Refactoring of sender/reply, as well as !!, !? methods into separate traits. --- src/actors/scala/actors/AbstractActor.scala | 10 +- src/actors/scala/actors/Actor.scala | 213 +------------------------ src/actors/scala/actors/ActorTask.scala | 2 - src/actors/scala/actors/Future.scala | 22 +++ src/actors/scala/actors/Reactor.scala | 30 +--- src/actors/scala/actors/ReplyReactor.scala | 63 ++++++++ src/actors/scala/actors/Replyable.scala | 63 ++++++++ src/actors/scala/actors/ReplyableActor.scala | 104 ++++++++++++ src/actors/scala/actors/ReplyableReactor.scala | 88 ++++++++++ src/actors/scala/actors/Scheduler.scala | 1 + 10 files changed, 356 insertions(+), 240 deletions(-) create mode 100644 src/actors/scala/actors/ReplyReactor.scala create mode 100644 src/actors/scala/actors/Replyable.scala create mode 100644 src/actors/scala/actors/ReplyableActor.scala create mode 100644 src/actors/scala/actors/ReplyableReactor.scala diff --git a/src/actors/scala/actors/AbstractActor.scala b/src/actors/scala/actors/AbstractActor.scala index 7eaa4a0a58..b2ae3d300a 100644 --- a/src/actors/scala/actors/AbstractActor.scala +++ b/src/actors/scala/actors/AbstractActor.scala @@ -16,7 +16,7 @@ package scala.actors * @version 0.9.18 * @author Philipp Haller */ -trait AbstractActor extends OutputChannel[Any] { +trait AbstractActor extends OutputChannel[Any] with Replyable[Any, Any] { private[actors] var exiting = false @@ -26,12 +26,4 @@ trait AbstractActor extends OutputChannel[Any] { private[actors] def exit(from: AbstractActor, reason: AnyRef): Unit - def !?(msg: Any): Any - - def !?(msec: Long, msg: Any): Option[Any] - - def !!(msg: Any): Future[Any] - - def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] - } diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 1420a1af4d..11d8d63ac3 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -223,19 +223,22 @@ object Actor { /** * Returns the actor which sent the last received message. */ - def sender: OutputChannel[Any] = rawSelf.sender + def sender: OutputChannel[Any] = + rawSelf.asInstanceOf[ReplyReactor].sender /** * Send msg to the actor waiting in a call to * !?. */ - def reply(msg: Any): Unit = rawSelf.reply(msg) + def reply(msg: Any): Unit = + rawSelf.asInstanceOf[ReplyReactor].reply(msg) /** * Send () to the actor waiting in a call to * !?. */ - def reply(): Unit = rawSelf.reply(()) + def reply(): Unit = + rawSelf.asInstanceOf[ReplyReactor].reply(()) /** * Returns the number of messages in self's mailbox @@ -375,7 +378,7 @@ object Actor { * @author Philipp Haller */ @serializable -trait Actor extends Reactor with AbstractActor { +trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { /* The following two fields are only used when the actor * suspends by blocking its underlying thread, for example, @@ -622,208 +625,6 @@ trait Actor extends Reactor with AbstractActor { throw Actor.suspendException } - /** - * Sends msg to this actor (asynchronous). - */ - override def !(msg: Any) { - send(msg, Actor.rawSelf(scheduler)) - } - - /** - * Forwards msg to this actor (asynchronous). - */ - override def forward(msg: Any) { - send(msg, Actor.sender) - } - - /** - * Sends msg to this actor and awaits reply - * (synchronous). - * - * @param msg the message to be sent - * @return the reply - */ - def !?(msg: Any): Any = { - val replyCh = new Channel[Any](Actor.self(scheduler)) - send(msg, replyCh) - replyCh.receive { - case x => x - } - } - - /** - * Sends msg to this actor and awaits reply - * (synchronous) within msec milliseconds. - * - * @param msec the time span before timeout - * @param msg the message to be sent - * @return None in case of timeout, otherwise - * Some(x) where x is the reply - */ - def !?(msec: Long, msg: Any): Option[Any] = { - val replyCh = new Channel[Any](Actor.self(scheduler)) - send(msg, replyCh) - replyCh.receiveWithin(msec) { - case TIMEOUT => None - case x => Some(x) - } - } - - /** - * Sends msg to this actor and immediately - * returns a future representing the reply value. - */ - def !!(msg: Any): Future[Any] = { - val ftch = new Channel[Any](Actor.self(scheduler)) - val linkedChannel = new AbstractActor { - def !(msg: Any) = - ftch ! msg - def send(msg: Any, replyTo: OutputChannel[Any]) = - ftch.send(msg, replyTo) - def forward(msg: Any) = - ftch.forward(msg) - def receiver = - ftch.receiver - def linkTo(to: AbstractActor) { /* do nothing */ } - def unlinkFrom(from: AbstractActor) { /* do nothing */ } - def exit(from: AbstractActor, reason: AnyRef) { - ftch.send(Exit(from, reason), Actor.this) - } - // should never be invoked; return dummy value - def !?(msg: Any) = msg - // should never be invoked; return dummy value - def !?(msec: Long, msg: Any): Option[Any] = Some(msg) - // should never be invoked; return dummy value - def !!(msg: Any): Future[Any] = { - val someChan = new Channel[Any](Actor.self(scheduler)) - new Future[Any](someChan) { - def apply() = - if (isSet) value.get - else inputChannel.receive { - case any => value = Some(any); any - } - def respond(k: Any => Unit): Unit = - if (isSet) k(value.get) - else inputChannel.react { - case any => value = Some(any); k(any) - } - def isSet = value match { - case None => inputChannel.receiveWithin(0) { - case TIMEOUT => false - case any => value = Some(any); true - } - case Some(_) => true - } - } - } - // should never be invoked; return dummy value - def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { - val someChan = new Channel[A](Actor.self(scheduler)) - new Future[A](someChan) { - def apply() = - if (isSet) value.get.asInstanceOf[A] - else inputChannel.receive { - case any => value = Some(any); any - } - def respond(k: A => Unit): Unit = - if (isSet) k(value.get.asInstanceOf[A]) - else inputChannel.react { - case any => value = Some(any); k(any) - } - def isSet = value match { - case None => inputChannel.receiveWithin(0) { - case TIMEOUT => false - case any => value = Some(any); true - } - case Some(_) => true - } - } - } - } - linkTo(linkedChannel) - send(msg, linkedChannel) - new Future[Any](ftch) { - var exitReason: Option[Any] = None - val handleReply: PartialFunction[Any, Unit] = { - case Exit(from, reason) => - exitReason = Some(reason) - case any => - value = Some(any) - } - - def apply(): Any = - if (isSet) { - if (!value.isEmpty) - value.get - else if (!exitReason.isEmpty) { - val reason = exitReason.get - if (reason.isInstanceOf[Throwable]) - throw new ExecutionException(reason.asInstanceOf[Throwable]) - else - throw new ExecutionException(new Exception(reason.toString())) - } - } else inputChannel.receive(handleReply andThen {(x: Unit) => apply()}) - - def respond(k: Any => Unit): Unit = - if (isSet) - apply() - else - inputChannel.react(handleReply andThen {(x: Unit) => k(apply())}) - - def isSet = (value match { - case None => - val handleTimeout: PartialFunction[Any, Boolean] = { - case TIMEOUT => - false - } - val whatToDo = - handleTimeout orElse (handleReply andThen {(x: Unit) => true}) - inputChannel.receiveWithin(0)(whatToDo) - case Some(_) => true - }) || !exitReason.isEmpty - } - } - - /** - * Sends msg to this actor and immediately - * returns a future representing the reply value. - * The reply is post-processed using the partial function - * f. This also allows to recover a more - * precise type for the reply value. - */ - def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { - val ftch = new Channel[A](Actor.self(scheduler)) - send(msg, new OutputChannel[Any] { - def !(msg: Any) = - ftch ! f(msg) - def send(msg: Any, replyTo: OutputChannel[Any]) = - ftch.send(f(msg), replyTo) - def forward(msg: Any) = - ftch.forward(f(msg)) - def receiver = - ftch.receiver - }) - new Future[A](ftch) { - def apply() = - if (isSet) value.get.asInstanceOf[A] - else inputChannel.receive { - case any => value = Some(any); value.get.asInstanceOf[A] - } - def respond(k: A => Unit): Unit = - if (isSet) k(value.get.asInstanceOf[A]) - else inputChannel.react { - case any => value = Some(any); k(value.get.asInstanceOf[A]) - } - def isSet = value match { - case None => inputChannel.receiveWithin(0) { - case TIMEOUT => false - case any => value = Some(any); true - } - case Some(_) => true - } - } - } - /** * Receives the next message from this actor's mailbox. */ diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala index 4236785433..c780aad2ef 100644 --- a/src/actors/scala/actors/ActorTask.scala +++ b/src/actors/scala/actors/ActorTask.scala @@ -59,8 +59,6 @@ class ActorTask extends Runnable { a.synchronized { if (!a.links.isEmpty) a.exitLinked(t) - else - t.printStackTrace() } } } finally { diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala index ff938ab1aa..0487c02cfa 100644 --- a/src/actors/scala/actors/Future.scala +++ b/src/actors/scala/actors/Future.scala @@ -136,4 +136,26 @@ object Futures { } results } + + def fromInputChannel[T](inputChannel: InputChannel[T]): Future[T] = + new Future[T](inputChannel) { + def apply() = + if (isSet) value.get.asInstanceOf[T] + else inputChannel.receive { + case any => value = Some(any); value.get.asInstanceOf[T] + } + def respond(k: T => Unit): Unit = + if (isSet) k(value.get.asInstanceOf[T]) + else inputChannel.react { + case any => value = Some(any); k(value.get.asInstanceOf[T]) + } + def isSet = value match { + case None => inputChannel.receiveWithin(0) { + case TIMEOUT => false + case any => value = Some(any); true + } + case Some(_) => true + } + } + } diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index 599dee69db..d8c1ba3904 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -12,23 +12,18 @@ package scala.actors import scala.collection.mutable.Queue +/** + * The Reactor trait provides lightweight actors. + * + * @author Philipp Haller + */ trait Reactor extends OutputChannel[Any] { - @volatile - protected var ignoreSender: Boolean = false - /* The actor's mailbox. */ protected val mailbox = new MessageQueue protected var sendBuffer = new Queue[(Any, OutputChannel[Any])] - /* A list of the current senders. The head of the list is - * the sender of the message that was received last. - */ - protected var senders: List[OutputChannel[Any]] = - if (ignoreSender) List(null) - else Nil - /* If the actor waits in a react, continuation holds the * message handler that react was called with. */ @@ -88,8 +83,6 @@ trait Reactor extends OutputChannel[Any] { new ReactorTask(this, { block }) protected[this] def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) { - if (!ignoreSender) - senders = List(item._2) // assert continuation != null if (onSameThread) continuation(item._1) @@ -98,11 +91,11 @@ trait Reactor extends OutputChannel[Any] { } def !(msg: Any) { - send(msg, if (ignoreSender) null else Actor.rawSelf(scheduler)) + send(msg, null) } def forward(msg: Any) { - send(msg, if (ignoreSender) null else Actor.sender) + send(msg, null) } def receiver: Actor = this.asInstanceOf[Actor] @@ -151,15 +144,6 @@ trait Reactor extends OutputChannel[Any] { throw Actor.suspendException } - protected[actors] def sender: OutputChannel[Any] = senders.head - - /** - * Replies with msg to the sender. - */ - protected[actors] def reply(msg: Any) { - sender ! msg - } - protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = { scheduler execute (new LightReaction(this, if (f eq null) continuation else f, diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala new file mode 100644 index 0000000000..7e8bc0ab37 --- /dev/null +++ b/src/actors/scala/actors/ReplyReactor.scala @@ -0,0 +1,63 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +/** + * The ReplyReactor trait extends the Reactor trait with + * methods to reply to the sender of a message. + * Sending a message to a ReplyReactor implicitly + * passes a reference to the sender together with the + * message. + * + * @author Philipp Haller + */ +trait ReplyReactor extends Reactor { + + /* A list of the current senders. The head of the list is + * the sender of the message that was received last. + */ + protected var senders: List[OutputChannel[Any]] = + Nil + + protected[actors] def sender: OutputChannel[Any] = + senders.head + + /** + * Replies with msg to the sender. + */ + protected[actors] def reply(msg: Any) { + sender ! msg + } + + /** + * Sends msg to this actor (asynchronous). + */ + override def !(msg: Any) { + send(msg, Actor.rawSelf(scheduler)) + } + + /** + * Forwards msg to this actor (asynchronous). + */ + override def forward(msg: Any) { + send(msg, Actor.sender) + } + + override protected[this] def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) { + senders = List(item._2) + // assert continuation != null + if (onSameThread) + continuation(item._1) + else + scheduleActor(null, item._1) + } + +} diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala new file mode 100644 index 0000000000..330b16461d --- /dev/null +++ b/src/actors/scala/actors/Replyable.scala @@ -0,0 +1,63 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +/** + * The Replyable trait defines result-bearing message send operations + * on replyable actors. + * + * @author Philipp Haller + */ +trait Replyable[T, R] { + + /** + * Sends msg to this Replyable and awaits reply + * (synchronous). + * + * @param msg the message to be sent + * @return the reply + */ + def !?(msg: T): R + + /** + * Sends msg to this Replyable and awaits reply + * (synchronous) within msec milliseconds. + * + * @param msec the time span before timeout + * @param msg the message to be sent + * @return None in case of timeout, otherwise + * Some(x) where x is the reply + */ + def !?(msec: Long, msg: T): Option[R] + + /** + * Sends msg to this actor and immediately + * returns a future representing the reply value. + * + * @param msg the message to be sent + * @return the future + */ + def !!(msg: T): Future[R] + + /** + * Sends msg to this actor and immediately + * returns a future representing the reply value. + * The reply is post-processed using the partial function + * f. This also allows to recover a more + * precise type for the reply value. + * + * @param msg the message to be sent + * @param f the function to be applied to the response + * @return the future + */ + def !![P](msg: T, f: PartialFunction[R, P]): Future[P] + +} diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala new file mode 100644 index 0000000000..b9576a837b --- /dev/null +++ b/src/actors/scala/actors/ReplyableActor.scala @@ -0,0 +1,104 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +import java.util.concurrent.ExecutionException + +/** + * The ReplyableActor trait provides + * message send operations that may result in a + * response from the receiver. + * + * @author Philipp Haller + */ +trait ReplyableActor extends ReplyableReactor { + thiz: AbstractActor with ReplyReactor => + + /** + * Sends msg to this actor and immediately + * returns a future representing the reply value. + */ + override def !!(msg: Any): Future[Any] = { + val ftch = new Channel[Any](Actor.self(thiz.scheduler)) + val linkedChannel = new AbstractActor { + def !(msg: Any) = + ftch ! msg + def send(msg: Any, replyTo: OutputChannel[Any]) = + ftch.send(msg, replyTo) + def forward(msg: Any) = + ftch.forward(msg) + def receiver = + ftch.receiver + def linkTo(to: AbstractActor) { /* do nothing */ } + def unlinkFrom(from: AbstractActor) { /* do nothing */ } + def exit(from: AbstractActor, reason: AnyRef) { + ftch.send(Exit(from, reason), thiz) + } + // should never be invoked; return dummy value + def !?(msg: Any) = msg + // should never be invoked; return dummy value + def !?(msec: Long, msg: Any): Option[Any] = Some(msg) + // should never be invoked; return dummy value + def !!(msg: Any): Future[Any] = { + val someChan = new Channel[Any](Actor.self(thiz.scheduler)) + Futures.fromInputChannel(someChan) + } + // should never be invoked; return dummy value + def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { + val someChan = new Channel[A](Actor.self(thiz.scheduler)) + Futures.fromInputChannel(someChan) + } + } + thiz.linkTo(linkedChannel) + thiz.send(msg, linkedChannel) + new Future[Any](ftch) { + var exitReason: Option[Any] = None + val handleReply: PartialFunction[Any, Unit] = { + case Exit(from, reason) => + exitReason = Some(reason) + case any => + value = Some(any) + } + + def apply(): Any = + if (isSet) { + if (!value.isEmpty) + value.get + else if (!exitReason.isEmpty) { + val reason = exitReason.get + if (reason.isInstanceOf[Throwable]) + throw new ExecutionException(reason.asInstanceOf[Throwable]) + else + throw new ExecutionException(new Exception(reason.toString())) + } + } else inputChannel.receive(handleReply andThen {(x: Unit) => apply()}) + + def respond(k: Any => Unit): Unit = + if (isSet) + apply() + else + inputChannel.react(handleReply andThen {(x: Unit) => k(apply())}) + + def isSet = (value match { + case None => + val handleTimeout: PartialFunction[Any, Boolean] = { + case TIMEOUT => + false + } + val whatToDo = + handleTimeout orElse (handleReply andThen {(x: Unit) => true}) + inputChannel.receiveWithin(0)(whatToDo) + case Some(_) => true + }) || !exitReason.isEmpty + } + } + +} diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala new file mode 100644 index 0000000000..221b424c2c --- /dev/null +++ b/src/actors/scala/actors/ReplyableReactor.scala @@ -0,0 +1,88 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +/** + * The ReplyableReactor trait provides + * message send operations that may result in a + * response from the receiver. + * + * @author Philipp Haller + */ +trait ReplyableReactor extends Replyable[Any, Any] { + thiz: ReplyReactor => + + /** + * Sends msg to this actor and awaits reply + * (synchronous). + * + * @param msg the message to be sent + * @return the reply + */ + def !?(msg: Any): Any = { + val replyCh = new Channel[Any](Actor.self(thiz.scheduler)) + thiz.send(msg, replyCh) + replyCh.receive { + case x => x + } + } + + /** + * Sends msg to this actor and awaits reply + * (synchronous) within msec milliseconds. + * + * @param msec the time span before timeout + * @param msg the message to be sent + * @return None in case of timeout, otherwise + * Some(x) where x is the reply + */ + def !?(msec: Long, msg: Any): Option[Any] = { + val replyCh = new Channel[Any](Actor.self(thiz.scheduler)) + thiz.send(msg, replyCh) + replyCh.receiveWithin(msec) { + case TIMEOUT => None + case x => Some(x) + } + } + + /** + * Sends msg to this actor and immediately + * returns a future representing the reply value. + */ + def !!(msg: Any): Future[Any] = { + val ftch = new Channel[Any](Actor.self(thiz.scheduler)) + thiz.send(msg, ftch) + Futures.fromInputChannel(ftch) + } + + /** + * Sends msg to this actor and immediately + * returns a future representing the reply value. + * The reply is post-processed using the partial function + * f. This also allows to recover a more + * precise type for the reply value. + */ + def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { + val ftch = new Channel[A](Actor.self(thiz.scheduler)) + thiz.send(msg, new OutputChannel[Any] { + def !(msg: Any) = + ftch ! f(msg) + def send(msg: Any, replyTo: OutputChannel[Any]) = + ftch.send(f(msg), replyTo) + def forward(msg: Any) = + ftch.forward(f(msg)) + def receiver = + ftch.receiver + }) + Futures.fromInputChannel(ftch) + } + +} diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index df9f94cdfb..abda6d329e 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -33,6 +33,7 @@ object Scheduler extends DelegatingScheduler { workQueue) val s = new SimpleExecutorScheduler(threadPool, true) //val s = new ForkJoinScheduler + //Debug.info(this+": starting new "+s) s.start() s } -- cgit v1.2.3