diff options
author | Heather Miller <heather.miller@epfl.ch> | 2012-01-19 09:26:40 +0100 |
---|---|---|
committer | Heather Miller <heather.miller@epfl.ch> | 2012-01-19 09:26:40 +0100 |
commit | 62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76 (patch) | |
tree | 349feb3c6483db483330336336896779292e9a1b /src/library | |
parent | 51a930f8595049babf5cf625e5f010c60bedc53b (diff) | |
download | scala-62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76.tar.gz scala-62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76.tar.bz2 scala-62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76.zip |
Added implementations for any and find on collections of futures.
Diffstat (limited to 'src/library')
-rw-r--r-- | src/library/scala/concurrent/ExecutionContext.scala | 42 | ||||
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 15 | ||||
-rw-r--r-- | src/library/scala/concurrent/Promise.scala | 5 |
3 files changed, 55 insertions, 7 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 303489297f..5539b6858f 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -11,6 +11,7 @@ package scala.concurrent import java.util.concurrent.{ Executors, Future => JFuture, Callable } +import java.util.concurrent.atomic.{ AtomicInteger } import scala.util.{ Duration, Timeout } import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread } import scala.collection.generic.CanBuildFrom @@ -45,6 +46,9 @@ sealed trait CanAwait trait FutureUtilities { +/** TODO some docs + * + */ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { val builder = cbf(futures) val p: Promise[Coll[T]] = promise[Coll[T]] @@ -68,9 +72,43 @@ trait FutureUtilities { p.future } - -} +/** TODO some docs + * + */ + def any[T](futures: Traversable[Future[T]]): Future[T] = { + val futureResult = promise[T]() + + val completeFirst: Either[Throwable, T] => Unit = futureElem => futureResult tryComplete futureElem + + futures.foreach(_ onComplete completeFirst) + + futureResult.future + } + +/** TODO some docs + * + */ + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = { + if (futures.isEmpty) Promise.successful[Option[T]](None).future + else { + val result = promise[Option[T]]() + val ref = new AtomicInteger(futures.size) + val search: Either[Throwable, T] ⇒ Unit = { + v ⇒ v match { + case Right(r) ⇒ if (predicate(r)) result trySuccess Some(r) + case _ ⇒ + } + if (ref.decrementAndGet == 0) result trySuccess None + } + + futures.foreach(_ onComplete search) + + result.future + } + } + +} object FutureUtilitiesImpl extends FutureUtilities { } diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 6b358e1e09..468683dcde 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -355,7 +355,8 @@ self => p.future } - + +/* /** Creates a new future which holds the result of either this future or `that` future, depending on * which future was completed first. * @@ -377,14 +378,16 @@ self => case Right(v) => p trySuccess v } this onComplete completePromise - this onComplete completePromise + that onComplete completePromise p.future } - + +*/ } + /** TODO some docs * * @define nonDeterministic @@ -393,13 +396,17 @@ self => object Future { // TODO make more modular by encoding all other helper methods within the execution context - /** + /** TODO some docs */ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] = ec.futureUtilities.all[T, Coll](futures) // move this to future companion object @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body) + + def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.futureUtilities.any(futures) + + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.futureUtilities.find(futures)(predicate) } diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index c3fa92053b..41a41dd611 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -113,6 +113,9 @@ trait Promise[T] { object Promise { - + def successful[T](result: T): Promise[T] = { + val p = promise[T]() + p.success(result) + } } |