summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-07-21 17:35:01 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-07-21 17:35:01 +0000
commitd063a9fa5162bf6f59b34d653b46534830310a50 (patch)
treefd9dfd389c834d3091fe37b86e720ede940d6850 /src/actors
parent9e896451708d850d5bc3b3d27d169f09aece912b (diff)
downloadscala-d063a9fa5162bf6f59b34d653b46534830310a50.tar.gz
scala-d063a9fa5162bf6f59b34d653b46534830310a50.tar.bz2
scala-d063a9fa5162bf6f59b34d653b46534830310a50.zip
Enabled synchronous message sends for Replyable...
Enabled synchronous message sends for ReplyableReactor. Added get(timeout: Long) method to SyncVar.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/ReplyableActor.scala55
-rw-r--r--src/actors/scala/actors/ReplyableReactor.scala78
2 files changed, 107 insertions, 26 deletions
diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala
index 709bb0ef98..1e1487bf39 100644
--- a/src/actors/scala/actors/ReplyableActor.scala
+++ b/src/actors/scala/actors/ReplyableActor.scala
@@ -23,6 +23,61 @@ trait ReplyableActor extends ReplyableReactor {
thiz: AbstractActor with ReplyReactor =>
/**
+ * Sends <code>msg</code> to this actor and awaits reply
+ * (synchronous).
+ *
+ * @param msg the message to be sent
+ * @return the reply
+ */
+ override def !?(msg: Any): Any = {
+ val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
+ thiz.send(msg, replyCh)
+ replyCh.receive {
+ case x => x
+ }
+ }
+
+ /**
+ * Sends <code>msg</code> to this actor and awaits reply
+ * (synchronous) within <code>msec</code> milliseconds.
+ *
+ * @param msec the time span before timeout
+ * @param msg the message to be sent
+ * @return <code>None</code> in case of timeout, otherwise
+ * <code>Some(x)</code> where <code>x</code> is the reply
+ */
+ override def !?(msec: Long, msg: Any): Option[Any] = {
+ val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
+ thiz.send(msg, replyCh)
+ replyCh.receiveWithin(msec) {
+ case TIMEOUT => None
+ case x => Some(x)
+ }
+ }
+
+ /**
+ * Sends <code>msg</code> to this actor and immediately
+ * returns a future representing the reply value.
+ * The reply is post-processed using the partial function
+ * <code>f</code>. This also allows to recover a more
+ * precise type for the reply value.
+ */
+ override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
+ val ftch = new Channel[A](Actor.rawSelf(thiz.scheduler))
+ thiz.send(msg, new OutputChannel[Any] {
+ def !(msg: Any) =
+ ftch ! f(msg)
+ def send(msg: Any, replyTo: OutputChannel[Any]) =
+ ftch.send(f(msg), replyTo)
+ def forward(msg: Any) =
+ ftch.forward(f(msg))
+ def receiver =
+ ftch.receiver
+ })
+ Futures.fromInputChannel(ftch)
+ }
+
+ /**
* Sends <code>msg</code> to this actor and immediately
* returns a future representing the reply value.
*/
diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala
index cc2519e3ee..84168abe0a 100644
--- a/src/actors/scala/actors/ReplyableReactor.scala
+++ b/src/actors/scala/actors/ReplyableReactor.scala
@@ -27,13 +27,8 @@ trait ReplyableReactor extends Replyable[Any, Any] {
* @param msg the message to be sent
* @return the reply
*/
- def !?(msg: Any): Any = {
- val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
- thiz.send(msg, replyCh)
- replyCh.receive {
- case x => x
- }
- }
+ def !?(msg: Any): Any =
+ (this !! msg)()
/**
* Sends <code>msg</code> to this actor and awaits reply
@@ -45,23 +40,28 @@ trait ReplyableReactor extends Replyable[Any, Any] {
* <code>Some(x)</code> where <code>x</code> is the reply
*/
def !?(msec: Long, msg: Any): Option[Any] = {
- val replyCh = new Channel[Any](Actor.self(thiz.scheduler))
- thiz.send(msg, replyCh)
- replyCh.receiveWithin(msec) {
- case TIMEOUT => None
- case x => Some(x)
+ val myself = Actor.rawSelf(thiz.scheduler)
+ val res = new scala.concurrent.SyncVar[Any]
+ val out = new OutputChannel[Any] {
+ def !(msg: Any) =
+ res set msg
+ def send(msg: Any, replyTo: OutputChannel[Any]) =
+ res set msg
+ def forward(msg: Any) =
+ res set msg
+ def receiver =
+ myself
}
+ thiz.send(msg, out)
+ res.get(msec)
}
/**
* Sends <code>msg</code> to this actor and immediately
* returns a future representing the reply value.
*/
- override def !!(msg: Any): Future[Any] = {
- val ftch = new Channel[Any](Actor.rawSelf(thiz.scheduler))
- thiz.send(msg, ftch)
- Futures.fromInputChannel(ftch)
- }
+ override def !!(msg: Any): Future[Any] =
+ this !! (msg, { case x => x })
/**
* Sends <code>msg</code> to this actor and immediately
@@ -71,18 +71,44 @@ trait ReplyableReactor extends Replyable[Any, Any] {
* precise type for the reply value.
*/
override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
- val ftch = new Channel[A](Actor.rawSelf(thiz.scheduler))
- thiz.send(msg, new OutputChannel[Any] {
- def !(msg: Any) =
+ val myself = Actor.rawSelf(thiz.scheduler)
+ val ftch = new Channel[A](myself)
+ val res = new scala.concurrent.SyncVar[A]
+
+ val out = new OutputChannel[Any] {
+ def !(msg: Any) = {
ftch ! f(msg)
- def send(msg: Any, replyTo: OutputChannel[Any]) =
+ res set f(msg)
+ }
+ def send(msg: Any, replyTo: OutputChannel[Any]) = {
ftch.send(f(msg), replyTo)
- def forward(msg: Any) =
- ftch.forward(f(msg))
+ res set f(msg)
+ }
+ def forward(msg: Any) = {
+ ftch forward f(msg)
+ res set f(msg)
+ }
def receiver =
- ftch.receiver
- })
- Futures.fromInputChannel(ftch)
+ myself
+ }
+
+ thiz.send(msg, out)
+
+ new Future[A](ftch) {
+ def apply() =
+ if (isSet) value.get.asInstanceOf[A]
+ else {
+ value = Some(res.get)
+ value.get.asInstanceOf[A]
+ }
+ def respond(k: A => Unit): Unit =
+ if (isSet) k(value.get.asInstanceOf[A])
+ else inputChannel.react {
+ case any => value = Some(any); k(value.get.asInstanceOf[A])
+ }
+ def isSet =
+ !value.isEmpty
+ }
}
}