summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2010-05-09 18:09:17 +0000
committerPhilipp Haller <hallerp@gmail.com>2010-05-09 18:09:17 +0000
commit8fe7b531649dd5f82b6ac46720c751750e893500 (patch)
tree6386d1c0b8a2d07c3f1a6467ede4667cff2b261b
parentd1185713fa066528eac5f228aae943dd5ad3a142 (diff)
downloadscala-8fe7b531649dd5f82b6ac46720c751750e893500.tar.gz
scala-8fe7b531649dd5f82b6ac46720c751750e893500.tar.bz2
scala-8fe7b531649dd5f82b6ac46720c751750e893500.zip
Closes #3407. Closes #3412. Review by plocinic.
-rw-r--r--src/actors/scala/actors/ActorCanReply.scala34
-rw-r--r--src/actors/scala/actors/Channel.scala33
-rw-r--r--src/actors/scala/actors/Future.scala117
-rw-r--r--src/actors/scala/actors/ReactorCanReply.scala3
-rw-r--r--test/files/jvm/t3407.check10
-rw-r--r--test/files/jvm/t3407.scala19
-rw-r--r--test/files/jvm/t3412-channel.check10
-rw-r--r--test/files/jvm/t3412-channel.scala38
-rw-r--r--test/files/jvm/t3412.check10
-rw-r--r--test/files/jvm/t3412.scala32
10 files changed, 224 insertions, 82 deletions
diff --git a/src/actors/scala/actors/ActorCanReply.scala b/src/actors/scala/actors/ActorCanReply.scala
index a6a81815c1..0f46c1b9de 100644
--- a/src/actors/scala/actors/ActorCanReply.scala
+++ b/src/actors/scala/actors/ActorCanReply.scala
@@ -10,6 +10,8 @@
package scala.actors
+import scala.concurrent.SyncVar
+
/**
* The `ActorCanReply` trait provides message send operations that
* may result in a response from the receiver.
@@ -35,18 +37,26 @@ private[actors] trait ActorCanReply extends ReactorCanReply {
}
override def !![A](msg: Any, handler: PartialFunction[Any, A]): Future[A] = {
- val ftch = new Channel[A](Actor.self(scheduler))
- send(msg, new OutputChannel[Any] {
- def !(msg: Any) =
- ftch ! handler(msg)
- def send(msg: Any, replyTo: OutputChannel[Any]) =
- ftch.send(handler(msg), replyTo)
- def forward(msg: Any) =
- ftch.forward(handler(msg))
- def receiver =
- ftch.receiver
- })
- Futures.fromInputChannel(ftch)
+ val c = new Channel[A](Actor.self(scheduler))
+ val fun = (res: SyncVar[A]) => {
+ val ftch = new Channel[A](Actor.self(scheduler))
+ send(msg, new OutputChannel[Any] {
+ def !(msg: Any) =
+ ftch ! handler(msg)
+ def send(msg: Any, replyTo: OutputChannel[Any]) =
+ ftch.send(handler(msg), replyTo)
+ def forward(msg: Any) =
+ ftch.forward(handler(msg))
+ def receiver =
+ ftch.receiver
+ })
+ ftch.react {
+ case any => res.set(any)
+ }
+ }
+ val a = new FutureActor[A](fun, c)
+ a.start()
+ a
}
override def !!(msg: Any): Future[Any] = {
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index ae1e3029d1..25b41e39a4 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -10,6 +10,7 @@
package scala.actors
+import scala.concurrent.SyncVar
/**
* This class is used to pattern match on values that were sent
@@ -108,18 +109,26 @@ class Channel[Msg](val receiver: Actor) extends InputChannel[Msg] with OutputCha
}
def !![A](msg: Msg, handler: PartialFunction[Any, A]): Future[A] = {
- val ftch = new Channel[A](Actor.self(receiver.scheduler))
- receiver.send(scala.actors.!(this, msg), new OutputChannel[Any] {
- def !(msg: Any) =
- ftch ! handler(msg)
- def send(msg: Any, replyTo: OutputChannel[Any]) =
- ftch.send(handler(msg), replyTo)
- def forward(msg: Any) =
- ftch.forward(handler(msg))
- def receiver =
- ftch.receiver
- })
- Futures.fromInputChannel(ftch)
+ val c = new Channel[A](Actor.self(receiver.scheduler))
+ val fun = (res: SyncVar[A]) => {
+ val ftch = new Channel[A](Actor.self(receiver.scheduler))
+ receiver.send(scala.actors.!(this, msg), new OutputChannel[Any] {
+ def !(msg: Any) =
+ ftch ! handler(msg)
+ def send(msg: Any, replyTo: OutputChannel[Any]) =
+ ftch.send(handler(msg), replyTo)
+ def forward(msg: Any) =
+ ftch.forward(handler(msg))
+ def receiver =
+ ftch.receiver
+ })
+ ftch.react {
+ case any => res.set(any)
+ }
+ }
+ val a = new FutureActor[A](fun, c)
+ a.start()
+ a
}
def !!(msg: Msg): Future[Any] = {
diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala
index 9f65786879..e34569f36b 100644
--- a/src/actors/scala/actors/Future.scala
+++ b/src/actors/scala/actors/Future.scala
@@ -11,6 +11,7 @@
package scala.actors
import scala.actors.scheduler.DaemonScheduler
+import scala.concurrent.SyncVar
/** A `Future[T]` is a function of arity 0 that returns
* a value of type `T`.
@@ -22,7 +23,8 @@ import scala.actors.scheduler.DaemonScheduler
*
* @author Philipp Haller
*/
-abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T] with Function0[T] {
+abstract class Future[+T] extends Responder[T] with Function0[T] {
+
@volatile
private[actors] var fvalue: Option[Any] = None
private[actors] def fvalueTyped = fvalue.get.asInstanceOf[T]
@@ -41,57 +43,79 @@ abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T
* `false` otherwise.
*/
def isSet: Boolean
-}
-/** The `Futures` object contains methods that operate on futures.
- *
- * @author Philipp Haller
- */
-object Futures {
+ /** Returns an input channel that can be used to receive the future's result.
+ *
+ * @return the future's input channel
+ */
+ def inputChannel: InputChannel[T]
- import scala.concurrent.SyncVar
+}
- private case object Eval
+private case object Eval
- private class FutureActor[T](fun: SyncVar[T] => Unit, channel: Channel[T])
- extends Future[T](channel) with DaemonActor {
+private class FutureActor[T](fun: SyncVar[T] => Unit, channel: Channel[T]) extends Future[T] with DaemonActor {
- import Actor._
+ var enableChannel = false // guarded by this
- def isSet = !fvalue.isEmpty
+ def isSet = !fvalue.isEmpty
- def apply(): T = {
- if (fvalue.isEmpty)
- this !? Eval
- fvalueTyped
+ def apply(): T = {
+ if (fvalue.isEmpty) {
+ this !? Eval
}
+ fvalueTyped
+ }
- def respond(k: T => Unit) {
- if (isSet) k(fvalueTyped)
- else {
- val ft = this !! Eval
- ft.inputChannel.react {
- case _ => k(fvalueTyped)
- }
+ def respond(k: T => Unit) {
+ if (isSet) k(fvalueTyped)
+ else {
+ val ft = this !! Eval
+ ft.inputChannel.react {
+ case _ => k(fvalueTyped)
}
}
+ }
- def act() {
- val res = new SyncVar[T]
-
- {
- fun(res)
- } andThen {
- fvalue = Some(res.get)
- channel ! res.get
- loop {
- react {
- case Eval => reply()
- }
+ def inputChannel: InputChannel[T] = {
+ synchronized {
+ if (!enableChannel) {
+ if (isSet)
+ channel ! fvalueTyped
+ enableChannel = true
+ }
+ }
+ channel
+ }
+
+ def act() {
+ val res = new SyncVar[T]
+
+ {
+ fun(res)
+ } andThen {
+
+ synchronized {
+ val v = res.get
+ fvalue = Some(v)
+ if (enableChannel)
+ channel ! v
+ }
+
+ loop {
+ react {
+ case Eval => reply()
}
}
}
}
+}
+
+/** The `Futures` object contains methods that operate on futures.
+ *
+ * @author Philipp Haller
+ */
+object Futures {
/** Arranges for the asynchronous execution of `body`,
* returning a future representing the result.
@@ -222,25 +246,4 @@ object Futures {
results
}
- private[actors] def fromInputChannel[T](inputChannel: InputChannel[T]): Future[T] =
- new Future[T](inputChannel) {
- def apply() =
- if (isSet) fvalueTyped
- else inputChannel.receive {
- case any => fvalue = Some(any); fvalueTyped
- }
- def respond(k: T => Unit): Unit =
- if (isSet) k(fvalueTyped)
- else inputChannel.react {
- case any => fvalue = Some(any); k(fvalueTyped)
- }
- def isSet = fvalue match {
- case None => inputChannel.receiveWithin(0) {
- case TIMEOUT => false
- case any => fvalue = Some(any); true
- }
- case Some(_) => true
- }
- }
-
}
diff --git a/src/actors/scala/actors/ReactorCanReply.scala b/src/actors/scala/actors/ReactorCanReply.scala
index e279845c9b..14cb423cf6 100644
--- a/src/actors/scala/actors/ReactorCanReply.scala
+++ b/src/actors/scala/actors/ReactorCanReply.scala
@@ -71,7 +71,7 @@ private[actors] trait ReactorCanReply extends CanReply[Any, Any] {
this.send(msg, out)
- new Future[A](ftch) {
+ new Future[A] {
def apply() = {
if (!isSet)
fvalue = Some(res.get)
@@ -85,6 +85,7 @@ private[actors] trait ReactorCanReply extends CanReply[Any, Any] {
}
def isSet =
!fvalue.isEmpty
+ def inputChannel = ftch
}
}
}
diff --git a/test/files/jvm/t3407.check b/test/files/jvm/t3407.check
new file mode 100644
index 0000000000..a133c88bbe
--- /dev/null
+++ b/test/files/jvm/t3407.check
@@ -0,0 +1,10 @@
+result: 42
+result: 42
+result: 42
+result: 42
+result: 42
+result: 42
+result: 42
+result: 42
+result: 42
+result: 42
diff --git a/test/files/jvm/t3407.scala b/test/files/jvm/t3407.scala
new file mode 100644
index 0000000000..6c2ce85c71
--- /dev/null
+++ b/test/files/jvm/t3407.scala
@@ -0,0 +1,19 @@
+import scala.actors._, scala.actors.Actor._
+
+object Test {
+
+ def main(args: Array[String]) {
+ for (i <- 1 to 10) {
+ val ft = Futures.future { 42 }
+ println("result: " + ft())
+ }
+
+ for (i <- 1 to 10) {
+ receiveWithin(0) {
+ case TIMEOUT =>
+ case msg => println("unexpected: " + msg)
+ }
+ }
+ }
+
+}
diff --git a/test/files/jvm/t3412-channel.check b/test/files/jvm/t3412-channel.check
new file mode 100644
index 0000000000..954c6e835d
--- /dev/null
+++ b/test/files/jvm/t3412-channel.check
@@ -0,0 +1,10 @@
+6
+6
+6
+6
+6
+6
+6
+6
+6
+6
diff --git a/test/files/jvm/t3412-channel.scala b/test/files/jvm/t3412-channel.scala
new file mode 100644
index 0000000000..fcc439b488
--- /dev/null
+++ b/test/files/jvm/t3412-channel.scala
@@ -0,0 +1,38 @@
+import scala.actors._, scala.actors.Actor._, scala.actors.Futures._
+
+object Test {
+
+ def main(args: Array[String]) {
+
+ actor {
+ val C: Channel[Int] = new Channel[Int](self)
+
+ def respondAll(fts: List[Future[Int]], cnt: Int): Unit =
+ fts match {
+ case List() => C ! 0
+ case ft :: rest =>
+ if (cnt % 100 == 0)
+ println(ft())
+ respondAll(rest, cnt + 1)
+ }
+
+ actor {
+ val fts = for (_ <- 1 to 1000)
+ yield C !! (3, {case x: Int => x})
+
+ actor {
+ respondAll(fts.toList, 0)
+ }
+ }
+
+ loop {
+ C.react {
+ case 0 => exit()
+ case i => reply(i * 2)
+ }
+ }
+ }
+
+ }
+
+}
diff --git a/test/files/jvm/t3412.check b/test/files/jvm/t3412.check
new file mode 100644
index 0000000000..954c6e835d
--- /dev/null
+++ b/test/files/jvm/t3412.check
@@ -0,0 +1,10 @@
+6
+6
+6
+6
+6
+6
+6
+6
+6
+6
diff --git a/test/files/jvm/t3412.scala b/test/files/jvm/t3412.scala
new file mode 100644
index 0000000000..ced15ab5dc
--- /dev/null
+++ b/test/files/jvm/t3412.scala
@@ -0,0 +1,32 @@
+import scala.actors._, scala.actors.Actor._, scala.actors.Futures._
+
+object Test {
+
+ def main(args: Array[String]) {
+
+ val a = actor {
+ loop { react {
+ case i: Int => reply(i * 2)
+ case 'stop => exit()
+ } }
+ }
+
+ val fts = for (_ <- 1 to 1000)
+ yield a !! (3, {case x: Int => x})
+
+ def respondAll(fts: List[Future[Int]], cnt: Int): Unit =
+ fts match {
+ case List() => a ! 'stop
+ case ft :: rest =>
+ if (cnt % 100 == 0)
+ println(ft())
+ respondAll(rest, cnt + 1)
+ }
+
+ actor {
+ respondAll(fts.toList, 0)
+ }
+
+ }
+
+}