summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdriaan Moors <adriaan@lightbend.com>2016-05-23 16:33:31 -0700
committerAdriaan Moors <adriaan@lightbend.com>2016-05-23 16:33:31 -0700
commit095295a89a1fb1c3eb27d581662ed1a2fba5d2c5 (patch)
treeb51a9f5710fcfb1f86eccdc0713c41ff738e57ec
parentd169d48a31a38749328be195855e175eeaf5b454 (diff)
parent481a39010f63ccc5811c1d532eb510fd07e265a8 (diff)
downloadscala-095295a89a1fb1c3eb27d581662ed1a2fba5d2c5.tar.gz
scala-095295a89a1fb1c3eb27d581662ed1a2fba5d2c5.tar.bz2
scala-095295a89a1fb1c3eb27d581662ed1a2fba5d2c5.zip
Merge pull request #5164 from viktorklang/wip-future-docs-√
Improve Future documentation (+ minor code cleanups)
-rw-r--r--src/library/scala/concurrent/Future.scala159
-rw-r--r--test/files/jvm/scala-concurrent-tck.check2
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala31
3 files changed, 133 insertions, 59 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index d9d3d572e8..8abd7feeb7 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -20,11 +20,14 @@ import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
-/** The trait that represents futures.
+/** A `Future` represents a value which may or may not *currently* be available,
+ * but will be available at some point, or an exception if that value could not be made available.
*
- * Asynchronous computations that yield futures are created with the `Future.apply` call:
+ * Asynchronous computations that yield futures are created with the `Future.apply` call and are computed using a supplied `ExecutionContext`,
+ * which can be backed by a Thread pool.
*
* {{{
+ * import ExecutionContext.Implicits.global
* val s = "Hello"
* val f: Future[String] = Future {
* s + " future!"
@@ -88,6 +91,7 @@ import scala.reflect.ClassTag
* thread. That is, the implementation may run multiple callbacks
* in a batch within a single `execute()` and it may run
* `execute()` either immediately or asynchronously.
+ * Completion of the Future must *happen-before* the invocation of the callback.
*/
trait Future[+T] extends Awaitable[T] {
import Future.{ InternalCallbackExecutor => internalExecutor }
@@ -101,9 +105,13 @@ trait Future[+T] extends Awaitable[T] {
* If the future has already been completed with a value,
* this will either be applied immediately or be scheduled asynchronously.
*
+ * Note that the returned value of `pf` will be discarded.
+ *
* $swallowsExceptions
* $multipleCallbacks
* $callbackInContext
+ *
+ * @group Callbacks
*/
@deprecated("use `foreach` or `onComplete` instead (keep in mind that they take total rather than partial functions)", "2.12")
def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
@@ -122,9 +130,13 @@ trait Future[+T] extends Awaitable[T] {
*
* Will not be called in case that the future is completed with a value.
*
+ * Note that the returned value of `pf` will be discarded.
+ *
* $swallowsExceptions
* $multipleCallbacks
* $callbackInContext
+ *
+ * @group Callbacks
*/
@deprecated("use `onComplete` or `failed.foreach` instead (keep in mind that they take total rather than partial functions)", "2.12")
def onFailure[U](@deprecatedName('callback) pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
@@ -139,24 +151,28 @@ trait Future[+T] extends Awaitable[T] {
* If the future has already been completed,
* this will either be applied immediately or be scheduled asynchronously.
*
+ * Note that the returned value of `f` will be discarded.
+ *
* $swallowsExceptions
* $multipleCallbacks
* $callbackInContext
*
* @tparam U only used to accept any return type of the given callback function
* @param f the function to be executed when this `Future` completes
+ * @group Callbacks
*/
def onComplete[U](@deprecatedName('func) f: Try[T] => U)(implicit executor: ExecutionContext): Unit
/* Miscellaneous */
- /** Returns whether the future has already been completed with
+ /** Returns whether the future had already been completed with
* a value or an exception.
*
* $nonDeterministic
*
- * @return `true` if the future is already completed, `false` otherwise
+ * @return `true` if the future was completed, `false` otherwise
+ * @group Polling
*/
def isCompleted: Boolean
@@ -164,12 +180,13 @@ trait Future[+T] extends Awaitable[T] {
*
* $nonDeterministic
*
- * If the future is not completed the returned value will be `None`.
- * If the future is completed the value will be `Some(Success(t))`
- * if it contains a valid result, or `Some(Failure(error))` if it contains
+ * If the future was not completed the returned value will be `None`.
+ * If the future was completed the value will be `Some(Success(t))`
+ * if it contained a valid result, or `Some(Failure(error))` if it contained
* an exception.
*
* @return `None` if the `Future` wasn't completed, `Some` if it was.
+ * @group Polling
*/
def value: Option[Try[T]]
@@ -182,6 +199,7 @@ trait Future[+T] extends Awaitable[T] {
* If the original `Future` is successful, the returned `Future` is failed with a `NoSuchElementException`.
*
* @return a failed projection of this `Future`.
+ * @group Transformations
*/
def failed: Future[Throwable] =
transform({
@@ -201,6 +219,7 @@ trait Future[+T] extends Awaitable[T] {
* @tparam U only used to accept any return type of the given callback function
* @param f the function which will be executed if this `Future` completes with a result,
* the return value of `f` will be discarded.
+ * @group Callbacks
*/
def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { _ foreach f }
@@ -209,10 +228,11 @@ trait Future[+T] extends Awaitable[T] {
* exception thrown when 's' or 'f' is applied, that exception will be propagated
* to the resulting future.
*
- * @tparam S the type of the returned `Future`
- * @param s function that transforms a successful result of the receiver into a successful result of the returned future
- * @param f function that transforms a failure of the receiver into a failure of the returned future
- * @return a `Future` that will be completed with the transformed value
+ * @tparam S the type of the returned `Future`
+ * @param s function that transforms a successful result of the receiver into a successful result of the returned future
+ * @param f function that transforms a failure of the receiver into a failure of the returned future
+ * @return a `Future` that will be completed with the transformed value
+ * @group Transformations
*/
def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] =
transform {
@@ -224,9 +244,10 @@ trait Future[+T] extends Awaitable[T] {
* of this Future. If there is any non-fatal exception thrown when 'f'
* is applied then that exception will be propagated to the resulting future.
*
- * @tparam S the type of the returned `Future`
- * @param f function that transforms the result of this future
- * @return a `Future` that will be completed with the transformed value
+ * @tparam S the type of the returned `Future`
+ * @param f function that transforms the result of this future
+ * @return a `Future` that will be completed with the transformed value
+ * @group Transformations
*/
def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]
@@ -234,9 +255,10 @@ trait Future[+T] extends Awaitable[T] {
* of this Future. If there is any non-fatal exception thrown when 'f'
* is applied then that exception will be propagated to the resulting future.
*
- * @tparam S the type of the returned `Future`
- * @param f function that transforms the result of this future
- * @return a `Future` that will be completed with the transformed value
+ * @tparam S the type of the returned `Future`
+ * @param f function that transforms the result of this future
+ * @return a `Future` that will be completed with the transformed value
+ * @group Transformations
*/
def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S]
@@ -257,11 +279,12 @@ trait Future[+T] extends Awaitable[T] {
* and `withFilter`. See [[scala.concurrent.Future#flatMap]] for an example of such a comprehension.
*
*
- * @tparam S the type of the returned `Future`
- * @param f the function which will be applied to the successful result of this `Future`
- * @return a `Future` which will be completed with the result of the application of the function
+ * @tparam S the type of the returned `Future`
+ * @param f the function which will be applied to the successful result of this `Future`
+ * @return a `Future` which will be completed with the result of the application of the function
+ * @group Transformations
*/
- def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_.map(f))
+ def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f)
/** Creates a new future by applying a function to the successful result of
* this future, and returns the result of the function as the new future.
@@ -270,9 +293,10 @@ trait Future[+T] extends Awaitable[T] {
*
* $forComprehensionExamples
*
- * @tparam S the type of the returned `Future`
- * @param f the function which will be applied to the successful result of this `Future`
- * @return a `Future` which will be completed with the result of the application of the function
+ * @tparam S the type of the returned `Future`
+ * @param f the function which will be applied to the successful result of this `Future`
+ * @return a `Future` which will be completed with the result of the application of the function
+ * @group Transformations
*/
def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = transformWith {
case Success(s) => f(s)
@@ -282,7 +306,8 @@ trait Future[+T] extends Awaitable[T] {
/** Creates a new future with one level of nesting flattened, this method is equivalent
* to `flatMap(identity)`.
*
- * @tparam S the type of the returned `Future`
+ * @tparam S the type of the returned `Future`
+ * @group Transformations
*/
def flatten[S](implicit ev: T <:< Future[S]): Future[S] = flatMap(ev)(internalExecutor)
@@ -302,13 +327,15 @@ trait Future[+T] extends Awaitable[T] {
* Await.result(h, Duration.Zero) // throw a NoSuchElementException
* }}}
*
- * @param p the predicate to apply to the successful result of this `Future`
- * @return a `Future` which will hold the successful result of this `Future` if it matches the predicate or a `NoSuchElementException`
+ * @param p the predicate to apply to the successful result of this `Future`
+ * @return a `Future` which will hold the successful result of this `Future` if it matches the predicate or a `NoSuchElementException`
+ * @group Transformations
*/
def filter(@deprecatedName('pred) p: T => Boolean)(implicit executor: ExecutionContext): Future[T] =
map { r => if (p(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied") }
/** Used by for-comprehensions.
+ * @group Transformations
*/
final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor)
@@ -332,9 +359,10 @@ trait Future[+T] extends Awaitable[T] {
* Await.result(h, Duration.Zero) // throw a NoSuchElementException
* }}}
*
- * @tparam S the type of the returned `Future`
- *  @param pf the `PartialFunction` to apply to the successful result of this `Future`
- * @return a `Future` holding the result of application of the `PartialFunction` or a `NoSuchElementException`
+ * @tparam S the type of the returned `Future`
+ * @param pf the `PartialFunction` to apply to the successful result of this `Future`
+ * @return a `Future` holding the result of application of the `PartialFunction` or a `NoSuchElementException`
+ * @group Transformations
*/
def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] =
map {
@@ -353,9 +381,10 @@ trait Future[+T] extends Awaitable[T] {
* Future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
* }}}
*
- * @tparam U the type of the returned `Future`
- * @param pf the `PartialFunction` to apply if this `Future` fails
- * @return a `Future` with the successful value of this `Future` or the result of the `PartialFunction`
+ * @tparam U the type of the returned `Future`
+ * @param pf the `PartialFunction` to apply if this `Future` fails
+ * @return a `Future` with the successful value of this `Future` or the result of the `PartialFunction`
+ * @group Transformations
*/
def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] =
transform { _ recover pf }
@@ -373,9 +402,10 @@ trait Future[+T] extends Awaitable[T] {
* Future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue
* }}}
*
- * @tparam U the type of the returned `Future`
- * @param pf the `PartialFunction` to apply if this `Future` fails
- * @return a `Future` with the successful value of this `Future` or the outcome of the `Future` returned by the `PartialFunction`
+ * @tparam U the type of the returned `Future`
+ * @param pf the `PartialFunction` to apply if this `Future` fails
+ * @return a `Future` with the successful value of this `Future` or the outcome of the `Future` returned by the `PartialFunction`
+ * @group Transformations
*/
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] =
transformWith {
@@ -391,9 +421,10 @@ trait Future[+T] extends Awaitable[T] {
* Otherwise, if `that` future fails, the resulting future is failed
* with the throwable stored in `that`.
*
- * @tparam U the type of the other `Future`
- * @param that the other `Future`
- * @return a `Future` with the results of both futures or the failure of the first of them that failed
+ * @tparam U the type of the other `Future`
+ * @param that the other `Future`
+ * @return a `Future` with the results of both futures or the failure of the first of them that failed
+ * @group Transformations
*/
def zip[U](that: Future[U]): Future[(T, U)] = {
implicit val ec = internalExecutor
@@ -410,11 +441,12 @@ trait Future[+T] extends Awaitable[T] {
* If the application of `f` throws a throwable, the resulting future
* is failed with that throwable if it is non-fatal.
*
- * @tparam U the type of the other `Future`
- * @tparam R the type of the resulting `Future`
- * @param that the other `Future`
- * @param f the function to apply to the results of `this` and `that`
- * @return a `Future` with the result of the application of `f` to the results of `this` and `that`
+ * @tparam U the type of the other `Future`
+ * @tparam R the type of the resulting `Future`
+ * @param that the other `Future`
+ * @param f the function to apply to the results of `this` and `that`
+ * @return a `Future` with the result of the application of `f` to the results of `this` and `that`
+ * @group Transformations
*/
def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] =
flatMap(r1 => that.map(r2 => f(r1, r2)))(internalExecutor)
@@ -433,9 +465,10 @@ trait Future[+T] extends Awaitable[T] {
* h foreach println // Eventually prints 5
* }}}
*
- * @tparam U the type of the other `Future` and the resulting `Future`
- * @param that the `Future` whose result we want to use if this `Future` fails.
- * @return a `Future` with the successful result of this or that `Future` or the failure of this `Future` if both fail
+ * @tparam U the type of the other `Future` and the resulting `Future`
+ * @param that the `Future` whose result we want to use if this `Future` fails.
+ * @return a `Future` with the successful result of this or that `Future` or the failure of this `Future` if both fail
+ * @group Transformations
*/
def fallbackTo[U >: T](that: Future[U]): Future[U] =
if (this eq that) this
@@ -447,9 +480,10 @@ trait Future[+T] extends Awaitable[T] {
/** Creates a new `Future[S]` which is completed with this `Future`'s result if
* that conforms to `S`'s erased type or a `ClassCastException` otherwise.
*
- * @tparam S the type of the returned `Future`
- * @param tag the `ClassTag` which will be used to cast the result of this `Future`
- * @return a `Future` holding the casted result of this `Future` or a `ClassCastException` otherwise
+ * @tparam S the type of the returned `Future`
+ * @param tag the `ClassTag` which will be used to cast the result of this `Future`
+ * @return a `Future` holding the casted result of this `Future` or a `ClassCastException` otherwise
+ * @group Transformations
*/
def mapTo[S](implicit tag: ClassTag[S]): Future[S] = {
implicit val ec = internalExecutor
@@ -484,9 +518,12 @@ trait Future[+T] extends Awaitable[T] {
* }
* }}}
*
- * @tparam U only used to accept any return type of the given `PartialFunction`
- * @param pf a `PartialFunction` which will be conditionally applied to the outcome of this `Future`
- * @return a `Future` which will be completed with the exact same outcome as this `Future` but after the `PartialFunction` has been executed.
+ * $swallowsExceptions
+ *
+ * @tparam U only used to accept any return type of the given `PartialFunction`
+ * @param pf a `PartialFunction` which will be conditionally applied to the outcome of this `Future`
+ * @return a `Future` which will be completed with the exact same outcome as this `Future` but after the `PartialFunction` has been executed.
+ * @group Callbacks
*/
def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] =
transform {
@@ -598,6 +635,13 @@ object Future {
/** Starts an asynchronous computation and returns a `Future` instance with the result of that computation.
*
+ * The following expressions are equivalent:
+ *
+ * {{{
+ * val f1 = Future(expr)
+ * val f2 = Future.unit.map(_ => expr)
+ * }}}
+ *
* The result becomes available once the asynchronous computation is completed.
*
* @tparam T the type of the result
@@ -618,7 +662,7 @@ object Future {
*/
def sequence[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
in.foldLeft(successful(cbf(in))) {
- (fr, fa) => for (r <- fr; a <- fa) yield (r += a)
+ (fr, fa) => fr.zipWith(fa)(_ += _)
}.map(_.result())(InternalCallbackExecutor)
}
@@ -791,10 +835,9 @@ object Future {
* @return the `Future` of the `TraversableOnce` of results
*/
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
- in.foldLeft(successful(cbf(in))) { (fr, a) =>
- val fb = fn(a)
- for (r <- fr; b <- fb) yield (r += b)
- }.map(_.result())
+ in.foldLeft(successful(cbf(in))) {
+ (fr, a) => fr.zipWith(fn(a))(_ += _)
+ }.map(_.result())(InternalCallbackExecutor)
// This is used to run callbacks which are internal
diff --git a/test/files/jvm/scala-concurrent-tck.check b/test/files/jvm/scala-concurrent-tck.check
index 9aef07d1e5..8aec46e5d6 100644
--- a/test/files/jvm/scala-concurrent-tck.check
+++ b/test/files/jvm/scala-concurrent-tck.check
@@ -1 +1 @@
-warning: there were 73 deprecation warnings; re-run with -deprecation for details
+warning: there were 75 deprecation warnings; re-run with -deprecation for details
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index 8069028cf5..7197c1d853 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -90,6 +90,25 @@ trait FutureCallbacks extends TestBase {
promise.success(-1)
}
+ def stressTestNumberofCallbacks(): Unit = once {
+ done =>
+ val promise = Promise[Unit]
+ val otherPromise = Promise[Unit]
+ def attachMeaninglessCallbacksTo(f: Future[Any]): Unit = (1 to 1000).foreach(_ => f.onComplete(_ => ()))
+ attachMeaninglessCallbacksTo(promise.future)
+ val future = promise.future.flatMap { _ =>
+ attachMeaninglessCallbacksTo(otherPromise.future)
+ otherPromise.future
+ }
+ val numbers = new java.util.concurrent.ConcurrentHashMap[Int, Unit]()
+ (0 to 10000) foreach { x => numbers.put(x, ()) }
+ Future.sequence((0 to 10000) map { x => future.andThen({ case _ => numbers.remove(x) }) }) onComplete {
+ _ => done(numbers.isEmpty)
+ }
+ promise.success(())
+ otherPromise.success(())
+ }
+
testOnSuccess()
testOnSuccessWhenCompleted()
testOnSuccessWhenFailed()
@@ -100,6 +119,7 @@ trait FutureCallbacks extends TestBase {
//testOnFailureWhenSpecialThrowable(7, new InterruptedException)
testThatNestedCallbacksDoNotYieldStackOverflow()
testOnFailureWhenTimeoutException()
+ stressTestNumberofCallbacks()
}
@@ -283,6 +303,16 @@ def testTransformFailure(): Unit = once {
g onFailure { case t => done(t.getMessage() == "expected") }
}
+ def testFlatMapDelayed(): Unit = once {
+ done =>
+ val f = Future { 5 }
+ val p = Promise[Int]
+ val g = f flatMap { _ => p.future }
+ g onSuccess { case x => done(x == 10) }
+ g onFailure { case _ => done(false) }
+ p.success(10)
+ }
+
def testFilterSuccess(): Unit = once {
done =>
val f = Future { 4 }
@@ -458,6 +488,7 @@ def testTransformFailure(): Unit = once {
testMapFailure()
testFlatMapSuccess()
testFlatMapFailure()
+ testFlatMapDelayed()
testFilterSuccess()
testFilterFailure()
testCollectSuccess()