summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorHeather Miller <heather.miller@epfl.ch>2012-01-19 09:26:40 +0100
committerHeather Miller <heather.miller@epfl.ch>2012-01-19 09:26:40 +0100
commit62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76 (patch)
tree349feb3c6483db483330336336896779292e9a1b /src/library
parent51a930f8595049babf5cf625e5f010c60bedc53b (diff)
downloadscala-62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76.tar.gz
scala-62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76.tar.bz2
scala-62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76.zip
Added implementations for any and find on collections of futures.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala42
-rw-r--r--src/library/scala/concurrent/Future.scala15
-rw-r--r--src/library/scala/concurrent/Promise.scala5
3 files changed, 55 insertions, 7 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 303489297f..5539b6858f 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -11,6 +11,7 @@ package scala.concurrent
import java.util.concurrent.{ Executors, Future => JFuture, Callable }
+import java.util.concurrent.atomic.{ AtomicInteger }
import scala.util.{ Duration, Timeout }
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
import scala.collection.generic.CanBuildFrom
@@ -45,6 +46,9 @@ sealed trait CanAwait
trait FutureUtilities {
+/** TODO some docs
+ *
+ */
def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
val builder = cbf(futures)
val p: Promise[Coll[T]] = promise[Coll[T]]
@@ -68,9 +72,43 @@ trait FutureUtilities {
p.future
}
-
-}
+/** TODO some docs
+ *
+ */
+ def any[T](futures: Traversable[Future[T]]): Future[T] = {
+ val futureResult = promise[T]()
+
+ val completeFirst: Either[Throwable, T] => Unit = futureElem => futureResult tryComplete futureElem
+
+ futures.foreach(_ onComplete completeFirst)
+
+ futureResult.future
+ }
+
+/** TODO some docs
+ *
+ */
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
+ if (futures.isEmpty) Promise.successful[Option[T]](None).future
+ else {
+ val result = promise[Option[T]]()
+ val ref = new AtomicInteger(futures.size)
+ val search: Either[Throwable, T] ⇒ Unit = {
+ v ⇒ v match {
+ case Right(r) ⇒ if (predicate(r)) result trySuccess Some(r)
+ case _ ⇒
+ }
+ if (ref.decrementAndGet == 0) result trySuccess None
+ }
+
+ futures.foreach(_ onComplete search)
+
+ result.future
+ }
+ }
+
+}
object FutureUtilitiesImpl extends FutureUtilities {
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 6b358e1e09..468683dcde 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -355,7 +355,8 @@ self =>
p.future
}
-
+
+/*
/** Creates a new future which holds the result of either this future or `that` future, depending on
* which future was completed first.
*
@@ -377,14 +378,16 @@ self =>
case Right(v) => p trySuccess v
}
this onComplete completePromise
- this onComplete completePromise
+ that onComplete completePromise
p.future
}
-
+
+*/
}
+
/** TODO some docs
*
* @define nonDeterministic
@@ -393,13 +396,17 @@ self =>
object Future {
// TODO make more modular by encoding all other helper methods within the execution context
- /**
+ /** TODO some docs
*/
def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] =
ec.futureUtilities.all[T, Coll](futures)
// move this to future companion object
@inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body)
+
+ def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.futureUtilities.any(futures)
+
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.futureUtilities.find(futures)(predicate)
}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index c3fa92053b..41a41dd611 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -113,6 +113,9 @@ trait Promise[T] {
object Promise {
-
+ def successful[T](result: T): Promise[T] = {
+ val p = promise[T]()
+ p.success(result)
+ }
}