summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-19 19:55:41 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-19 19:55:41 +0100
commit778e7d1a1b87431449cbe7335ca3a66fbe7c8366 (patch)
treef78a8d28830f7ffc775f18c443efc213200dc2ea /src
parentebc3636a1b6dd64425630afa83eb398a8d7c43a4 (diff)
downloadscala-778e7d1a1b87431449cbe7335ca3a66fbe7c8366.tar.gz
scala-778e7d1a1b87431449cbe7335ca3a66fbe7c8366.tar.bz2
scala-778e7d1a1b87431449cbe7335ca3a66fbe7c8366.zip
Add implicit conversion for futures that enables calling nondeterministic methods.
Diffstat (limited to 'src')
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala77
-rw-r--r--src/library/scala/concurrent/Future.scala32
-rw-r--r--src/library/scala/concurrent/Promise.scala4
-rw-r--r--src/library/scala/concurrent/package.scala100
4 files changed, 113 insertions, 100 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index ebd5bf6bd3..0657121de2 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -10,13 +10,16 @@ package scala.concurrent
+import java.util.concurrent.atomic.{ AtomicInteger }
import java.util.concurrent.{ Executors, Future => JFuture, Callable }
import scala.util.Duration
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
+import scala.collection.generic.CanBuildFrom
+import collection._
-trait ExecutionContext extends ExecutionContextBase {
+trait ExecutionContext {
protected implicit object CanAwaitEvidence extends CanAwait
@@ -34,6 +37,78 @@ trait ExecutionContext extends ExecutionContextBase {
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
+ /* implementations follow */
+
+ private implicit val executionContext = this
+
+ /** TODO some docs
+ *
+ */
+ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
+ val buffer = new mutable.ArrayBuffer[T]
+ val counter = new AtomicInteger(1) // how else could we do this?
+ val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature
+ var idx = 0
+
+ def tryFinish() = if (counter.decrementAndGet() == 0) {
+ val builder = cbf(futures)
+ builder ++= buffer
+ p success builder.result
+ }
+
+ for (f <- futures) {
+ val currentIndex = idx
+ buffer += null.asInstanceOf[T]
+ counter.incrementAndGet()
+ f onComplete {
+ case Left(t) =>
+ p tryFailure t
+ case Right(v) =>
+ buffer(currentIndex) = v
+ tryFinish()
+ }
+ idx += 1
+ }
+
+ tryFinish()
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def any[T](futures: Traversable[Future[T]]): Future[T] = {
+ val p = promise[T]
+ val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem
+
+ futures foreach (_ onComplete completeFirst)
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
+ if (futures.isEmpty) Promise.successful[Option[T]](None).future
+ else {
+ val result = promise[Option[T]]
+ val count = new AtomicInteger(futures.size)
+ val search: Either[Throwable, T] => Unit = {
+ v => v match {
+ case Right(r) => if (predicate(r)) result trySuccess Some(r)
+ case _ =>
+ }
+ if (count.decrementAndGet() == 0) result trySuccess None
+ }
+
+ futures.foreach(_ onComplete search)
+
+ result.future
+ }
+ }
+
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index e68e6077bb..92e50b1f89 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -77,9 +77,6 @@ import scala.collection.generic.CanBuildFrom
* {{{
* f flatMap { (x: Int) => g map { (y: Int) => x + y } }
* }}}
- *
- * @define nonDeterministic
- * Note: using this method yields nondeterministic dataflow programs.
*/
trait Future[+T] extends Awaitable[T] {
self =>
@@ -336,6 +333,8 @@ self =>
*
* Using this method will not cause concurrent programs to become nondeterministic.
*
+ *
+ *
* Example:
* {{{
* val f = future { sys.error("failed") }
@@ -358,33 +357,6 @@ self =>
p.future
}
- /** Creates a new future which holds the result of either this future or `that` future, depending on
- * which future was completed first.
- *
- * $nonDeterministic
- *
- * Example:
- * {{{
- * val f = future { sys.error("failed") }
- * val g = future { 5 }
- * val h = f any g
- * await(0) h // evaluates to either 5 or throws a runtime exception
- * }}}
- */
- def either[U >: T](that: Future[U]): Future[U] = {
- val p = newPromise[U]
-
- val completePromise: PartialFunction[Either[Throwable, U], _] = {
- case Left(t) => p tryFailure t
- case Right(v) => p trySuccess v
- }
-
- this onComplete completePromise
- that onComplete completePromise
-
- p.future
- }
-
}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 43abe566de..e9b83cba73 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -34,8 +34,6 @@ trait Promise[T] {
*/
def future: Future[T]
- private def throwCompleted = throw new IllegalStateException("Promise already completed.")
-
/** Completes the promise with either an exception or a value.
*
* @param result Either the value or the exception to complete the promise with.
@@ -106,6 +104,8 @@ trait Promise[T] {
case _ => new ExecutionException(t)
}
+ private def throwCompleted = throw new IllegalStateException("Promise already completed.")
+
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index ee8f484379..73da8469e6 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -10,10 +10,7 @@ package scala
-import java.util.concurrent.atomic.{ AtomicInteger }
-import scala.util.{ Timeout, Duration }
-import collection._
-import scala.collection.generic.CanBuildFrom
+import scala.util.Duration
@@ -114,6 +111,19 @@ package object concurrent {
}
}
+ /** Importing this object allows using some concurrency primitives
+ * on futures and promises that can yield nondeterministic programs.
+ *
+ * While program determinism is broken when using these primitives,
+ * some programs cannot be written without them (e.g. multiple client threads
+ * cannot send requests to a server thread through regular promises and futures).
+ */
+ object nondeterministic {
+
+ implicit def future2nondeterministic[T](f: Future[T]) = new NondeterministicFuture[T](f)
+
+ }
+
}
@@ -130,79 +140,35 @@ package concurrent {
def this(origin: Future[_]) = this(origin, "Future timed out.")
}
- trait ExecutionContextBase {
- self: ExecutionContext =>
+ private[concurrent] class NondeterministicFuture[+T](self: Future[T]) {
- private implicit val executionContext = self
-
- /** TODO some docs
+ /** Creates a new future which holds the result of either this future or `that` future, depending on
+ * which future was completed first.
+ *
+ * $nonDeterministic
*
+ * Example:
+ * {{{
+ * val f = future { sys.error("failed") }
+ * val g = future { 5 }
+ * val h = f either g
+ * await(0) h // evaluates to either 5 or throws a runtime exception
+ * }}}
*/
- def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
- val buffer = new mutable.ArrayBuffer[T]
- val counter = new AtomicInteger(1) // how else could we do this?
- val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature
- var idx = 0
-
- def tryFinish() = if (counter.decrementAndGet() == 0) {
- val builder = cbf(futures)
- builder ++= buffer
- p success builder.result
- }
+ def either[U >: T](that: Future[U]): Future[U] = {
+ val p = self.newPromise[U]
- for (f <- futures) {
- val currentIndex = idx
- buffer += null.asInstanceOf[T]
- counter.incrementAndGet()
- f onComplete {
- case Left(t) =>
- p tryFailure t
- case Right(v) =>
- buffer(currentIndex) = v
- tryFinish()
- }
- idx += 1
+ val completePromise: PartialFunction[Either[Throwable, U], _] = {
+ case Left(t) => p tryFailure t
+ case Right(v) => p trySuccess v
}
- tryFinish()
+ self onComplete completePromise
+ that onComplete completePromise
p.future
}
- /** TODO some docs
- *
- */
- def any[T](futures: Traversable[Future[T]]): Future[T] = {
- val p = promise[T]
- val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem
-
- futures foreach (_ onComplete completeFirst)
-
- p.future
- }
-
- /** TODO some docs
- *
- */
- def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
- if (futures.isEmpty) Promise.successful[Option[T]](None).future
- else {
- val result = promise[Option[T]]
- val count = new AtomicInteger(futures.size)
- val search: Either[Throwable, T] => Unit = {
- v => v match {
- case Right(r) => if (predicate(r)) result trySuccess Some(r)
- case _ =>
- }
- if (count.decrementAndGet() == 0) result trySuccess None
- }
-
- futures.foreach(_ onComplete search)
-
- result.future
- }
- }
-
}
}