diff options
Diffstat (limited to 'src/library/scala/concurrent/Future.scala')
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 488 |
1 files changed, 354 insertions, 134 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index ebc1e76ca1..d9d3d572e8 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -10,34 +10,27 @@ package scala.concurrent import scala.language.higherKinds -import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } -import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } -import java.lang.{ Iterable => JIterable } -import java.util.{ LinkedList => JLinkedList } -import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicLong, AtomicBoolean } +import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger import scala.util.control.NonFatal -import scala.Option import scala.util.{Try, Success, Failure} - -import scala.annotation.tailrec -import scala.collection.mutable.Builder +import scala.concurrent.duration._ import scala.collection.generic.CanBuildFrom import scala.reflect.ClassTag - /** The trait that represents futures. * - * Asynchronous computations that yield futures are created with the `Future` call: + * Asynchronous computations that yield futures are created with the `Future.apply` call: * * {{{ * val s = "Hello" * val f: Future[String] = Future { * s + " future!" * } - * f onSuccess { - * case msg => println(msg) + * f foreach { + * msg => println(msg) * } * }}} * @@ -62,6 +55,10 @@ import scala.reflect.ClassTag * If a future is failed with a `scala.runtime.NonLocalReturnControl`, * it is completed with a value from that throwable instead. * + * @define swallowsExceptions + * Since this method executes asynchronously and does not produce a return value, + * any non-fatal exceptions thrown will be reported to the `ExecutionContext`. + * * @define nonDeterministic * Note: using this method yields nondeterministic dataflow programs. * @@ -93,14 +90,7 @@ import scala.reflect.ClassTag * `execute()` either immediately or asynchronously. */ trait Future[+T] extends Awaitable[T] { - - // The executor within the lexical scope - // of the Future trait. Note that this will - // (modulo bugs) _never_ execute a callback - // other than those below in this same file. - // - // See the documentation on `InternalCallbackExecutor` for more details. - private def internalExecutor = Future.InternalCallbackExecutor + import Future.{ InternalCallbackExecutor => internalExecutor } /* Callbacks */ @@ -111,9 +101,11 @@ trait Future[+T] extends Awaitable[T] { * If the future has already been completed with a value, * this will either be applied immediately or be scheduled asynchronously. * + * $swallowsExceptions * $multipleCallbacks * $callbackInContext */ + @deprecated("use `foreach` or `onComplete` instead (keep in mind that they take total rather than partial functions)", "2.12") def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete { case Success(v) => pf.applyOrElse[T, Any](v, Predef.conforms[T]) // Exploiting the cached function to avoid MatchError @@ -130,9 +122,11 @@ trait Future[+T] extends Awaitable[T] { * * Will not be called in case that the future is completed with a value. * + * $swallowsExceptions * $multipleCallbacks * $callbackInContext */ + @deprecated("use `onComplete` or `failed.foreach` instead (keep in mind that they take total rather than partial functions)", "2.12") def onFailure[U](@deprecatedName('callback) pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete { case Failure(t) => pf.applyOrElse[Throwable, Any](t, Predef.conforms[Throwable]) // Exploiting the cached function to avoid MatchError @@ -145,8 +139,12 @@ trait Future[+T] extends Awaitable[T] { * If the future has already been completed, * this will either be applied immediately or be scheduled asynchronously. * + * $swallowsExceptions * $multipleCallbacks * $callbackInContext + * + * @tparam U only used to accept any return type of the given callback function + * @param f the function to be executed when this `Future` completes */ def onComplete[U](@deprecatedName('func) f: Try[T] => U)(implicit executor: ExecutionContext): Unit @@ -162,46 +160,47 @@ trait Future[+T] extends Awaitable[T] { */ def isCompleted: Boolean - /** The value of this `Future`. + /** The current value of this `Future`. + * + * $nonDeterministic * * If the future is not completed the returned value will be `None`. * If the future is completed the value will be `Some(Success(t))` * if it contains a valid result, or `Some(Failure(error))` if it contains * an exception. + * + * @return `None` if the `Future` wasn't completed, `Some` if it was. */ def value: Option[Try[T]] /* Projections */ - /** Returns a failed projection of this future. + /** The returned `Future` will be successfully completed with the `Throwable` of the original `Future` + * if the original `Future` fails. * - * The failed projection is a future holding a value of type `Throwable`. + * If the original `Future` is successful, the returned `Future` is failed with a `NoSuchElementException`. * - * It is completed with a value which is the throwable of the original future - * in case the original future is failed. - * - * It is failed with a `NoSuchElementException` if the original future is completed successfully. - * - * Blocking on this future returns a value if the original future is completed with an exception - * and throws a corresponding exception if the original future fails. + * @return a failed projection of this `Future`. */ - def failed: Future[Throwable] = { - implicit val ec = internalExecutor - val p = Promise[Throwable]() - onComplete { - case Failure(t) => p success t - case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable.")) - } - p.future - } + def failed: Future[Throwable] = + transform({ + case Failure(t) => Success(t) + case Success(v) => Failure(new NoSuchElementException("Future.failed not completed with a throwable.")) + })(internalExecutor) /* Monadic operations */ /** Asynchronously processes the value in the future once the value becomes available. * - * Will not be called if the future fails. + * WARNING: Will not be called if this future is never completed or if it is completed with a failure. + * + * $swallowsExceptions + * + * @tparam U only used to accept any return type of the given callback function + * @param f the function which will be executed if this `Future` completes with a result, + * the return value of `f` will be discarded. */ def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { _ foreach f } @@ -210,33 +209,59 @@ trait Future[+T] extends Awaitable[T] { * exception thrown when 's' or 'f' is applied, that exception will be propagated * to the resulting future. * - * @param s function that transforms a successful result of the receiver into a - * successful result of the returned future - * @param f function that transforms a failure of the receiver into a failure of - * the returned future - * @return a future that will be completed with the transformed value - */ - def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = { - val p = Promise[S]() - // transform on Try has the wrong shape for us here - onComplete { - case Success(r) => p complete Try(s(r)) - case Failure(t) => p complete Try(throw f(t)) // will throw fatal errors! + * @tparam S the type of the returned `Future` + * @param s function that transforms a successful result of the receiver into a successful result of the returned future + * @param f function that transforms a failure of the receiver into a failure of the returned future + * @return a `Future` that will be completed with the transformed value + */ + def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = + transform { + case Success(r) => Try(s(r)) + case Failure(t) => Try(throw f(t)) // will throw fatal errors! } - p.future - } + + /** Creates a new Future by applying the specified function to the result + * of this Future. If there is any non-fatal exception thrown when 'f' + * is applied then that exception will be propagated to the resulting future. + * + * @tparam S the type of the returned `Future` + * @param f function that transforms the result of this future + * @return a `Future` that will be completed with the transformed value + */ + def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] + + /** Creates a new Future by applying the specified function, which produces a Future, to the result + * of this Future. If there is any non-fatal exception thrown when 'f' + * is applied then that exception will be propagated to the resulting future. + * + * @tparam S the type of the returned `Future` + * @param f function that transforms the result of this future + * @return a `Future` that will be completed with the transformed value + */ + def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] + /** Creates a new future by applying a function to the successful result of * this future. If this future is completed with an exception then the new * future will also contain this exception. * - * $forComprehensionExamples + * Example: + * + * {{{ + * val f = Future { "The future" } + * val g = f map { x: String => x + " is now!" } + * }}} + * + * Note that a for comprehension involving a `Future` + * may expand to include a call to `map` and or `flatMap` + * and `withFilter`. See [[scala.concurrent.Future#flatMap]] for an example of such a comprehension. + * + * + * @tparam S the type of the returned `Future` + * @param f the function which will be applied to the successful result of this `Future` + * @return a `Future` which will be completed with the result of the application of the function */ - def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity) - val p = Promise[S]() - onComplete { v => p complete (v map f) } - p.future - } + def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_.map(f)) /** Creates a new future by applying a function to the successful result of * this future, and returns the result of the function as the new future. @@ -244,21 +269,23 @@ trait Future[+T] extends Awaitable[T] { * also contain this exception. * * $forComprehensionExamples + * + * @tparam S the type of the returned `Future` + * @param f the function which will be applied to the successful result of this `Future` + * @return a `Future` which will be completed with the result of the application of the function */ - def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = { - import impl.Promise.DefaultPromise - val p = new DefaultPromise[S]() - onComplete { - case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] - case Success(v) => try f(v) match { - // If possible, link DefaultPromises to avoid space leaks - case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p) - case fut => fut.onComplete(p.complete)(internalExecutor) - } catch { case NonFatal(t) => p failure t } - } - p.future + def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = transformWith { + case Success(s) => f(s) + case Failure(_) => this.asInstanceOf[Future[S]] } + /** Creates a new future with one level of nesting flattened, this method is equivalent + * to `flatMap(identity)`. + * + * @tparam S the type of the returned `Future` + */ + def flatten[S](implicit ev: T <:< Future[S]): Future[S] = flatMap(ev)(internalExecutor) + /** Creates a new future by filtering the value of the current future with a predicate. * * If the current future contains a value which satisfies the predicate, the new future will also hold that value. @@ -271,14 +298,15 @@ trait Future[+T] extends Awaitable[T] { * val f = Future { 5 } * val g = f filter { _ % 2 == 1 } * val h = f filter { _ % 2 == 0 } - * Await.result(g, Duration.Zero) // evaluates to 5 + * g foreach println // Eventually prints 5 * Await.result(h, Duration.Zero) // throw a NoSuchElementException * }}} + * + * @param p the predicate to apply to the successful result of this `Future` + * @return a `Future` which will hold the successful result of this `Future` if it matches the predicate or a `NoSuchElementException` */ def filter(@deprecatedName('pred) p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = - map { - r => if (p(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied") - } + map { r => if (p(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied") } /** Used by for-comprehensions. */ @@ -300,9 +328,13 @@ trait Future[+T] extends Awaitable[T] { * val h = f collect { * case x if x > 0 => x * 2 * } - * Await.result(g, Duration.Zero) // evaluates to 5 + * g foreach println // Eventually prints 5 * Await.result(h, Duration.Zero) // throw a NoSuchElementException * }}} + * + * @tparam S the type of the returned `Future` + * @param pf the `PartialFunction` to apply to the successful result of this `Future` + * @return a `Future` holding the result of application of the `PartialFunction` or a `NoSuchElementException` */ def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = map { @@ -320,12 +352,13 @@ trait Future[+T] extends Awaitable[T] { * Future (6 / 0) recover { case e: NotFoundException => 0 } // result: exception * Future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3 * }}} + * + * @tparam U the type of the returned `Future` + * @param pf the `PartialFunction` to apply if this `Future` fails + * @return a `Future` with the successful value of this `Future` or the result of the `PartialFunction` */ - def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = { - val p = Promise[U]() - onComplete { v => p complete (v recover pf) } - p.future - } + def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = + transform { _ recover pf } /** Creates a new future that will handle any matching throwable that this * future might contain by assigning it a value of another future. @@ -339,15 +372,16 @@ trait Future[+T] extends Awaitable[T] { * val f = Future { Int.MaxValue } * Future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue * }}} + * + * @tparam U the type of the returned `Future` + * @param pf the `PartialFunction` to apply if this `Future` fails + * @return a `Future` with the successful value of this `Future` or the outcome of the `Future` returned by the `PartialFunction` */ - def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = { - val p = Promise[U]() - onComplete { - case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this).onComplete(p.complete)(internalExecutor) catch { case NonFatal(t) => p failure t } - case other => p complete other + def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = + transformWith { + case Failure(t) => pf.applyOrElse(t, (_: Throwable) => this) + case Success(_) => this } - p.future - } /** Zips the values of `this` and `that` future, and creates * a new future holding the tuple of their results. @@ -356,17 +390,35 @@ trait Future[+T] extends Awaitable[T] { * with the throwable stored in `this`. * Otherwise, if `that` future fails, the resulting future is failed * with the throwable stored in `that`. + * + * @tparam U the type of the other `Future` + * @param that the other `Future` + * @return a `Future` with the results of both futures or the failure of the first of them that failed */ def zip[U](that: Future[U]): Future[(T, U)] = { implicit val ec = internalExecutor - val p = Promise[(T, U)]() - onComplete { - case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]] - case Success(s) => that onComplete { c => p.complete(c map { s2 => (s, s2) }) } - } - p.future + flatMap { r1 => that.map(r2 => (r1, r2)) } } + /** Zips the values of `this` and `that` future using a function `f`, + * and creates a new future holding the result. + * + * If `this` future fails, the resulting future is failed + * with the throwable stored in `this`. + * Otherwise, if `that` future fails, the resulting future is failed + * with the throwable stored in `that`. + * If the application of `f` throws a throwable, the resulting future + * is failed with that throwable if it is non-fatal. + * + * @tparam U the type of the other `Future` + * @tparam R the type of the resulting `Future` + * @param that the other `Future` + * @param f the function to apply to the results of `this` and `that` + * @return a `Future` with the result of the application of `f` to the results of `this` and `that` + */ + def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = + flatMap(r1 => that.map(r2 => f(r1, r2)))(internalExecutor) + /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, * the result of the `that` future if `that` is completed successfully. * If both futures are failed, the resulting future holds the throwable object of the first future. @@ -378,24 +430,26 @@ trait Future[+T] extends Awaitable[T] { * val f = Future { sys.error("failed") } * val g = Future { 5 } * val h = f fallbackTo g - * Await.result(h, Duration.Zero) // evaluates to 5 + * h foreach println // Eventually prints 5 * }}} + * + * @tparam U the type of the other `Future` and the resulting `Future` + * @param that the `Future` whose result we want to use if this `Future` fails. + * @return a `Future` with the successful result of this or that `Future` or the failure of this `Future` if both fail */ - def fallbackTo[U >: T](that: Future[U]): Future[U] = { - implicit val ec = internalExecutor - val p = Promise[U]() - onComplete { - case s @ Success(_) => p complete s - case f @ Failure(_) => that onComplete { - case s2 @ Success(_) => p complete s2 - case _ => p complete f // Use the first failure as the failure - } + def fallbackTo[U >: T](that: Future[U]): Future[U] = + if (this eq that) this + else { + implicit val ec = internalExecutor + recoverWith { case _ => that } recoverWith { case _ => this } } - p.future - } /** Creates a new `Future[S]` which is completed with this `Future`'s result if * that conforms to `S`'s erased type or a `ClassCastException` otherwise. + * + * @tparam S the type of the returned `Future` + * @param tag the `ClassTag` which will be used to cast the result of this `Future` + * @return a `Future` holding the casted result of this `Future` or a `ClassCastException` otherwise */ def mapTo[S](implicit tag: ClassTag[S]): Future[S] = { implicit val ec = internalExecutor @@ -429,15 +483,19 @@ trait Future[+T] extends Awaitable[T] { * case Success(v) => println(v) * } * }}} + * + * @tparam U only used to accept any return type of the given `PartialFunction` + * @param pf a `PartialFunction` which will be conditionally applied to the outcome of this `Future` + * @return a `Future` which will be completed with the exact same outcome as this `Future` but after the `PartialFunction` has been executed. */ - def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = { - val p = Promise[T]() - onComplete { - case r => try pf.applyOrElse[Try[T], Any](r, Predef.conforms[Try[T]]) finally p complete r - } - p.future - } + def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = + transform { + result => + try pf.applyOrElse[Try[T], Any](result, Predef.conforms[Try[T]]) + catch { case NonFatal(t) => executor reportFailure t } + result + } } @@ -461,40 +519,102 @@ object Future { classOf[Unit] -> classOf[scala.runtime.BoxedUnit] ) + /** A Future which is never completed. + */ + final object never extends Future[Nothing] { + + @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) + override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = { + atMost match { + case e if e eq Duration.Undefined => throw new IllegalArgumentException("cannot wait for Undefined period") + case Duration.Inf => new CountDownLatch(1).await() + case Duration.MinusInf => // Drop out + case f: FiniteDuration => + if (f > Duration.Zero) new CountDownLatch(1).await(f.toNanos, TimeUnit.NANOSECONDS) + } + throw new TimeoutException(s"Future timed out after [$atMost]") + } + + @throws(classOf[Exception]) + override def result(atMost: Duration)(implicit permit: CanAwait): Nothing = { + ready(atMost) + throw new TimeoutException(s"Future timed out after [$atMost]") + } + + override def onSuccess[U](pf: PartialFunction[Nothing, U])(implicit executor: ExecutionContext): Unit = () + override def onFailure[U](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = () + override def onComplete[U](f: Try[Nothing] => U)(implicit executor: ExecutionContext): Unit = () + override def isCompleted: Boolean = false + override def value: Option[Try[Nothing]] = None + override def failed: Future[Throwable] = this + override def foreach[U](f: Nothing => U)(implicit executor: ExecutionContext): Unit = () + override def transform[S](s: Nothing => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = this + override def transform[S](f: Try[Nothing] => Try[S])(implicit executor: ExecutionContext): Future[S] = this + override def transformWith[S](f: Try[Nothing] => Future[S])(implicit executor: ExecutionContext): Future[S] = this + override def map[S](f: Nothing => S)(implicit executor: ExecutionContext): Future[S] = this + override def flatMap[S](f: Nothing => Future[S])(implicit executor: ExecutionContext): Future[S] = this + override def flatten[S](implicit ev: Nothing <:< Future[S]): Future[S] = this + override def filter(p: Nothing => Boolean)(implicit executor: ExecutionContext): Future[Nothing] = this + override def collect[S](pf: PartialFunction[Nothing, S])(implicit executor: ExecutionContext): Future[S] = this + override def recover[U >: Nothing](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = this + override def recoverWith[U >: Nothing](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = this + override def zip[U](that: Future[U]): Future[(Nothing, U)] = this + override def zipWith[U, R](that: Future[U])(f: (Nothing, U) => R)(implicit executor: ExecutionContext): Future[R] = this + override def fallbackTo[U >: Nothing](that: Future[U]): Future[U] = this + override def mapTo[S](implicit tag: ClassTag[S]): Future[S] = this + override def andThen[U](pf: PartialFunction[Try[Nothing], U])(implicit executor: ExecutionContext): Future[Nothing] = this + + override def toString: String = "Future(<never>)" + } + + /** A Future which is always completed with the Unit value. + */ + val unit: Future[Unit] = successful(()) + /** Creates an already completed Future with the specified exception. * - * @tparam T the type of the value in the future - * @return the newly created `Future` object + * @tparam T the type of the value in the future + * @param exception the non-null instance of `Throwable` + * @return the newly created `Future` instance */ def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future /** Creates an already completed Future with the specified result. * * @tparam T the type of the value in the future - * @return the newly created `Future` object + * @param result the given successful value + * @return the newly created `Future` instance */ def successful[T](result: T): Future[T] = Promise.successful(result).future /** Creates an already completed Future with the specified result or exception. * - * @tparam T the type of the value in the promise - * @return the newly created `Future` object + * @tparam T the type of the value in the `Future` + * @param result the result of the returned `Future` instance + * @return the newly created `Future` instance */ def fromTry[T](result: Try[T]): Future[T] = Promise.fromTry(result).future - /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + /** Starts an asynchronous computation and returns a `Future` instance with the result of that computation. * * The result becomes available once the asynchronous computation is completed. * - * @tparam T the type of the result - * @param body the asynchronous computation + * @tparam T the type of the result + * @param body the asynchronous computation * @param executor the execution context on which the future is run - * @return the `Future` holding the result of the computation + * @return the `Future` holding the result of the computation */ - def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body) + def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = + unit.map(_ => body) - /** Simple version of `Future.traverse`. Transforms a `TraversableOnce[Future[A]]` into a `Future[TraversableOnce[A]]`. - * Useful for reducing many `Future`s into a single `Future`. + /** Simple version of `Future.traverse`. Asynchronously and non-blockingly transforms a `TraversableOnce[Future[A]]` + * into a `Future[TraversableOnce[A]]`. Useful for reducing many `Future`s into a single `Future`. + * + * @tparam A the type of the value inside the Futures + * @tparam M the type of the `TraversableOnce` of Futures + * @param in the `TraversableOnce` of Futures which will be sequenced + * @return the `Future` of the `TraversableOnce` of results */ def sequence[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { in.foldLeft(successful(cbf(in))) { @@ -502,7 +622,12 @@ object Future { }.map(_.result())(InternalCallbackExecutor) } - /** Returns a new `Future` to the result of the first future in the list that is completed. + /** Asynchronously and non-blockingly returns a new `Future` to the result of the first future + * in the list that is completed. This means no matter if it is completed as a success or as a failure. + * + * @tparam T the type of the value in the future + * @param futures the `TraversableOnce` of Futures in which to find the first completed + * @return the `Future` holding the result of the future that is first to be completed */ def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() @@ -511,8 +636,15 @@ object Future { p.future } - /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate. + /** Asynchronously and non-blockingly returns a `Future` that will hold the optional result + * of the first `Future` with a result that matches the predicate. + * + * @tparam T the type of the value in the future + * @param futures the `TraversableOnce` of Futures to search + * @param p the predicate which indicates if it's a match + * @return the `Future` holding the optional result of the search */ + @deprecated("Use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12") def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { val futuresBuffer = futures.toBuffer if (futuresBuffer.isEmpty) successful[Option[T]](None) @@ -536,40 +668,127 @@ object Future { } } - /** A non-blocking fold over the specified futures, with the start value of the given zero. + + /** Asynchronously and non-blockingly returns a `Future` that will hold the optional result + * of the first `Future` with a result that matches the predicate, failed `Future`s will be ignored. + * + * @tparam T the type of the value in the future + * @param futures the `scala.collection.immutable.Iterable` of Futures to search + * @param p the predicate which indicates if it's a match + * @return the `Future` holding the optional result of the search + */ + def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { + def searchNext(i: Iterator[Future[T]]): Future[Option[T]] = + if (!i.hasNext) successful[Option[T]](None) + else { + i.next().transformWith { + case Success(r) if p(r) => successful(Some(r)) + case other => searchNext(i) + } + } + searchNext(futures.iterator) + } + + /** A non-blocking, asynchronous left fold over the specified futures, + * with the start value of the given zero. + * The fold is performed asynchronously in left-to-right order as the futures become completed. + * The result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + * + * Example: + * {{{ + * val futureSum = Future.foldLeft(futures)(0)(_ + _) + * }}} + * + * @tparam T the type of the value of the input Futures + * @tparam R the type of the value of the returned `Future` + * @param futures the `scala.collection.immutable.Iterable` of Futures to be folded + * @param zero the start value of the fold + * @param op the fold operation to be applied to the zero and futures + * @return the `Future` holding the result of the fold + */ + def foldLeft[T, R](futures: scala.collection.immutable.Iterable[Future[T]])(zero: R)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = + foldNext(futures.iterator, zero, op) + + private[this] def foldNext[T, R](i: Iterator[Future[T]], prevValue: R, op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = + if (!i.hasNext) successful(prevValue) + else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) } + + /** A non-blocking, asynchronous fold over the specified futures, with the start value of the given zero. * The fold is performed on the thread where the last future is completed, * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. * * Example: * {{{ - * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds) + * val futureSum = Future.fold(futures)(0)(_ + _) * }}} + * + * @tparam T the type of the value of the input Futures + * @tparam R the type of the value of the returned `Future` + * @param futures the `TraversableOnce` of Futures to be folded + * @param zero the start value of the fold + * @param op the fold operation to be applied to the zero and futures + * @return the `Future` holding the result of the fold */ + @deprecated("Use Future.foldLeft instead", "2.12") def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(@deprecatedName('foldFun) op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) successful(zero) else sequence(futures).map(_.foldLeft(zero)(op)) } - /** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first. + /** Initiates a non-blocking, asynchronous, fold over the supplied futures + * where the fold-zero is the result value of the `Future` that's completed first. * * Example: * {{{ - * val result = Await.result(Future.reduce(futures)(_ + _), 5 seconds) + * val futureSum = Future.reduce(futures)(_ + _) * }}} + * @tparam T the type of the value of the input Futures + * @tparam R the type of the value of the returned `Future` + * @param futures the `TraversableOnce` of Futures to be reduced + * @param op the reduce operation which is applied to the results of the futures + * @return the `Future` holding the result of the reduce */ + @deprecated("Use Future.reduceLeft instead", "2.12") def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) failed(new NoSuchElementException("reduce attempted on empty collection")) else sequence(futures).map(_ reduceLeft op) } - /** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`. + /** Initiates a non-blocking, asynchronous, left reduction over the supplied futures + * where the zero is the result value of the first `Future`. + * + * Example: + * {{{ + * val futureSum = Future.reduceLeft(futures)(_ + _) + * }}} + * @tparam T the type of the value of the input Futures + * @tparam R the type of the value of the returned `Future` + * @param futures the `scala.collection.immutable.Iterable` of Futures to be reduced + * @param op the reduce operation which is applied to the results of the futures + * @return the `Future` holding the result of the reduce + */ + def reduceLeft[T, R >: T](futures: scala.collection.immutable.Iterable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + val i = futures.iterator + if (!i.hasNext) failed(new NoSuchElementException("reduceLeft attempted on empty collection")) + else i.next() flatMap { v => foldNext(i, v, op) } + } + + /** Asynchronously and non-blockingly transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` + * using the provided function `A => Future[B]`. * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel: * * {{{ * val myFutureList = Future.traverse(myList)(x => Future(myFunc(x))) * }}} + * @tparam A the type of the value inside the Futures in the `TraversableOnce` + * @tparam B the type of the value of the returned `Future` + * @tparam M the type of the `TraversableOnce` of Futures + * @param in the `TraversableOnce` of Futures which will be sequenced + * @param fn the function to apply to the `TraversableOnce` of Futures to produce the results + * @return the `Future` of the `TraversableOnce` of results */ def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = in.foldLeft(successful(cbf(in))) { (fr, a) => @@ -577,6 +796,7 @@ object Future { for (r <- fr; b <- fb) yield (r += b) }.map(_.result()) + // This is used to run callbacks which are internal // to scala.concurrent; our own callbacks are only // ever used to eventually run another callback, |