From 64609f93548ea7637718057b23d0fb19d998e812 Mon Sep 17 00:00:00 2001 From: Antonio Cunei Date: Mon, 10 May 2010 08:09:54 +0000 Subject: Merged revisions 21866 via svnmerge from https://lampsvn.epfl.ch/svn-repos/scala/scala/trunk ........ r21866 | phaller | 2010-05-09 20:09:17 +0200 (Sun, 09 May 2010) | 1 line Closes #3407. Closes #3412. Review by plocinic. ........ --- src/actors/scala/actors/ActorCanReply.scala | 34 +++++--- src/actors/scala/actors/Channel.scala | 33 +++++--- src/actors/scala/actors/Future.scala | 117 +++++++++++++------------- src/actors/scala/actors/ReactorCanReply.scala | 3 +- test/files/jvm/t3407.check | 10 +++ test/files/jvm/t3407.scala | 19 +++++ test/files/jvm/t3412-channel.check | 10 +++ test/files/jvm/t3412-channel.scala | 38 +++++++++ test/files/jvm/t3412.check | 10 +++ test/files/jvm/t3412.scala | 32 +++++++ 10 files changed, 224 insertions(+), 82 deletions(-) create mode 100644 test/files/jvm/t3407.check create mode 100644 test/files/jvm/t3407.scala create mode 100644 test/files/jvm/t3412-channel.check create mode 100644 test/files/jvm/t3412-channel.scala create mode 100644 test/files/jvm/t3412.check create mode 100644 test/files/jvm/t3412.scala diff --git a/src/actors/scala/actors/ActorCanReply.scala b/src/actors/scala/actors/ActorCanReply.scala index a6a81815c1..0f46c1b9de 100644 --- a/src/actors/scala/actors/ActorCanReply.scala +++ b/src/actors/scala/actors/ActorCanReply.scala @@ -10,6 +10,8 @@ package scala.actors +import scala.concurrent.SyncVar + /** * The `ActorCanReply` trait provides message send operations that * may result in a response from the receiver. @@ -35,18 +37,26 @@ private[actors] trait ActorCanReply extends ReactorCanReply { } override def !![A](msg: Any, handler: PartialFunction[Any, A]): Future[A] = { - val ftch = new Channel[A](Actor.self(scheduler)) - send(msg, new OutputChannel[Any] { - def !(msg: Any) = - ftch ! handler(msg) - def send(msg: Any, replyTo: OutputChannel[Any]) = - ftch.send(handler(msg), replyTo) - def forward(msg: Any) = - ftch.forward(handler(msg)) - def receiver = - ftch.receiver - }) - Futures.fromInputChannel(ftch) + val c = new Channel[A](Actor.self(scheduler)) + val fun = (res: SyncVar[A]) => { + val ftch = new Channel[A](Actor.self(scheduler)) + send(msg, new OutputChannel[Any] { + def !(msg: Any) = + ftch ! handler(msg) + def send(msg: Any, replyTo: OutputChannel[Any]) = + ftch.send(handler(msg), replyTo) + def forward(msg: Any) = + ftch.forward(handler(msg)) + def receiver = + ftch.receiver + }) + ftch.react { + case any => res.set(any) + } + } + val a = new FutureActor[A](fun, c) + a.start() + a } override def !!(msg: Any): Future[Any] = { diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index ae1e3029d1..25b41e39a4 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -10,6 +10,7 @@ package scala.actors +import scala.concurrent.SyncVar /** * This class is used to pattern match on values that were sent @@ -108,18 +109,26 @@ class Channel[Msg](val receiver: Actor) extends InputChannel[Msg] with OutputCha } def !![A](msg: Msg, handler: PartialFunction[Any, A]): Future[A] = { - val ftch = new Channel[A](Actor.self(receiver.scheduler)) - receiver.send(scala.actors.!(this, msg), new OutputChannel[Any] { - def !(msg: Any) = - ftch ! handler(msg) - def send(msg: Any, replyTo: OutputChannel[Any]) = - ftch.send(handler(msg), replyTo) - def forward(msg: Any) = - ftch.forward(handler(msg)) - def receiver = - ftch.receiver - }) - Futures.fromInputChannel(ftch) + val c = new Channel[A](Actor.self(receiver.scheduler)) + val fun = (res: SyncVar[A]) => { + val ftch = new Channel[A](Actor.self(receiver.scheduler)) + receiver.send(scala.actors.!(this, msg), new OutputChannel[Any] { + def !(msg: Any) = + ftch ! handler(msg) + def send(msg: Any, replyTo: OutputChannel[Any]) = + ftch.send(handler(msg), replyTo) + def forward(msg: Any) = + ftch.forward(handler(msg)) + def receiver = + ftch.receiver + }) + ftch.react { + case any => res.set(any) + } + } + val a = new FutureActor[A](fun, c) + a.start() + a } def !!(msg: Msg): Future[Any] = { diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala index 9f65786879..e34569f36b 100644 --- a/src/actors/scala/actors/Future.scala +++ b/src/actors/scala/actors/Future.scala @@ -11,6 +11,7 @@ package scala.actors import scala.actors.scheduler.DaemonScheduler +import scala.concurrent.SyncVar /** A `Future[T]` is a function of arity 0 that returns * a value of type `T`. @@ -22,7 +23,8 @@ import scala.actors.scheduler.DaemonScheduler * * @author Philipp Haller */ -abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T] with Function0[T] { +abstract class Future[+T] extends Responder[T] with Function0[T] { + @volatile private[actors] var fvalue: Option[Any] = None private[actors] def fvalueTyped = fvalue.get.asInstanceOf[T] @@ -41,57 +43,79 @@ abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T * `false` otherwise. */ def isSet: Boolean -} -/** The `Futures` object contains methods that operate on futures. - * - * @author Philipp Haller - */ -object Futures { + /** Returns an input channel that can be used to receive the future's result. + * + * @return the future's input channel + */ + def inputChannel: InputChannel[T] - import scala.concurrent.SyncVar +} - private case object Eval +private case object Eval - private class FutureActor[T](fun: SyncVar[T] => Unit, channel: Channel[T]) - extends Future[T](channel) with DaemonActor { +private class FutureActor[T](fun: SyncVar[T] => Unit, channel: Channel[T]) extends Future[T] with DaemonActor { - import Actor._ + var enableChannel = false // guarded by this - def isSet = !fvalue.isEmpty + def isSet = !fvalue.isEmpty - def apply(): T = { - if (fvalue.isEmpty) - this !? Eval - fvalueTyped + def apply(): T = { + if (fvalue.isEmpty) { + this !? Eval } + fvalueTyped + } - def respond(k: T => Unit) { - if (isSet) k(fvalueTyped) - else { - val ft = this !! Eval - ft.inputChannel.react { - case _ => k(fvalueTyped) - } + def respond(k: T => Unit) { + if (isSet) k(fvalueTyped) + else { + val ft = this !! Eval + ft.inputChannel.react { + case _ => k(fvalueTyped) } } + } - def act() { - val res = new SyncVar[T] - - { - fun(res) - } andThen { - fvalue = Some(res.get) - channel ! res.get - loop { - react { - case Eval => reply() - } + def inputChannel: InputChannel[T] = { + synchronized { + if (!enableChannel) { + if (isSet) + channel ! fvalueTyped + enableChannel = true + } + } + channel + } + + def act() { + val res = new SyncVar[T] + + { + fun(res) + } andThen { + + synchronized { + val v = res.get + fvalue = Some(v) + if (enableChannel) + channel ! v + } + + loop { + react { + case Eval => reply() } } } } +} + +/** The `Futures` object contains methods that operate on futures. + * + * @author Philipp Haller + */ +object Futures { /** Arranges for the asynchronous execution of `body`, * returning a future representing the result. @@ -222,25 +246,4 @@ object Futures { results } - private[actors] def fromInputChannel[T](inputChannel: InputChannel[T]): Future[T] = - new Future[T](inputChannel) { - def apply() = - if (isSet) fvalueTyped - else inputChannel.receive { - case any => fvalue = Some(any); fvalueTyped - } - def respond(k: T => Unit): Unit = - if (isSet) k(fvalueTyped) - else inputChannel.react { - case any => fvalue = Some(any); k(fvalueTyped) - } - def isSet = fvalue match { - case None => inputChannel.receiveWithin(0) { - case TIMEOUT => false - case any => fvalue = Some(any); true - } - case Some(_) => true - } - } - } diff --git a/src/actors/scala/actors/ReactorCanReply.scala b/src/actors/scala/actors/ReactorCanReply.scala index e279845c9b..14cb423cf6 100644 --- a/src/actors/scala/actors/ReactorCanReply.scala +++ b/src/actors/scala/actors/ReactorCanReply.scala @@ -71,7 +71,7 @@ private[actors] trait ReactorCanReply extends CanReply[Any, Any] { this.send(msg, out) - new Future[A](ftch) { + new Future[A] { def apply() = { if (!isSet) fvalue = Some(res.get) @@ -85,6 +85,7 @@ private[actors] trait ReactorCanReply extends CanReply[Any, Any] { } def isSet = !fvalue.isEmpty + def inputChannel = ftch } } } diff --git a/test/files/jvm/t3407.check b/test/files/jvm/t3407.check new file mode 100644 index 0000000000..a133c88bbe --- /dev/null +++ b/test/files/jvm/t3407.check @@ -0,0 +1,10 @@ +result: 42 +result: 42 +result: 42 +result: 42 +result: 42 +result: 42 +result: 42 +result: 42 +result: 42 +result: 42 diff --git a/test/files/jvm/t3407.scala b/test/files/jvm/t3407.scala new file mode 100644 index 0000000000..6c2ce85c71 --- /dev/null +++ b/test/files/jvm/t3407.scala @@ -0,0 +1,19 @@ +import scala.actors._, scala.actors.Actor._ + +object Test { + + def main(args: Array[String]) { + for (i <- 1 to 10) { + val ft = Futures.future { 42 } + println("result: " + ft()) + } + + for (i <- 1 to 10) { + receiveWithin(0) { + case TIMEOUT => + case msg => println("unexpected: " + msg) + } + } + } + +} diff --git a/test/files/jvm/t3412-channel.check b/test/files/jvm/t3412-channel.check new file mode 100644 index 0000000000..954c6e835d --- /dev/null +++ b/test/files/jvm/t3412-channel.check @@ -0,0 +1,10 @@ +6 +6 +6 +6 +6 +6 +6 +6 +6 +6 diff --git a/test/files/jvm/t3412-channel.scala b/test/files/jvm/t3412-channel.scala new file mode 100644 index 0000000000..fcc439b488 --- /dev/null +++ b/test/files/jvm/t3412-channel.scala @@ -0,0 +1,38 @@ +import scala.actors._, scala.actors.Actor._, scala.actors.Futures._ + +object Test { + + def main(args: Array[String]) { + + actor { + val C: Channel[Int] = new Channel[Int](self) + + def respondAll(fts: List[Future[Int]], cnt: Int): Unit = + fts match { + case List() => C ! 0 + case ft :: rest => + if (cnt % 100 == 0) + println(ft()) + respondAll(rest, cnt + 1) + } + + actor { + val fts = for (_ <- 1 to 1000) + yield C !! (3, {case x: Int => x}) + + actor { + respondAll(fts.toList, 0) + } + } + + loop { + C.react { + case 0 => exit() + case i => reply(i * 2) + } + } + } + + } + +} diff --git a/test/files/jvm/t3412.check b/test/files/jvm/t3412.check new file mode 100644 index 0000000000..954c6e835d --- /dev/null +++ b/test/files/jvm/t3412.check @@ -0,0 +1,10 @@ +6 +6 +6 +6 +6 +6 +6 +6 +6 +6 diff --git a/test/files/jvm/t3412.scala b/test/files/jvm/t3412.scala new file mode 100644 index 0000000000..ced15ab5dc --- /dev/null +++ b/test/files/jvm/t3412.scala @@ -0,0 +1,32 @@ +import scala.actors._, scala.actors.Actor._, scala.actors.Futures._ + +object Test { + + def main(args: Array[String]) { + + val a = actor { + loop { react { + case i: Int => reply(i * 2) + case 'stop => exit() + } } + } + + val fts = for (_ <- 1 to 1000) + yield a !! (3, {case x: Int => x}) + + def respondAll(fts: List[Future[Int]], cnt: Int): Unit = + fts match { + case List() => a ! 'stop + case ft :: rest => + if (cnt % 100 == 0) + println(ft()) + respondAll(rest, cnt + 1) + } + + actor { + respondAll(fts.toList, 0) + } + + } + +} -- cgit v1.2.3