From 802162b5b1260bde50aafdc5ae9534c54472b109 Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Thu, 29 Mar 2012 10:25:28 +0200 Subject: Add methods in the Future companion object. --- src/library/scala/concurrent/Future.scala | 109 +++++++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 2 deletions(-) diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index fa4c61c227..d73801aa90 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -134,8 +134,26 @@ self => /** Creates a new promise. */ def newPromise[S]: Promise[S] - - + + /** Returns whether the future has already been completed with + * a value or an exception. + * + * $nonDeterministic + * + * @return `true` if the future is already completed, `false` otherwise + */ + def isCompleted: Boolean + + /** The value of this `Future`. + * + * If the future is not completed the returned value will be `None`. + * If the future is completed the value will be `Some(Success(t))` + * if it contains a valid result, or `Some(Failure(error))` if it contains + * an exception. + */ + def value: Option[Try[T]] + + /* Projections */ /** Returns a failed projection of this future. @@ -474,6 +492,93 @@ self => object Future { def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body) + + import scala.collection.mutable.Builder + import scala.collection.generic.CanBuildFrom + + /** Simple version of `Futures.traverse`. Transforms a `Traversable[Future[A]]` into a `Future[Traversable[A]]`. + * Useful for reducing many `Future`s into a single `Future`. + */ + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { + in.foldLeft(Promise.successful(cbf(in)).future) { + (fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a) + } map (_.result) + } + + /** Returns a `Future` to the result of the first future in the list that is completed. + */ + def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { + val p = Promise[T]() + + val completeFirst: Try[T] => Unit = p tryComplete _ + futures.foreach(_ onComplete completeFirst) + + p.future + } + + /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate. + */ + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { + if (futures.isEmpty) Promise.successful[Option[T]](None).future + else { + val result = Promise[Option[T]]() + val ref = new AtomicInteger(futures.size) + val search: Try[T] => Unit = v => try { + v match { + case Success(r) => if (predicate(r)) result tryComplete Success(Some(r)) + case _ => + } + } finally { + if (ref.decrementAndGet == 0) + result tryComplete Success(None) + } + + futures.foreach(_ onComplete search) + + result.future + } + } + + /** A non-blocking fold over the specified futures, with the start value of the given zero. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + * + * Example: + * {{{ + * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds) + * }}} + */ + def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + if (futures.isEmpty) Promise.successful(zero).future + else sequence(futures).map(_.foldLeft(zero)(foldFun)) + } + + /** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first. + * + * Example: + * {{{ + * val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds) + * }}} + */ + def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future + else sequence(futures).map(_ reduceLeft op) + } + + /** Transforms a `Traversable[A]` into a `Future[Traversable[B]]` using the provided function `A => Future[B]`. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel: + * + * {{{ + * val myFutureList = Future.traverse(myList)(x => Future(myFunc(x))) + * }}} + */ + def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = + in.foldLeft(Promise.successful(cbf(in)).future) { (fr, a) => + val fb = fn(a.asInstanceOf[A]) + for (r <- fr; b <- fb) yield (r += b) + }.map(_.result) } -- cgit v1.2.3