summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/Future.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/concurrent/Future.scala')
-rw-r--r--src/library/scala/concurrent/Future.scala557
1 files changed, 410 insertions, 147 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index ebc1e76ca1..6c1c9a0c80 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -10,34 +10,30 @@ package scala.concurrent
import scala.language.higherKinds
-import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
-import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
-import java.lang.{ Iterable => JIterable }
-import java.util.{ LinkedList => JLinkedList }
-import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicLong, AtomicBoolean }
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal
-import scala.Option
import scala.util.{Try, Success, Failure}
-
-import scala.annotation.tailrec
-import scala.collection.mutable.Builder
+import scala.concurrent.duration._
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
-
-/** The trait that represents futures.
+/** A `Future` represents a value which may or may not *currently* be available,
+ * but will be available at some point, or an exception if that value could not be made available.
*
- * Asynchronous computations that yield futures are created with the `Future` call:
+ * Asynchronous computations that yield futures are created with the `Future.apply` call and are computed using a supplied `ExecutionContext`,
+ * which can be backed by a Thread pool.
*
* {{{
+ * import ExecutionContext.Implicits.global
* val s = "Hello"
* val f: Future[String] = Future {
* s + " future!"
* }
- * f onSuccess {
- * case msg => println(msg)
+ * f foreach {
+ * msg => println(msg)
* }
* }}}
*
@@ -62,6 +58,10 @@ import scala.reflect.ClassTag
* If a future is failed with a `scala.runtime.NonLocalReturnControl`,
* it is completed with a value from that throwable instead.
*
+ * @define swallowsExceptions
+ * Since this method executes asynchronously and does not produce a return value,
+ * any non-fatal exceptions thrown will be reported to the `ExecutionContext`.
+ *
* @define nonDeterministic
* Note: using this method yields nondeterministic dataflow programs.
*
@@ -91,16 +91,10 @@ import scala.reflect.ClassTag
* thread. That is, the implementation may run multiple callbacks
* in a batch within a single `execute()` and it may run
* `execute()` either immediately or asynchronously.
+ * Completion of the Future must *happen-before* the invocation of the callback.
*/
trait Future[+T] extends Awaitable[T] {
-
- // The executor within the lexical scope
- // of the Future trait. Note that this will
- // (modulo bugs) _never_ execute a callback
- // other than those below in this same file.
- //
- // See the documentation on `InternalCallbackExecutor` for more details.
- private def internalExecutor = Future.InternalCallbackExecutor
+ import Future.{ InternalCallbackExecutor => internalExecutor }
/* Callbacks */
@@ -111,12 +105,18 @@ trait Future[+T] extends Awaitable[T] {
* If the future has already been completed with a value,
* this will either be applied immediately or be scheduled asynchronously.
*
+ * Note that the returned value of `pf` will be discarded.
+ *
+ * $swallowsExceptions
* $multipleCallbacks
* $callbackInContext
+ *
+ * @group Callbacks
*/
+ @deprecated("use `foreach` or `onComplete` instead (keep in mind that they take total rather than partial functions)", "2.12.0")
def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
case Success(v) =>
- pf.applyOrElse[T, Any](v, Predef.conforms[T]) // Exploiting the cached function to avoid MatchError
+ pf.applyOrElse[T, Any](v, Predef.identity[T]) // Exploiting the cached function to avoid MatchError
case _ =>
}
@@ -130,12 +130,18 @@ trait Future[+T] extends Awaitable[T] {
*
* Will not be called in case that the future is completed with a value.
*
+ * Note that the returned value of `pf` will be discarded.
+ *
+ * $swallowsExceptions
* $multipleCallbacks
* $callbackInContext
+ *
+ * @group Callbacks
*/
+ @deprecated("use `onComplete` or `failed.foreach` instead (keep in mind that they take total rather than partial functions)", "2.12.0")
def onFailure[U](@deprecatedName('callback) pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
case Failure(t) =>
- pf.applyOrElse[Throwable, Any](t, Predef.conforms[Throwable]) // Exploiting the cached function to avoid MatchError
+ pf.applyOrElse[Throwable, Any](t, Predef.identity[Throwable]) // Exploiting the cached function to avoid MatchError
case _ =>
}
@@ -145,63 +151,75 @@ trait Future[+T] extends Awaitable[T] {
* If the future has already been completed,
* this will either be applied immediately or be scheduled asynchronously.
*
+ * Note that the returned value of `f` will be discarded.
+ *
+ * $swallowsExceptions
* $multipleCallbacks
* $callbackInContext
+ *
+ * @tparam U only used to accept any return type of the given callback function
+ * @param f the function to be executed when this `Future` completes
+ * @group Callbacks
*/
def onComplete[U](@deprecatedName('func) f: Try[T] => U)(implicit executor: ExecutionContext): Unit
/* Miscellaneous */
- /** Returns whether the future has already been completed with
+ /** Returns whether the future had already been completed with
* a value or an exception.
*
* $nonDeterministic
*
- * @return `true` if the future is already completed, `false` otherwise
+ * @return `true` if the future was completed, `false` otherwise
+ * @group Polling
*/
def isCompleted: Boolean
- /** The value of this `Future`.
+ /** The current value of this `Future`.
+ *
+ * $nonDeterministic
*
- * If the future is not completed the returned value will be `None`.
- * If the future is completed the value will be `Some(Success(t))`
- * if it contains a valid result, or `Some(Failure(error))` if it contains
+ * If the future was not completed the returned value will be `None`.
+ * If the future was completed the value will be `Some(Success(t))`
+ * if it contained a valid result, or `Some(Failure(error))` if it contained
* an exception.
+ *
+ * @return `None` if the `Future` wasn't completed, `Some` if it was.
+ * @group Polling
*/
def value: Option[Try[T]]
/* Projections */
- /** Returns a failed projection of this future.
- *
- * The failed projection is a future holding a value of type `Throwable`.
+ /** The returned `Future` will be successfully completed with the `Throwable` of the original `Future`
+ * if the original `Future` fails.
*
- * It is completed with a value which is the throwable of the original future
- * in case the original future is failed.
+ * If the original `Future` is successful, the returned `Future` is failed with a `NoSuchElementException`.
*
- * It is failed with a `NoSuchElementException` if the original future is completed successfully.
- *
- * Blocking on this future returns a value if the original future is completed with an exception
- * and throws a corresponding exception if the original future fails.
+ * @return a failed projection of this `Future`.
+ * @group Transformations
*/
- def failed: Future[Throwable] = {
- implicit val ec = internalExecutor
- val p = Promise[Throwable]()
- onComplete {
- case Failure(t) => p success t
- case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable."))
- }
- p.future
- }
+ def failed: Future[Throwable] =
+ transform({
+ case Failure(t) => Success(t)
+ case Success(v) => Failure(new NoSuchElementException("Future.failed not completed with a throwable."))
+ })(internalExecutor)
/* Monadic operations */
/** Asynchronously processes the value in the future once the value becomes available.
*
- * Will not be called if the future fails.
+ * WARNING: Will not be called if this future is never completed or if it is completed with a failure.
+ *
+ * $swallowsExceptions
+ *
+ * @tparam U only used to accept any return type of the given callback function
+ * @param f the function which will be executed if this `Future` completes with a result,
+ * the return value of `f` will be discarded.
+ * @group Callbacks
*/
def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { _ foreach f }
@@ -210,33 +228,63 @@ trait Future[+T] extends Awaitable[T] {
* exception thrown when 's' or 'f' is applied, that exception will be propagated
* to the resulting future.
*
- * @param s function that transforms a successful result of the receiver into a
- * successful result of the returned future
- * @param f function that transforms a failure of the receiver into a failure of
- * the returned future
- * @return a future that will be completed with the transformed value
- */
- def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = {
- val p = Promise[S]()
- // transform on Try has the wrong shape for us here
- onComplete {
- case Success(r) => p complete Try(s(r))
- case Failure(t) => p complete Try(throw f(t)) // will throw fatal errors!
+ * @tparam S the type of the returned `Future`
+ * @param s function that transforms a successful result of the receiver into a successful result of the returned future
+ * @param f function that transforms a failure of the receiver into a failure of the returned future
+ * @return a `Future` that will be completed with the transformed value
+ * @group Transformations
+ */
+ def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] =
+ transform {
+ case Success(r) => Try(s(r))
+ case Failure(t) => Try(throw f(t)) // will throw fatal errors!
}
- p.future
- }
+
+ /** Creates a new Future by applying the specified function to the result
+ * of this Future. If there is any non-fatal exception thrown when 'f'
+ * is applied then that exception will be propagated to the resulting future.
+ *
+ * @tparam S the type of the returned `Future`
+ * @param f function that transforms the result of this future
+ * @return a `Future` that will be completed with the transformed value
+ * @group Transformations
+ */
+ def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]
+
+ /** Creates a new Future by applying the specified function, which produces a Future, to the result
+ * of this Future. If there is any non-fatal exception thrown when 'f'
+ * is applied then that exception will be propagated to the resulting future.
+ *
+ * @tparam S the type of the returned `Future`
+ * @param f function that transforms the result of this future
+ * @return a `Future` that will be completed with the transformed value
+ * @group Transformations
+ */
+ def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S]
+
/** Creates a new future by applying a function to the successful result of
* this future. If this future is completed with an exception then the new
* future will also contain this exception.
*
- * $forComprehensionExamples
+ * Example:
+ *
+ * {{{
+ * val f = Future { "The future" }
+ * val g = f map { x: String => x + " is now!" }
+ * }}}
+ *
+ * Note that a for comprehension involving a `Future`
+ * may expand to include a call to `map` and or `flatMap`
+ * and `withFilter`. See [[scala.concurrent.Future#flatMap]] for an example of such a comprehension.
+ *
+ *
+ * @tparam S the type of the returned `Future`
+ * @param f the function which will be applied to the successful result of this `Future`
+ * @return a `Future` which will be completed with the result of the application of the function
+ * @group Transformations
*/
- def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
- val p = Promise[S]()
- onComplete { v => p complete (v map f) }
- p.future
- }
+ def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f)
/** Creates a new future by applying a function to the successful result of
* this future, and returns the result of the function as the new future.
@@ -244,21 +292,25 @@ trait Future[+T] extends Awaitable[T] {
* also contain this exception.
*
* $forComprehensionExamples
+ *
+ * @tparam S the type of the returned `Future`
+ * @param f the function which will be applied to the successful result of this `Future`
+ * @return a `Future` which will be completed with the result of the application of the function
+ * @group Transformations
*/
- def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
- import impl.Promise.DefaultPromise
- val p = new DefaultPromise[S]()
- onComplete {
- case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
- case Success(v) => try f(v) match {
- // If possible, link DefaultPromises to avoid space leaks
- case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p)
- case fut => fut.onComplete(p.complete)(internalExecutor)
- } catch { case NonFatal(t) => p failure t }
- }
- p.future
+ def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = transformWith {
+ case Success(s) => f(s)
+ case Failure(_) => this.asInstanceOf[Future[S]]
}
+ /** Creates a new future with one level of nesting flattened, this method is equivalent
+ * to `flatMap(identity)`.
+ *
+ * @tparam S the type of the returned `Future`
+ * @group Transformations
+ */
+ def flatten[S](implicit ev: T <:< Future[S]): Future[S] = flatMap(ev)(internalExecutor)
+
/** Creates a new future by filtering the value of the current future with a predicate.
*
* If the current future contains a value which satisfies the predicate, the new future will also hold that value.
@@ -271,16 +323,19 @@ trait Future[+T] extends Awaitable[T] {
* val f = Future { 5 }
* val g = f filter { _ % 2 == 1 }
* val h = f filter { _ % 2 == 0 }
- * Await.result(g, Duration.Zero) // evaluates to 5
+ * g foreach println // Eventually prints 5
* Await.result(h, Duration.Zero) // throw a NoSuchElementException
* }}}
+ *
+ * @param p the predicate to apply to the successful result of this `Future`
+ * @return a `Future` which will hold the successful result of this `Future` if it matches the predicate or a `NoSuchElementException`
+ * @group Transformations
*/
def filter(@deprecatedName('pred) p: T => Boolean)(implicit executor: ExecutionContext): Future[T] =
- map {
- r => if (p(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied")
- }
+ map { r => if (p(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied") }
/** Used by for-comprehensions.
+ * @group Transformations
*/
final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor)
@@ -300,9 +355,14 @@ trait Future[+T] extends Awaitable[T] {
* val h = f collect {
* case x if x > 0 => x * 2
* }
- * Await.result(g, Duration.Zero) // evaluates to 5
+ * g foreach println // Eventually prints 5
* Await.result(h, Duration.Zero) // throw a NoSuchElementException
* }}}
+ *
+ * @tparam S the type of the returned `Future`
+ * @param pf the `PartialFunction` to apply to the successful result of this `Future`
+ * @return a `Future` holding the result of application of the `PartialFunction` or a `NoSuchElementException`
+ * @group Transformations
*/
def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] =
map {
@@ -320,12 +380,14 @@ trait Future[+T] extends Awaitable[T] {
* Future (6 / 0) recover { case e: NotFoundException => 0 } // result: exception
* Future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
* }}}
+ *
+ * @tparam U the type of the returned `Future`
+ * @param pf the `PartialFunction` to apply if this `Future` fails
+ * @return a `Future` with the successful value of this `Future` or the result of the `PartialFunction`
+ * @group Transformations
*/
- def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
- val p = Promise[U]()
- onComplete { v => p complete (v recover pf) }
- p.future
- }
+ def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] =
+ transform { _ recover pf }
/** Creates a new future that will handle any matching throwable that this
* future might contain by assigning it a value of another future.
@@ -339,15 +401,17 @@ trait Future[+T] extends Awaitable[T] {
* val f = Future { Int.MaxValue }
* Future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue
* }}}
+ *
+ * @tparam U the type of the returned `Future`
+ * @param pf the `PartialFunction` to apply if this `Future` fails
+ * @return a `Future` with the successful value of this `Future` or the outcome of the `Future` returned by the `PartialFunction`
+ * @group Transformations
*/
- def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = {
- val p = Promise[U]()
- onComplete {
- case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this).onComplete(p.complete)(internalExecutor) catch { case NonFatal(t) => p failure t }
- case other => p complete other
+ def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] =
+ transformWith {
+ case Failure(t) => pf.applyOrElse(t, (_: Throwable) => this)
+ case Success(_) => this
}
- p.future
- }
/** Zips the values of `this` and `that` future, and creates
* a new future holding the tuple of their results.
@@ -356,17 +420,37 @@ trait Future[+T] extends Awaitable[T] {
* with the throwable stored in `this`.
* Otherwise, if `that` future fails, the resulting future is failed
* with the throwable stored in `that`.
+ *
+ * @tparam U the type of the other `Future`
+ * @param that the other `Future`
+ * @return a `Future` with the results of both futures or the failure of the first of them that failed
+ * @group Transformations
*/
def zip[U](that: Future[U]): Future[(T, U)] = {
implicit val ec = internalExecutor
- val p = Promise[(T, U)]()
- onComplete {
- case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]]
- case Success(s) => that onComplete { c => p.complete(c map { s2 => (s, s2) }) }
- }
- p.future
+ flatMap { r1 => that.map(r2 => (r1, r2)) }
}
+ /** Zips the values of `this` and `that` future using a function `f`,
+ * and creates a new future holding the result.
+ *
+ * If `this` future fails, the resulting future is failed
+ * with the throwable stored in `this`.
+ * Otherwise, if `that` future fails, the resulting future is failed
+ * with the throwable stored in `that`.
+ * If the application of `f` throws a throwable, the resulting future
+ * is failed with that throwable if it is non-fatal.
+ *
+ * @tparam U the type of the other `Future`
+ * @tparam R the type of the resulting `Future`
+ * @param that the other `Future`
+ * @param f the function to apply to the results of `this` and `that`
+ * @return a `Future` with the result of the application of `f` to the results of `this` and `that`
+ * @group Transformations
+ */
+ def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] =
+ flatMap(r1 => that.map(r2 => f(r1, r2)))(internalExecutor)
+
/** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
* the result of the `that` future if `that` is completed successfully.
* If both futures are failed, the resulting future holds the throwable object of the first future.
@@ -378,24 +462,28 @@ trait Future[+T] extends Awaitable[T] {
* val f = Future { sys.error("failed") }
* val g = Future { 5 }
* val h = f fallbackTo g
- * Await.result(h, Duration.Zero) // evaluates to 5
+ * h foreach println // Eventually prints 5
* }}}
+ *
+ * @tparam U the type of the other `Future` and the resulting `Future`
+ * @param that the `Future` whose result we want to use if this `Future` fails.
+ * @return a `Future` with the successful result of this or that `Future` or the failure of this `Future` if both fail
+ * @group Transformations
*/
- def fallbackTo[U >: T](that: Future[U]): Future[U] = {
- implicit val ec = internalExecutor
- val p = Promise[U]()
- onComplete {
- case s @ Success(_) => p complete s
- case f @ Failure(_) => that onComplete {
- case s2 @ Success(_) => p complete s2
- case _ => p complete f // Use the first failure as the failure
- }
+ def fallbackTo[U >: T](that: Future[U]): Future[U] =
+ if (this eq that) this
+ else {
+ implicit val ec = internalExecutor
+ recoverWith { case _ => that } recoverWith { case _ => this }
}
- p.future
- }
/** Creates a new `Future[S]` which is completed with this `Future`'s result if
* that conforms to `S`'s erased type or a `ClassCastException` otherwise.
+ *
+ * @tparam S the type of the returned `Future`
+ * @param tag the `ClassTag` which will be used to cast the result of this `Future`
+ * @return a `Future` holding the casted result of this `Future` or a `ClassCastException` otherwise
+ * @group Transformations
*/
def mapTo[S](implicit tag: ClassTag[S]): Future[S] = {
implicit val ec = internalExecutor
@@ -429,15 +517,22 @@ trait Future[+T] extends Awaitable[T] {
* case Success(v) => println(v)
* }
* }}}
+ *
+ * $swallowsExceptions
+ *
+ * @tparam U only used to accept any return type of the given `PartialFunction`
+ * @param pf a `PartialFunction` which will be conditionally applied to the outcome of this `Future`
+ * @return a `Future` which will be completed with the exact same outcome as this `Future` but after the `PartialFunction` has been executed.
+ * @group Callbacks
*/
- def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = {
- val p = Promise[T]()
- onComplete {
- case r => try pf.applyOrElse[Try[T], Any](r, Predef.conforms[Try[T]]) finally p complete r
- }
- p.future
- }
+ def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] =
+ transform {
+ result =>
+ try pf.applyOrElse[Try[T], Any](result, Predef.identity[Try[T]])
+ catch { case NonFatal(t) => executor reportFailure t }
+ result
+ }
}
@@ -461,48 +556,122 @@ object Future {
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
)
+ /** A Future which is never completed.
+ */
+ final object never extends Future[Nothing] {
+
+ @throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
+ override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
+ atMost match {
+ case e if e eq Duration.Undefined => throw new IllegalArgumentException("cannot wait for Undefined period")
+ case Duration.Inf => new CountDownLatch(1).await()
+ case Duration.MinusInf => // Drop out
+ case f: FiniteDuration =>
+ if (f > Duration.Zero) new CountDownLatch(1).await(f.toNanos, TimeUnit.NANOSECONDS)
+ }
+ throw new TimeoutException(s"Future timed out after [$atMost]")
+ }
+
+ @throws(classOf[Exception])
+ override def result(atMost: Duration)(implicit permit: CanAwait): Nothing = {
+ ready(atMost)
+ throw new TimeoutException(s"Future timed out after [$atMost]")
+ }
+
+ override def onSuccess[U](pf: PartialFunction[Nothing, U])(implicit executor: ExecutionContext): Unit = ()
+ override def onFailure[U](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = ()
+ override def onComplete[U](f: Try[Nothing] => U)(implicit executor: ExecutionContext): Unit = ()
+ override def isCompleted: Boolean = false
+ override def value: Option[Try[Nothing]] = None
+ override def failed: Future[Throwable] = this
+ override def foreach[U](f: Nothing => U)(implicit executor: ExecutionContext): Unit = ()
+ override def transform[S](s: Nothing => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = this
+ override def transform[S](f: Try[Nothing] => Try[S])(implicit executor: ExecutionContext): Future[S] = this
+ override def transformWith[S](f: Try[Nothing] => Future[S])(implicit executor: ExecutionContext): Future[S] = this
+ override def map[S](f: Nothing => S)(implicit executor: ExecutionContext): Future[S] = this
+ override def flatMap[S](f: Nothing => Future[S])(implicit executor: ExecutionContext): Future[S] = this
+ override def flatten[S](implicit ev: Nothing <:< Future[S]): Future[S] = this
+ override def filter(p: Nothing => Boolean)(implicit executor: ExecutionContext): Future[Nothing] = this
+ override def collect[S](pf: PartialFunction[Nothing, S])(implicit executor: ExecutionContext): Future[S] = this
+ override def recover[U >: Nothing](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = this
+ override def recoverWith[U >: Nothing](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = this
+ override def zip[U](that: Future[U]): Future[(Nothing, U)] = this
+ override def zipWith[U, R](that: Future[U])(f: (Nothing, U) => R)(implicit executor: ExecutionContext): Future[R] = this
+ override def fallbackTo[U >: Nothing](that: Future[U]): Future[U] = this
+ override def mapTo[S](implicit tag: ClassTag[S]): Future[S] = this
+ override def andThen[U](pf: PartialFunction[Try[Nothing], U])(implicit executor: ExecutionContext): Future[Nothing] = this
+
+ override def toString: String = "Future(<never>)"
+ }
+
+ /** A Future which is always completed with the Unit value.
+ */
+ val unit: Future[Unit] = successful(())
+
/** Creates an already completed Future with the specified exception.
*
- * @tparam T the type of the value in the future
- * @return the newly created `Future` object
+ * @tparam T the type of the value in the future
+ * @param exception the non-null instance of `Throwable`
+ * @return the newly created `Future` instance
*/
def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
/** Creates an already completed Future with the specified result.
*
* @tparam T the type of the value in the future
- * @return the newly created `Future` object
+ * @param result the given successful value
+ * @return the newly created `Future` instance
*/
def successful[T](result: T): Future[T] = Promise.successful(result).future
/** Creates an already completed Future with the specified result or exception.
*
- * @tparam T the type of the value in the promise
- * @return the newly created `Future` object
+ * @tparam T the type of the value in the `Future`
+ * @param result the result of the returned `Future` instance
+ * @return the newly created `Future` instance
*/
def fromTry[T](result: Try[T]): Future[T] = Promise.fromTry(result).future
- /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
+ /** Starts an asynchronous computation and returns a `Future` instance with the result of that computation.
+ *
+ * The following expressions are equivalent:
+ *
+ * {{{
+ * val f1 = Future(expr)
+ * val f2 = Future.unit.map(_ => expr)
+ * }}}
*
* The result becomes available once the asynchronous computation is completed.
*
- * @tparam T the type of the result
- * @param body the asynchronous computation
+ * @tparam T the type of the result
+ * @param body the asynchronous computation
* @param executor the execution context on which the future is run
- * @return the `Future` holding the result of the computation
+ * @return the `Future` holding the result of the computation
*/
- def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)
+ def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] =
+ unit.map(_ => body)
- /** Simple version of `Future.traverse`. Transforms a `TraversableOnce[Future[A]]` into a `Future[TraversableOnce[A]]`.
- * Useful for reducing many `Future`s into a single `Future`.
+ /** Simple version of `Future.traverse`. Asynchronously and non-blockingly transforms a `TraversableOnce[Future[A]]`
+ * into a `Future[TraversableOnce[A]]`. Useful for reducing many `Future`s into a single `Future`.
+ *
+ * @tparam A the type of the value inside the Futures
+ * @tparam M the type of the `TraversableOnce` of Futures
+ * @param in the `TraversableOnce` of Futures which will be sequenced
+ * @return the `Future` of the `TraversableOnce` of results
*/
def sequence[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
in.foldLeft(successful(cbf(in))) {
- (fr, fa) => for (r <- fr; a <- fa) yield (r += a)
+ (fr, fa) => fr.zipWith(fa)(_ += _)
}.map(_.result())(InternalCallbackExecutor)
}
- /** Returns a new `Future` to the result of the first future in the list that is completed.
+ /** Asynchronously and non-blockingly returns a new `Future` to the result of the first future
+ * in the list that is completed. This means no matter if it is completed as a success or as a failure.
+ *
+ * @tparam T the type of the value in the future
+ * @param futures the `TraversableOnce` of Futures in which to find the first completed
+ * @return the `Future` holding the result of the future that is first to be completed
*/
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
@@ -511,8 +680,15 @@ object Future {
p.future
}
- /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate.
+ /** Asynchronously and non-blockingly returns a `Future` that will hold the optional result
+ * of the first `Future` with a result that matches the predicate.
+ *
+ * @tparam T the type of the value in the future
+ * @param futures the `TraversableOnce` of Futures to search
+ * @param p the predicate which indicates if it's a match
+ * @return the `Future` holding the optional result of the search
*/
+ @deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0")
def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
val futuresBuffer = futures.toBuffer
if (futuresBuffer.isEmpty) successful[Option[T]](None)
@@ -536,46 +712,133 @@ object Future {
}
}
- /** A non-blocking fold over the specified futures, with the start value of the given zero.
+
+ /** Asynchronously and non-blockingly returns a `Future` that will hold the optional result
+ * of the first `Future` with a result that matches the predicate, failed `Future`s will be ignored.
+ *
+ * @tparam T the type of the value in the future
+ * @param futures the `scala.collection.immutable.Iterable` of Futures to search
+ * @param p the predicate which indicates if it's a match
+ * @return the `Future` holding the optional result of the search
+ */
+ def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
+ def searchNext(i: Iterator[Future[T]]): Future[Option[T]] =
+ if (!i.hasNext) successful[Option[T]](None)
+ else {
+ i.next().transformWith {
+ case Success(r) if p(r) => successful(Some(r))
+ case other => searchNext(i)
+ }
+ }
+ searchNext(futures.iterator)
+ }
+
+ /** A non-blocking, asynchronous left fold over the specified futures,
+ * with the start value of the given zero.
+ * The fold is performed asynchronously in left-to-right order as the futures become completed.
+ * The result will be the first failure of any of the futures, or any failure in the actual fold,
+ * or the result of the fold.
+ *
+ * Example:
+ * {{{
+ * val futureSum = Future.foldLeft(futures)(0)(_ + _)
+ * }}}
+ *
+ * @tparam T the type of the value of the input Futures
+ * @tparam R the type of the value of the returned `Future`
+ * @param futures the `scala.collection.immutable.Iterable` of Futures to be folded
+ * @param zero the start value of the fold
+ * @param op the fold operation to be applied to the zero and futures
+ * @return the `Future` holding the result of the fold
+ */
+ def foldLeft[T, R](futures: scala.collection.immutable.Iterable[Future[T]])(zero: R)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
+ foldNext(futures.iterator, zero, op)
+
+ private[this] def foldNext[T, R](i: Iterator[Future[T]], prevValue: R, op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] =
+ if (!i.hasNext) successful(prevValue)
+ else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) }
+
+ /** A non-blocking, asynchronous fold over the specified futures, with the start value of the given zero.
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*
* Example:
* {{{
- * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
+ * val futureSum = Future.fold(futures)(0)(_ + _)
* }}}
+ *
+ * @tparam T the type of the value of the input Futures
+ * @tparam R the type of the value of the returned `Future`
+ * @param futures the `TraversableOnce` of Futures to be folded
+ * @param zero the start value of the fold
+ * @param op the fold operation to be applied to the zero and futures
+ * @return the `Future` holding the result of the fold
*/
+ @deprecated("use Future.foldLeft instead", "2.12.0")
def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(@deprecatedName('foldFun) op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) successful(zero)
else sequence(futures).map(_.foldLeft(zero)(op))
}
- /** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first.
+ /** Initiates a non-blocking, asynchronous, fold over the supplied futures
+ * where the fold-zero is the result value of the `Future` that's completed first.
*
* Example:
* {{{
- * val result = Await.result(Future.reduce(futures)(_ + _), 5 seconds)
+ * val futureSum = Future.reduce(futures)(_ + _)
* }}}
+ * @tparam T the type of the value of the input Futures
+ * @tparam R the type of the value of the returned `Future`
+ * @param futures the `TraversableOnce` of Futures to be reduced
+ * @param op the reduce operation which is applied to the results of the futures
+ * @return the `Future` holding the result of the reduce
*/
+ @deprecated("use Future.reduceLeft instead", "2.12.0")
def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) failed(new NoSuchElementException("reduce attempted on empty collection"))
else sequence(futures).map(_ reduceLeft op)
}
- /** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`.
+ /** Initiates a non-blocking, asynchronous, left reduction over the supplied futures
+ * where the zero is the result value of the first `Future`.
+ *
+ * Example:
+ * {{{
+ * val futureSum = Future.reduceLeft(futures)(_ + _)
+ * }}}
+ * @tparam T the type of the value of the input Futures
+ * @tparam R the type of the value of the returned `Future`
+ * @param futures the `scala.collection.immutable.Iterable` of Futures to be reduced
+ * @param op the reduce operation which is applied to the results of the futures
+ * @return the `Future` holding the result of the reduce
+ */
+ def reduceLeft[T, R >: T](futures: scala.collection.immutable.Iterable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
+ val i = futures.iterator
+ if (!i.hasNext) failed(new NoSuchElementException("reduceLeft attempted on empty collection"))
+ else i.next() flatMap { v => foldNext(i, v, op) }
+ }
+
+ /** Asynchronously and non-blockingly transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]`
+ * using the provided function `A => Future[B]`.
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel:
*
* {{{
* val myFutureList = Future.traverse(myList)(x => Future(myFunc(x)))
* }}}
+ * @tparam A the type of the value inside the Futures in the `TraversableOnce`
+ * @tparam B the type of the value of the returned `Future`
+ * @tparam M the type of the `TraversableOnce` of Futures
+ * @param in the `TraversableOnce` of Futures which will be sequenced
+ * @param fn the function to apply to the `TraversableOnce` of Futures to produce the results
+ * @return the `Future` of the `TraversableOnce` of results
*/
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
- in.foldLeft(successful(cbf(in))) { (fr, a) =>
- val fb = fn(a)
- for (r <- fr; b <- fb) yield (r += b)
- }.map(_.result())
+ in.foldLeft(successful(cbf(in))) {
+ (fr, a) => fr.zipWith(fn(a))(_ += _)
+ }.map(_.result())(InternalCallbackExecutor)
+
// This is used to run callbacks which are internal
// to scala.concurrent; our own callbacks are only