summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-06-28 16:07:14 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-06-28 16:07:14 +0000
commite32113307c381e8bc8558208a6076f60666784a2 (patch)
tree387b2c6842a05e036434c24131bef5d882598cac /src
parent505ea7c3e0326fef71c044cfc424fe562fa8b39c (diff)
downloadscala-e32113307c381e8bc8558208a6076f60666784a2.tar.gz
scala-e32113307c381e8bc8558208a6076f60666784a2.tar.bz2
scala-e32113307c381e8bc8558208a6076f60666784a2.zip
Refactoring of sender/reply, as well as !!, !? ...
Refactoring of sender/reply, as well as !!, !? methods into separate traits.
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/AbstractActor.scala10
-rw-r--r--src/actors/scala/actors/Actor.scala213
-rw-r--r--src/actors/scala/actors/ActorTask.scala2
-rw-r--r--src/actors/scala/actors/Future.scala22
-rw-r--r--src/actors/scala/actors/Reactor.scala30
-rw-r--r--src/actors/scala/actors/ReplyReactor.scala63
-rw-r--r--src/actors/scala/actors/Replyable.scala63
-rw-r--r--src/actors/scala/actors/ReplyableActor.scala104
-rw-r--r--src/actors/scala/actors/ReplyableReactor.scala88
-rw-r--r--src/actors/scala/actors/Scheduler.scala1
10 files changed, 356 insertions, 240 deletions
diff --git a/src/actors/scala/actors/AbstractActor.scala b/src/actors/scala/actors/AbstractActor.scala
index 7eaa4a0a58..b2ae3d300a 100644
--- a/src/actors/scala/actors/AbstractActor.scala
+++ b/src/actors/scala/actors/AbstractActor.scala
@@ -16,7 +16,7 @@ package scala.actors
* @version 0.9.18
* @author Philipp Haller
*/
-trait AbstractActor extends OutputChannel[Any] {
+trait AbstractActor extends OutputChannel[Any] with Replyable[Any, Any] {
private[actors] var exiting = false
@@ -26,12 +26,4 @@ trait AbstractActor extends OutputChannel[Any] {
private[actors] def exit(from: AbstractActor, reason: AnyRef): Unit
- def !?(msg: Any): Any
-
- def !?(msec: Long, msg: Any): Option[Any]
-
- def !!(msg: Any): Future[Any]
-
- def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A]
-
}
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 1420a1af4d..11d8d63ac3 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -223,19 +223,22 @@ object Actor {
/**
* Returns the actor which sent the last received message.
*/
- def sender: OutputChannel[Any] = rawSelf.sender
+ def sender: OutputChannel[Any] =
+ rawSelf.asInstanceOf[ReplyReactor].sender
/**
* Send <code>msg</code> to the actor waiting in a call to
* <code>!?</code>.
*/
- def reply(msg: Any): Unit = rawSelf.reply(msg)
+ def reply(msg: Any): Unit =
+ rawSelf.asInstanceOf[ReplyReactor].reply(msg)
/**
* Send <code>()</code> to the actor waiting in a call to
* <code>!?</code>.
*/
- def reply(): Unit = rawSelf.reply(())
+ def reply(): Unit =
+ rawSelf.asInstanceOf[ReplyReactor].reply(())
/**
* Returns the number of messages in <code>self</code>'s mailbox
@@ -375,7 +378,7 @@ object Actor {
* @author Philipp Haller
*/
@serializable
-trait Actor extends Reactor with AbstractActor {
+trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
/* The following two fields are only used when the actor
* suspends by blocking its underlying thread, for example,
@@ -623,208 +626,6 @@ trait Actor extends Reactor with AbstractActor {
}
/**
- * Sends <code>msg</code> to this actor (asynchronous).
- */
- override def !(msg: Any) {
- send(msg, Actor.rawSelf(scheduler))
- }
-
- /**
- * Forwards <code>msg</code> to this actor (asynchronous).
- */
- override def forward(msg: Any) {
- send(msg, Actor.sender)
- }
-
- /**
- * Sends <code>msg</code> to this actor and awaits reply
- * (synchronous).
- *
- * @param msg the message to be sent
- * @return the reply
- */
- def !?(msg: Any): Any = {
- val replyCh = new Channel[Any](Actor.self(scheduler))
- send(msg, replyCh)
- replyCh.receive {
- case x => x
- }
- }
-
- /**
- * Sends <code>msg</code> to this actor and awaits reply
- * (synchronous) within <code>msec</code> milliseconds.
- *
- * @param msec the time span before timeout
- * @param msg the message to be sent
- * @return <code>None</code> in case of timeout, otherwise
- * <code>Some(x)</code> where <code>x</code> is the reply
- */
- def !?(msec: Long, msg: Any): Option[Any] = {
- val replyCh = new Channel[Any](Actor.self(scheduler))
- send(msg, replyCh)
- replyCh.receiveWithin(msec) {
- case TIMEOUT => None
- case x => Some(x)
- }
- }
-
- /**
- * Sends <code>msg</code> to this actor and immediately
- * returns a future representing the reply value.
- */
- def !!(msg: Any): Future[Any] = {
- val ftch = new Channel[Any](Actor.self(scheduler))
- val linkedChannel = new AbstractActor {
- def !(msg: Any) =
- ftch ! msg
- def send(msg: Any, replyTo: OutputChannel[Any]) =
- ftch.send(msg, replyTo)
- def forward(msg: Any) =
- ftch.forward(msg)
- def receiver =
- ftch.receiver
- def linkTo(to: AbstractActor) { /* do nothing */ }
- def unlinkFrom(from: AbstractActor) { /* do nothing */ }
- def exit(from: AbstractActor, reason: AnyRef) {
- ftch.send(Exit(from, reason), Actor.this)
- }
- // should never be invoked; return dummy value
- def !?(msg: Any) = msg
- // should never be invoked; return dummy value
- def !?(msec: Long, msg: Any): Option[Any] = Some(msg)
- // should never be invoked; return dummy value
- def !!(msg: Any): Future[Any] = {
- val someChan = new Channel[Any](Actor.self(scheduler))
- new Future[Any](someChan) {
- def apply() =
- if (isSet) value.get
- else inputChannel.receive {
- case any => value = Some(any); any
- }
- def respond(k: Any => Unit): Unit =
- if (isSet) k(value.get)
- else inputChannel.react {
- case any => value = Some(any); k(any)
- }
- def isSet = value match {
- case None => inputChannel.receiveWithin(0) {
- case TIMEOUT => false
- case any => value = Some(any); true
- }
- case Some(_) => true
- }
- }
- }
- // should never be invoked; return dummy value
- def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
- val someChan = new Channel[A](Actor.self(scheduler))
- new Future[A](someChan) {
- def apply() =
- if (isSet) value.get.asInstanceOf[A]
- else inputChannel.receive {
- case any => value = Some(any); any
- }
- def respond(k: A => Unit): Unit =
- if (isSet) k(value.get.asInstanceOf[A])
- else inputChannel.react {
- case any => value = Some(any); k(any)
- }
- def isSet = value match {
- case None => inputChannel.receiveWithin(0) {
- case TIMEOUT => false
- case any => value = Some(any); true
- }
- case Some(_) => true
- }
- }
- }
- }
- linkTo(linkedChannel)
- send(msg, linkedChannel)
- new Future[Any](ftch) {
- var exitReason: Option[Any] = None
- val handleReply: PartialFunction[Any, Unit] = {
- case Exit(from, reason) =>
- exitReason = Some(reason)
- case any =>
- value = Some(any)
- }
-
- def apply(): Any =
- if (isSet) {
- if (!value.isEmpty)
- value.get
- else if (!exitReason.isEmpty) {
- val reason = exitReason.get
- if (reason.isInstanceOf[Throwable])
- throw new ExecutionException(reason.asInstanceOf[Throwable])
- else
- throw new ExecutionException(new Exception(reason.toString()))
- }
- } else inputChannel.receive(handleReply andThen {(x: Unit) => apply()})
-
- def respond(k: Any => Unit): Unit =
- if (isSet)
- apply()
- else
- inputChannel.react(handleReply andThen {(x: Unit) => k(apply())})
-
- def isSet = (value match {
- case None =>
- val handleTimeout: PartialFunction[Any, Boolean] = {
- case TIMEOUT =>
- false
- }
- val whatToDo =
- handleTimeout orElse (handleReply andThen {(x: Unit) => true})
- inputChannel.receiveWithin(0)(whatToDo)
- case Some(_) => true
- }) || !exitReason.isEmpty
- }
- }
-
- /**
- * Sends <code>msg</code> to this actor and immediately
- * returns a future representing the reply value.
- * The reply is post-processed using the partial function
- * <code>f</code>. This also allows to recover a more
- * precise type for the reply value.
- */
- def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
- val ftch = new Channel[A](Actor.self(scheduler))
- send(msg, new OutputChannel[Any] {
- def !(msg: Any) =
- ftch ! f(msg)
- def send(msg: Any, replyTo: OutputChannel[Any]) =
- ftch.send(f(msg), replyTo)
- def forward(msg: Any) =
- ftch.forward(f(msg))
- def receiver =
- ftch.receiver
- })
- new Future[A](ftch) {
- def apply() =
- if (isSet) value.get.asInstanceOf[A]
- else inputChannel.receive {
- case any => value = Some(any); value.get.asInstanceOf[A]
- }
- def respond(k: A => Unit): Unit =
- if (isSet) k(value.get.asInstanceOf[A])
- else inputChannel.react {
- case any => value = Some(any); k(value.get.asInstanceOf[A])
- }
- def isSet = value match {
- case None => inputChannel.receiveWithin(0) {
- case TIMEOUT => false
- case any => value = Some(any); true
- }
- case Some(_) => true
- }
- }
- }
-
- /**
* Receives the next message from this actor's mailbox.
*/
def ? : Any = receive {
diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala
index 4236785433..c780aad2ef 100644
--- a/src/actors/scala/actors/ActorTask.scala
+++ b/src/actors/scala/actors/ActorTask.scala
@@ -59,8 +59,6 @@ class ActorTask extends Runnable {
a.synchronized {
if (!a.links.isEmpty)
a.exitLinked(t)
- else
- t.printStackTrace()
}
}
} finally {
diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala
index ff938ab1aa..0487c02cfa 100644
--- a/src/actors/scala/actors/Future.scala
+++ b/src/actors/scala/actors/Future.scala
@@ -136,4 +136,26 @@ object Futures {
}
results
}
+
+ def fromInputChannel[T](inputChannel: InputChannel[T]): Future[T] =
+ new Future[T](inputChannel) {
+ def apply() =
+ if (isSet) value.get.asInstanceOf[T]
+ else inputChannel.receive {
+ case any => value = Some(any); value.get.asInstanceOf[T]
+ }
+ def respond(k: T => Unit): Unit =
+ if (isSet) k(value.get.asInstanceOf[T])
+ else inputChannel.react {
+ case any => value = Some(any); k(value.get.asInstanceOf[T])
+ }
+ def isSet = value match {
+ case None => inputChannel.receiveWithin(0) {
+ case TIMEOUT => false
+ case any => value = Some(any); true
+ }
+ case Some(_) => true
+ }
+ }
+
}
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 599dee69db..d8c1ba3904 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -12,23 +12,18 @@ package scala.actors
import scala.collection.mutable.Queue
+/**
+ * The Reactor trait provides lightweight actors.
+ *
+ * @author Philipp Haller
+ */
trait Reactor extends OutputChannel[Any] {
- @volatile
- protected var ignoreSender: Boolean = false
-
/* The actor's mailbox. */
protected val mailbox = new MessageQueue
protected var sendBuffer = new Queue[(Any, OutputChannel[Any])]
- /* A list of the current senders. The head of the list is
- * the sender of the message that was received last.
- */
- protected var senders: List[OutputChannel[Any]] =
- if (ignoreSender) List(null)
- else Nil
-
/* If the actor waits in a react, continuation holds the
* message handler that react was called with.
*/
@@ -88,8 +83,6 @@ trait Reactor extends OutputChannel[Any] {
new ReactorTask(this, { block })
protected[this] def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) {
- if (!ignoreSender)
- senders = List(item._2)
// assert continuation != null
if (onSameThread)
continuation(item._1)
@@ -98,11 +91,11 @@ trait Reactor extends OutputChannel[Any] {
}
def !(msg: Any) {
- send(msg, if (ignoreSender) null else Actor.rawSelf(scheduler))
+ send(msg, null)
}
def forward(msg: Any) {
- send(msg, if (ignoreSender) null else Actor.sender)
+ send(msg, null)
}
def receiver: Actor = this.asInstanceOf[Actor]
@@ -151,15 +144,6 @@ trait Reactor extends OutputChannel[Any] {
throw Actor.suspendException
}
- protected[actors] def sender: OutputChannel[Any] = senders.head
-
- /**
- * Replies with <code>msg</code> to the sender.
- */
- protected[actors] def reply(msg: Any) {
- sender ! msg
- }
-
protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
scheduler execute (new LightReaction(this,
if (f eq null) continuation else f,
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala
new file mode 100644
index 0000000000..7e8bc0ab37
--- /dev/null
+++ b/src/actors/scala/actors/ReplyReactor.scala
@@ -0,0 +1,63 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+/**
+ * The ReplyReactor trait extends the Reactor trait with
+ * methods to reply to the sender of a message.
+ * Sending a message to a ReplyReactor implicitly
+ * passes a reference to the sender together with the
+ * message.
+ *
+ * @author Philipp Haller
+ */
+trait ReplyReactor extends Reactor {
+
+ /* A list of the current senders. The head of the list is
+ * the sender of the message that was received last.
+ */
+ protected var senders: List[OutputChannel[Any]] =
+ Nil
+
+ protected[actors] def sender: OutputChannel[Any] =
+ senders.head
+
+ /**
+ * Replies with <code>msg</code> to the sender.
+ */
+ protected[actors] def reply(msg: Any) {
+ sender ! msg
+ }
+
+ /**
+ * Sends <code>msg</code> to this actor (asynchronous).
+ */
+ override def !(msg: Any) {
+ send(msg, Actor.rawSelf(scheduler))
+ }
+
+ /**
+ * Forwards <code>msg</code> to this actor (asynchronous).
+ */
+ override def forward(msg: Any) {
+ send(msg, Actor.sender)
+ }
+
+ override protected[this] def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) {
+ senders = List(item._2)
+ // assert continuation != null
+ if (onSameThread)
+ continuation(item._1)
+ else
+ scheduleActor(null, item._1)
+ }
+
+}
diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala
new file mode 100644
index 0000000000..330b16461d
--- /dev/null
+++ b/src/actors/scala/actors/Replyable.scala
@@ -0,0 +1,63 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+/**
+ * The Replyable trait defines result-bearing message send operations
+ * on replyable actors.
+ *
+ * @author Philipp Haller
+ */
+trait Replyable[T, R] {
+
+ /**
+ * Sends <code>msg</code> to this Replyable and awaits reply
+ * (synchronous).
+ *
+ * @param msg the message to be sent
+ * @return the reply
+ */
+ def !?(msg: T): R
+
+ /**
+ * Sends <code>msg</code> to this Replyable and awaits reply
+ * (synchronous) within <code>msec</code> milliseconds.
+ *
+ * @param msec the time span before timeout
+ * @param msg the message to be sent
+ * @return <code>None</code> in case of timeout, otherwise
+ * <code>Some(x)</code> where <code>x</code> is the reply
+ */
+ def !?(msec: Long, msg: T): Option[R]
+
+ /**
+ * Sends <code>msg</code> to this actor and immediately
+ * returns a future representing the reply value.
+ *
+ * @param msg the message to be sent
+ * @return the future
+ */
+ def !!(msg: T): Future[R]
+
+ /**
+ * Sends <code>msg</code> to this actor and immediately
+ * returns a future representing the reply value.
+ * The reply is post-processed using the partial function
+ * <code>f</code>. This also allows to recover a more
+ * precise type for the reply value.
+ *
+ * @param msg the message to be sent
+ * @param f the function to be applied to the response
+ * @return the future
+ */
+ def !![P](msg: T, f: PartialFunction[R, P]): Future[P]
+
+}
diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala
new file mode 100644
index 0000000000..b9576a837b
--- /dev/null
+++ b/src/actors/scala/actors/ReplyableActor.scala
@@ -0,0 +1,104 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+import java.util.concurrent.ExecutionException
+
+/**
+ * The ReplyableActor trait provides
+ * message send operations that may result in a
+ * response from the receiver.
+ *
+ * @author Philipp Haller
+ */
+trait ReplyableActor extends ReplyableReactor {
+ thiz: AbstractActor with ReplyReactor =>
+
+ /**
+ * Sends <code>msg</code> to this actor and immediately
+ * returns a future representing the reply value.
+ */
+ override def !!(msg: Any): Future[Any] = {
+ val ftch = new Channel[Any](Actor.self(thiz.scheduler))
+ val linkedChannel = new AbstractActor {
+ def !(msg: Any) =
+ ftch ! msg
+ def send(msg: Any, replyTo: OutputChannel[Any]) =
+ ftch.send(msg, replyTo)
+ def forward(msg: Any) =
+ ftch.forward(msg)
+ def receiver =
+ ftch.receiver
+ def linkTo(to: AbstractActor) { /* do nothing */ }
+ def unlinkFrom(from: AbstractActor) { /* do nothing */ }
+ def exit(from: AbstractActor, reason: AnyRef) {
+ ftch.send(Exit(from, reason), thiz)
+ }
+ // should never be invoked; return dummy value
+ def !?(msg: Any) = msg
+ // should never be invoked; return dummy value
+ def !?(msec: Long, msg: Any): Option[Any] = Some(msg)
+ // should never be invoked; return dummy value
+ def !!(msg: Any): Future[Any] = {
+ val someChan = new Channel[Any](Actor.self(thiz.scheduler))
+ Futures.fromInputChannel(someChan)
+ }
+ // should never be invoked; return dummy value
+ def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
+ val someChan = new Channel[A](Actor.self(thiz.scheduler))
+ Futures.fromInputChannel(someChan)
+ }
+ }
+ thiz.linkTo(linkedChannel)
+ thiz.send(msg, linkedChannel)
+ new Future[Any](ftch) {
+ var exitReason: Option[Any] = None
+ val handleReply: PartialFunction[Any, Unit] = {
+ case Exit(from, reason) =>
+ exitReason = Some(reason)
+ case any =>
+ value = Some(any)
+ }
+
+ def apply(): Any =
+ if (isSet) {
+ if (!value.isEmpty)
+ value.get
+ else if (!exitReason.isEmpty) {
+ val reason = exitReason.get
+ if (reason.isInstanceOf[Throwable])
+ throw new ExecutionException(reason.asInstanceOf[Throwable])
+ else
+ throw new ExecutionException(new Exception(reason.toString()))
+ }
+ } else inputChannel.receive(handleReply andThen {(x: Unit) => apply()})
+
+ def respond(k: Any => Unit): Unit =
+ if (isSet)
+ apply()
+ else
+ inputChannel.react(handleReply andThen {(x: Unit) => k(apply())})
+
+ def isSet = (value match {
+ case None =>
+ val handleTimeout: PartialFunction[Any, Boolean] = {
+ case TIMEOUT =>
+ false
+ }
+ val whatToDo =
+ handleTimeout orElse (handleReply andThen {(x: Unit) => true})
+ inputChannel.receiveWithin(0)(whatToDo)
+ case Some(_) => true
+ }) || !exitReason.isEmpty
+ }
+ }
+
+}
diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala
new file mode 100644
index 0000000000..221b424c2c
--- /dev/null
+++ b/src/actors/scala/actors/ReplyableReactor.scala
@@ -0,0 +1,88 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+/**
+ * The ReplyableReactor trait provides
+ * message send operations that may result in a
+ * response from the receiver.
+ *
+ * @author Philipp Haller
+ */
+trait ReplyableReactor extends Replyable[Any, Any] {
+ thiz: ReplyReactor =>
+
+ /**
+ * Sends <code>msg</code> to this actor and awaits reply
+ * (synchronous).
+ *
+ * @param msg the message to be sent
+ * @return the reply
+ */
+ def !?(msg: Any): Any = {
+ val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
+ thiz.send(msg, replyCh)
+ replyCh.receive {
+ case x => x
+ }
+ }
+
+ /**
+ * Sends <code>msg</code> to this actor and awaits reply
+ * (synchronous) within <code>msec</code> milliseconds.
+ *
+ * @param msec the time span before timeout
+ * @param msg the message to be sent
+ * @return <code>None</code> in case of timeout, otherwise
+ * <code>Some(x)</code> where <code>x</code> is the reply
+ */
+ def !?(msec: Long, msg: Any): Option[Any] = {
+ val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
+ thiz.send(msg, replyCh)
+ replyCh.receiveWithin(msec) {
+ case TIMEOUT => None
+ case x => Some(x)
+ }
+ }
+
+ /**
+ * Sends <code>msg</code> to this actor and immediately
+ * returns a future representing the reply value.
+ */
+ def !!(msg: Any): Future[Any] = {
+ val ftch = new Channel[Any](Actor.self(thiz.scheduler))
+ thiz.send(msg, ftch)
+ Futures.fromInputChannel(ftch)
+ }
+
+ /**
+ * Sends <code>msg</code> to this actor and immediately
+ * returns a future representing the reply value.
+ * The reply is post-processed using the partial function
+ * <code>f</code>. This also allows to recover a more
+ * precise type for the reply value.
+ */
+ def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
+ val ftch = new Channel[A](Actor.self(thiz.scheduler))
+ thiz.send(msg, new OutputChannel[Any] {
+ def !(msg: Any) =
+ ftch ! f(msg)
+ def send(msg: Any, replyTo: OutputChannel[Any]) =
+ ftch.send(f(msg), replyTo)
+ def forward(msg: Any) =
+ ftch.forward(f(msg))
+ def receiver =
+ ftch.receiver
+ })
+ Futures.fromInputChannel(ftch)
+ }
+
+}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index df9f94cdfb..abda6d329e 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -33,6 +33,7 @@ object Scheduler extends DelegatingScheduler {
workQueue)
val s = new SimpleExecutorScheduler(threadPool, true)
//val s = new ForkJoinScheduler
+ //Debug.info(this+": starting new "+s)
s.start()
s
}