diff options
Diffstat (limited to 'src/library/scala/concurrent')
19 files changed, 861 insertions, 619 deletions
diff --git a/src/library/scala/concurrent/BatchingExecutor.scala b/src/library/scala/concurrent/BatchingExecutor.scala index a0d7aaea47..fd31f3470e 100644 --- a/src/library/scala/concurrent/BatchingExecutor.scala +++ b/src/library/scala/concurrent/BatchingExecutor.scala @@ -103,7 +103,7 @@ private[concurrent] trait BatchingExecutor extends Executor { override def execute(runnable: Runnable): Unit = { if (batchable(runnable)) { // If we can batch the runnable _tasksLocal.get match { - case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch + case null => unbatchedExecute(new Batch(runnable :: Nil)) // If we aren't in batching mode yet, enqueue batch case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch } } else unbatchedExecute(runnable) // If not batchable, just delegate to underlying diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala index 747cc393c3..2b8ed4c7ca 100644 --- a/src/library/scala/concurrent/BlockContext.scala +++ b/src/library/scala/concurrent/BlockContext.scala @@ -41,7 +41,7 @@ package scala.concurrent trait BlockContext { /** Used internally by the framework; - * Designates (and eventually executes) a thunk which potentially blocks the calling `Thread`. + * Designates (and eventually executes) a thunk which potentially blocks the calling `java.lang.Thread`. * * Clients must use `scala.concurrent.blocking` or `scala.concurrent.Await` instead. */ @@ -53,9 +53,16 @@ object BlockContext { override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk } + /** + * @return the `BlockContext` that will be used if no other is found. + **/ + def defaultBlockContext: BlockContext = DefaultBlockContext + private val contextLocal = new ThreadLocal[BlockContext]() - /** Obtain the current thread's current `BlockContext`. */ + /** + @return the `BlockContext` that would be used for the current `java.lang.Thread` at this point + **/ def current: BlockContext = contextLocal.get match { case null => Thread.currentThread match { case ctx: BlockContext => ctx @@ -64,7 +71,9 @@ object BlockContext { case some => some } - /** Pushes a current `BlockContext` while executing `body`. */ + /** + * Installs a current `BlockContext` around executing `body`. + **/ def withBlockContext[T](blockContext: BlockContext)(body: => T): T = { val old = contextLocal.get // can be null try { diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index e380c55880..f46f294387 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -11,7 +11,6 @@ package scala.concurrent import java.util.concurrent.{ ExecutorService, Executor } import scala.annotation.implicitNotFound -import scala.util.Try /** * An `ExecutionContext` can execute program logic asynchronously, @@ -26,21 +25,20 @@ import scala.util.Try * and an implicit `ExecutionContext`. The implicit `ExecutionContext` * will be used to execute the callback. * - * It is possible to simply import + * While it is possible to simply import * `scala.concurrent.ExecutionContext.Implicits.global` to obtain an - * implicit `ExecutionContext`. This global context is a reasonable - * default thread pool. - * - * However, application developers should carefully consider where they - * want to set policy; ideally, one place per application (or per - * logically-related section of code) will make a decision about - * which `ExecutionContext` to use. That is, you might want to avoid - * hardcoding `scala.concurrent.ExecutionContext.Implicits.global` all - * over the place in your code. - * One approach is to add `(implicit ec: ExecutionContext)` - * to methods which need an `ExecutionContext`. Then import a specific - * context in one place for the entire application or module, - * passing it implicitly to individual methods. + * implicit `ExecutionContext`, application developers should carefully + * consider where they want to set execution policy; + * ideally, one place per application—or per logically related section of code— + * will make a decision about which `ExecutionContext` to use. + * That is, you will mostly want to avoid hardcoding, especially via an import, + * `scala.concurrent.ExecutionContext.Implicits.global`. + * The recommended approach is to add `(implicit ec: ExecutionContext)` to methods, + * or class constructor parameters, which need an `ExecutionContext`. + * + * Then locally import a specific `ExecutionContext` in one place for the entire + * application or module, passing it implicitly to individual methods. + * Alternatively define a local implicit val with the required `ExecutionContext`. * * A custom `ExecutionContext` may be appropriate to execute code * which blocks on IO or performs long-running computations. @@ -72,22 +70,24 @@ trait ExecutionContext { */ def reportFailure(@deprecatedName('t) cause: Throwable): Unit - /** Prepares for the execution of a task. Returns the prepared execution context. - * - * `prepare` should be called at the site where an `ExecutionContext` is received (for - * example, through an implicit method parameter). The returned execution context may - * then be used to execute tasks. The role of `prepare` is to save any context relevant - * to an execution's ''call site'', so that this context may be restored at the - * ''execution site''. (These are often different: for example, execution may be - * suspended through a `Promise`'s future until the `Promise` is completed, which may - * be done in another thread, on another stack.) - * - * Note: a valid implementation of `prepare` is one that simply returns `this`. - * - * @return the prepared execution context - */ + /** Prepares for the execution of a task. Returns the prepared + * execution context. The recommended implementation of + * `prepare` is to return `this`. + * + * This method should no longer be overridden or called. It was + * originally expected that `prepare` would be called by + * all libraries that consume ExecutionContexts, in order to + * capture thread local context. However, this usage has proven + * difficult to implement in practice and instead it is + * now better to avoid using `prepare` entirely. + * + * Instead, if an `ExecutionContext` needs to capture thread + * local context, it should capture that context when it is + * constructed, so that it doesn't need any additional + * preparation later. + */ + @deprecated("preparation of ExecutionContexts will be removed", "2.12.0") def prepare(): ExecutionContext = this - } /** @@ -110,13 +110,22 @@ object ExecutionContext { * The explicit global `ExecutionContext`. Invoke `global` when you want to provide the global * `ExecutionContext` explicitly. * - * The default `ExecutionContext` implementation is backed by a work-stealing thread pool. By default, - * the thread pool uses a target number of worker threads equal to the number of - * [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- available processors]]. + * The default `ExecutionContext` implementation is backed by a work-stealing thread pool. + * It can be configured via the following [[scala.sys.SystemProperties]]: + * + * `scala.concurrent.context.minThreads` = defaults to "1" + * `scala.concurrent.context.numThreads` = defaults to "x1" (i.e. the current number of available processors * 1) + * `scala.concurrent.context.maxThreads` = defaults to "x1" (i.e. the current number of available processors * 1) + * `scala.concurrent.context.maxExtraThreads` = defaults to "256" + * + * The pool size of threads is then `numThreads` bounded by `minThreads` on the lower end and `maxThreads` on the high end. + * + * The `maxExtraThreads` is the maximum number of extra threads to have at any given time to evade deadlock, + * see [[scala.concurrent.BlockContext]]. * * @return the global `ExecutionContext` */ - def global: ExecutionContextExecutor = Implicits.global + def global: ExecutionContextExecutor = Implicits.global.asInstanceOf[ExecutionContextExecutor] object Implicits { /** @@ -127,7 +136,7 @@ object ExecutionContext { * the thread pool uses a target number of worker threads equal to the number of * [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- available processors]]. */ - implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor) + implicit lazy val global: ExecutionContext = impl.ExecutionContextImpl.fromExecutor(null: Executor) } /** Creates an `ExecutionContext` from the given `ExecutorService`. diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index ebc1e76ca1..6c1c9a0c80 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -10,34 +10,30 @@ package scala.concurrent import scala.language.higherKinds -import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } -import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } -import java.lang.{ Iterable => JIterable } -import java.util.{ LinkedList => JLinkedList } -import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicLong, AtomicBoolean } +import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger import scala.util.control.NonFatal -import scala.Option import scala.util.{Try, Success, Failure} - -import scala.annotation.tailrec -import scala.collection.mutable.Builder +import scala.concurrent.duration._ import scala.collection.generic.CanBuildFrom import scala.reflect.ClassTag - -/** The trait that represents futures. +/** A `Future` represents a value which may or may not *currently* be available, + * but will be available at some point, or an exception if that value could not be made available. * - * Asynchronous computations that yield futures are created with the `Future` call: + * Asynchronous computations that yield futures are created with the `Future.apply` call and are computed using a supplied `ExecutionContext`, + * which can be backed by a Thread pool. * * {{{ + * import ExecutionContext.Implicits.global * val s = "Hello" * val f: Future[String] = Future { * s + " future!" * } - * f onSuccess { - * case msg => println(msg) + * f foreach { + * msg => println(msg) * } * }}} * @@ -62,6 +58,10 @@ import scala.reflect.ClassTag * If a future is failed with a `scala.runtime.NonLocalReturnControl`, * it is completed with a value from that throwable instead. * + * @define swallowsExceptions + * Since this method executes asynchronously and does not produce a return value, + * any non-fatal exceptions thrown will be reported to the `ExecutionContext`. + * * @define nonDeterministic * Note: using this method yields nondeterministic dataflow programs. * @@ -91,16 +91,10 @@ import scala.reflect.ClassTag * thread. That is, the implementation may run multiple callbacks * in a batch within a single `execute()` and it may run * `execute()` either immediately or asynchronously. + * Completion of the Future must *happen-before* the invocation of the callback. */ trait Future[+T] extends Awaitable[T] { - - // The executor within the lexical scope - // of the Future trait. Note that this will - // (modulo bugs) _never_ execute a callback - // other than those below in this same file. - // - // See the documentation on `InternalCallbackExecutor` for more details. - private def internalExecutor = Future.InternalCallbackExecutor + import Future.{ InternalCallbackExecutor => internalExecutor } /* Callbacks */ @@ -111,12 +105,18 @@ trait Future[+T] extends Awaitable[T] { * If the future has already been completed with a value, * this will either be applied immediately or be scheduled asynchronously. * + * Note that the returned value of `pf` will be discarded. + * + * $swallowsExceptions * $multipleCallbacks * $callbackInContext + * + * @group Callbacks */ + @deprecated("use `foreach` or `onComplete` instead (keep in mind that they take total rather than partial functions)", "2.12.0") def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete { case Success(v) => - pf.applyOrElse[T, Any](v, Predef.conforms[T]) // Exploiting the cached function to avoid MatchError + pf.applyOrElse[T, Any](v, Predef.identity[T]) // Exploiting the cached function to avoid MatchError case _ => } @@ -130,12 +130,18 @@ trait Future[+T] extends Awaitable[T] { * * Will not be called in case that the future is completed with a value. * + * Note that the returned value of `pf` will be discarded. + * + * $swallowsExceptions * $multipleCallbacks * $callbackInContext + * + * @group Callbacks */ + @deprecated("use `onComplete` or `failed.foreach` instead (keep in mind that they take total rather than partial functions)", "2.12.0") def onFailure[U](@deprecatedName('callback) pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete { case Failure(t) => - pf.applyOrElse[Throwable, Any](t, Predef.conforms[Throwable]) // Exploiting the cached function to avoid MatchError + pf.applyOrElse[Throwable, Any](t, Predef.identity[Throwable]) // Exploiting the cached function to avoid MatchError case _ => } @@ -145,63 +151,75 @@ trait Future[+T] extends Awaitable[T] { * If the future has already been completed, * this will either be applied immediately or be scheduled asynchronously. * + * Note that the returned value of `f` will be discarded. + * + * $swallowsExceptions * $multipleCallbacks * $callbackInContext + * + * @tparam U only used to accept any return type of the given callback function + * @param f the function to be executed when this `Future` completes + * @group Callbacks */ def onComplete[U](@deprecatedName('func) f: Try[T] => U)(implicit executor: ExecutionContext): Unit /* Miscellaneous */ - /** Returns whether the future has already been completed with + /** Returns whether the future had already been completed with * a value or an exception. * * $nonDeterministic * - * @return `true` if the future is already completed, `false` otherwise + * @return `true` if the future was completed, `false` otherwise + * @group Polling */ def isCompleted: Boolean - /** The value of this `Future`. + /** The current value of this `Future`. + * + * $nonDeterministic * - * If the future is not completed the returned value will be `None`. - * If the future is completed the value will be `Some(Success(t))` - * if it contains a valid result, or `Some(Failure(error))` if it contains + * If the future was not completed the returned value will be `None`. + * If the future was completed the value will be `Some(Success(t))` + * if it contained a valid result, or `Some(Failure(error))` if it contained * an exception. + * + * @return `None` if the `Future` wasn't completed, `Some` if it was. + * @group Polling */ def value: Option[Try[T]] /* Projections */ - /** Returns a failed projection of this future. - * - * The failed projection is a future holding a value of type `Throwable`. + /** The returned `Future` will be successfully completed with the `Throwable` of the original `Future` + * if the original `Future` fails. * - * It is completed with a value which is the throwable of the original future - * in case the original future is failed. + * If the original `Future` is successful, the returned `Future` is failed with a `NoSuchElementException`. * - * It is failed with a `NoSuchElementException` if the original future is completed successfully. - * - * Blocking on this future returns a value if the original future is completed with an exception - * and throws a corresponding exception if the original future fails. + * @return a failed projection of this `Future`. + * @group Transformations */ - def failed: Future[Throwable] = { - implicit val ec = internalExecutor - val p = Promise[Throwable]() - onComplete { - case Failure(t) => p success t - case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable.")) - } - p.future - } + def failed: Future[Throwable] = + transform({ + case Failure(t) => Success(t) + case Success(v) => Failure(new NoSuchElementException("Future.failed not completed with a throwable.")) + })(internalExecutor) /* Monadic operations */ /** Asynchronously processes the value in the future once the value becomes available. * - * Will not be called if the future fails. + * WARNING: Will not be called if this future is never completed or if it is completed with a failure. + * + * $swallowsExceptions + * + * @tparam U only used to accept any return type of the given callback function + * @param f the function which will be executed if this `Future` completes with a result, + * the return value of `f` will be discarded. + * @group Callbacks */ def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { _ foreach f } @@ -210,33 +228,63 @@ trait Future[+T] extends Awaitable[T] { * exception thrown when 's' or 'f' is applied, that exception will be propagated * to the resulting future. * - * @param s function that transforms a successful result of the receiver into a - * successful result of the returned future - * @param f function that transforms a failure of the receiver into a failure of - * the returned future - * @return a future that will be completed with the transformed value - */ - def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = { - val p = Promise[S]() - // transform on Try has the wrong shape for us here - onComplete { - case Success(r) => p complete Try(s(r)) - case Failure(t) => p complete Try(throw f(t)) // will throw fatal errors! + * @tparam S the type of the returned `Future` + * @param s function that transforms a successful result of the receiver into a successful result of the returned future + * @param f function that transforms a failure of the receiver into a failure of the returned future + * @return a `Future` that will be completed with the transformed value + * @group Transformations + */ + def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = + transform { + case Success(r) => Try(s(r)) + case Failure(t) => Try(throw f(t)) // will throw fatal errors! } - p.future - } + + /** Creates a new Future by applying the specified function to the result + * of this Future. If there is any non-fatal exception thrown when 'f' + * is applied then that exception will be propagated to the resulting future. + * + * @tparam S the type of the returned `Future` + * @param f function that transforms the result of this future + * @return a `Future` that will be completed with the transformed value + * @group Transformations + */ + def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] + + /** Creates a new Future by applying the specified function, which produces a Future, to the result + * of this Future. If there is any non-fatal exception thrown when 'f' + * is applied then that exception will be propagated to the resulting future. + * + * @tparam S the type of the returned `Future` + * @param f function that transforms the result of this future + * @return a `Future` that will be completed with the transformed value + * @group Transformations + */ + def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] + /** Creates a new future by applying a function to the successful result of * this future. If this future is completed with an exception then the new * future will also contain this exception. * - * $forComprehensionExamples + * Example: + * + * {{{ + * val f = Future { "The future" } + * val g = f map { x: String => x + " is now!" } + * }}} + * + * Note that a for comprehension involving a `Future` + * may expand to include a call to `map` and or `flatMap` + * and `withFilter`. See [[scala.concurrent.Future#flatMap]] for an example of such a comprehension. + * + * + * @tparam S the type of the returned `Future` + * @param f the function which will be applied to the successful result of this `Future` + * @return a `Future` which will be completed with the result of the application of the function + * @group Transformations */ - def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity) - val p = Promise[S]() - onComplete { v => p complete (v map f) } - p.future - } + def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f) /** Creates a new future by applying a function to the successful result of * this future, and returns the result of the function as the new future. @@ -244,21 +292,25 @@ trait Future[+T] extends Awaitable[T] { * also contain this exception. * * $forComprehensionExamples + * + * @tparam S the type of the returned `Future` + * @param f the function which will be applied to the successful result of this `Future` + * @return a `Future` which will be completed with the result of the application of the function + * @group Transformations */ - def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = { - import impl.Promise.DefaultPromise - val p = new DefaultPromise[S]() - onComplete { - case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] - case Success(v) => try f(v) match { - // If possible, link DefaultPromises to avoid space leaks - case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p) - case fut => fut.onComplete(p.complete)(internalExecutor) - } catch { case NonFatal(t) => p failure t } - } - p.future + def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = transformWith { + case Success(s) => f(s) + case Failure(_) => this.asInstanceOf[Future[S]] } + /** Creates a new future with one level of nesting flattened, this method is equivalent + * to `flatMap(identity)`. + * + * @tparam S the type of the returned `Future` + * @group Transformations + */ + def flatten[S](implicit ev: T <:< Future[S]): Future[S] = flatMap(ev)(internalExecutor) + /** Creates a new future by filtering the value of the current future with a predicate. * * If the current future contains a value which satisfies the predicate, the new future will also hold that value. @@ -271,16 +323,19 @@ trait Future[+T] extends Awaitable[T] { * val f = Future { 5 } * val g = f filter { _ % 2 == 1 } * val h = f filter { _ % 2 == 0 } - * Await.result(g, Duration.Zero) // evaluates to 5 + * g foreach println // Eventually prints 5 * Await.result(h, Duration.Zero) // throw a NoSuchElementException * }}} + * + * @param p the predicate to apply to the successful result of this `Future` + * @return a `Future` which will hold the successful result of this `Future` if it matches the predicate or a `NoSuchElementException` + * @group Transformations */ def filter(@deprecatedName('pred) p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = - map { - r => if (p(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied") - } + map { r => if (p(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied") } /** Used by for-comprehensions. + * @group Transformations */ final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor) @@ -300,9 +355,14 @@ trait Future[+T] extends Awaitable[T] { * val h = f collect { * case x if x > 0 => x * 2 * } - * Await.result(g, Duration.Zero) // evaluates to 5 + * g foreach println // Eventually prints 5 * Await.result(h, Duration.Zero) // throw a NoSuchElementException * }}} + * + * @tparam S the type of the returned `Future` + * @param pf the `PartialFunction` to apply to the successful result of this `Future` + * @return a `Future` holding the result of application of the `PartialFunction` or a `NoSuchElementException` + * @group Transformations */ def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = map { @@ -320,12 +380,14 @@ trait Future[+T] extends Awaitable[T] { * Future (6 / 0) recover { case e: NotFoundException => 0 } // result: exception * Future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3 * }}} + * + * @tparam U the type of the returned `Future` + * @param pf the `PartialFunction` to apply if this `Future` fails + * @return a `Future` with the successful value of this `Future` or the result of the `PartialFunction` + * @group Transformations */ - def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = { - val p = Promise[U]() - onComplete { v => p complete (v recover pf) } - p.future - } + def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = + transform { _ recover pf } /** Creates a new future that will handle any matching throwable that this * future might contain by assigning it a value of another future. @@ -339,15 +401,17 @@ trait Future[+T] extends Awaitable[T] { * val f = Future { Int.MaxValue } * Future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue * }}} + * + * @tparam U the type of the returned `Future` + * @param pf the `PartialFunction` to apply if this `Future` fails + * @return a `Future` with the successful value of this `Future` or the outcome of the `Future` returned by the `PartialFunction` + * @group Transformations */ - def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = { - val p = Promise[U]() - onComplete { - case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this).onComplete(p.complete)(internalExecutor) catch { case NonFatal(t) => p failure t } - case other => p complete other + def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = + transformWith { + case Failure(t) => pf.applyOrElse(t, (_: Throwable) => this) + case Success(_) => this } - p.future - } /** Zips the values of `this` and `that` future, and creates * a new future holding the tuple of their results. @@ -356,17 +420,37 @@ trait Future[+T] extends Awaitable[T] { * with the throwable stored in `this`. * Otherwise, if `that` future fails, the resulting future is failed * with the throwable stored in `that`. + * + * @tparam U the type of the other `Future` + * @param that the other `Future` + * @return a `Future` with the results of both futures or the failure of the first of them that failed + * @group Transformations */ def zip[U](that: Future[U]): Future[(T, U)] = { implicit val ec = internalExecutor - val p = Promise[(T, U)]() - onComplete { - case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]] - case Success(s) => that onComplete { c => p.complete(c map { s2 => (s, s2) }) } - } - p.future + flatMap { r1 => that.map(r2 => (r1, r2)) } } + /** Zips the values of `this` and `that` future using a function `f`, + * and creates a new future holding the result. + * + * If `this` future fails, the resulting future is failed + * with the throwable stored in `this`. + * Otherwise, if `that` future fails, the resulting future is failed + * with the throwable stored in `that`. + * If the application of `f` throws a throwable, the resulting future + * is failed with that throwable if it is non-fatal. + * + * @tparam U the type of the other `Future` + * @tparam R the type of the resulting `Future` + * @param that the other `Future` + * @param f the function to apply to the results of `this` and `that` + * @return a `Future` with the result of the application of `f` to the results of `this` and `that` + * @group Transformations + */ + def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = + flatMap(r1 => that.map(r2 => f(r1, r2)))(internalExecutor) + /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, * the result of the `that` future if `that` is completed successfully. * If both futures are failed, the resulting future holds the throwable object of the first future. @@ -378,24 +462,28 @@ trait Future[+T] extends Awaitable[T] { * val f = Future { sys.error("failed") } * val g = Future { 5 } * val h = f fallbackTo g - * Await.result(h, Duration.Zero) // evaluates to 5 + * h foreach println // Eventually prints 5 * }}} + * + * @tparam U the type of the other `Future` and the resulting `Future` + * @param that the `Future` whose result we want to use if this `Future` fails. + * @return a `Future` with the successful result of this or that `Future` or the failure of this `Future` if both fail + * @group Transformations */ - def fallbackTo[U >: T](that: Future[U]): Future[U] = { - implicit val ec = internalExecutor - val p = Promise[U]() - onComplete { - case s @ Success(_) => p complete s - case f @ Failure(_) => that onComplete { - case s2 @ Success(_) => p complete s2 - case _ => p complete f // Use the first failure as the failure - } + def fallbackTo[U >: T](that: Future[U]): Future[U] = + if (this eq that) this + else { + implicit val ec = internalExecutor + recoverWith { case _ => that } recoverWith { case _ => this } } - p.future - } /** Creates a new `Future[S]` which is completed with this `Future`'s result if * that conforms to `S`'s erased type or a `ClassCastException` otherwise. + * + * @tparam S the type of the returned `Future` + * @param tag the `ClassTag` which will be used to cast the result of this `Future` + * @return a `Future` holding the casted result of this `Future` or a `ClassCastException` otherwise + * @group Transformations */ def mapTo[S](implicit tag: ClassTag[S]): Future[S] = { implicit val ec = internalExecutor @@ -429,15 +517,22 @@ trait Future[+T] extends Awaitable[T] { * case Success(v) => println(v) * } * }}} + * + * $swallowsExceptions + * + * @tparam U only used to accept any return type of the given `PartialFunction` + * @param pf a `PartialFunction` which will be conditionally applied to the outcome of this `Future` + * @return a `Future` which will be completed with the exact same outcome as this `Future` but after the `PartialFunction` has been executed. + * @group Callbacks */ - def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = { - val p = Promise[T]() - onComplete { - case r => try pf.applyOrElse[Try[T], Any](r, Predef.conforms[Try[T]]) finally p complete r - } - p.future - } + def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = + transform { + result => + try pf.applyOrElse[Try[T], Any](result, Predef.identity[Try[T]]) + catch { case NonFatal(t) => executor reportFailure t } + result + } } @@ -461,48 +556,122 @@ object Future { classOf[Unit] -> classOf[scala.runtime.BoxedUnit] ) + /** A Future which is never completed. + */ + final object never extends Future[Nothing] { + + @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) + override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = { + atMost match { + case e if e eq Duration.Undefined => throw new IllegalArgumentException("cannot wait for Undefined period") + case Duration.Inf => new CountDownLatch(1).await() + case Duration.MinusInf => // Drop out + case f: FiniteDuration => + if (f > Duration.Zero) new CountDownLatch(1).await(f.toNanos, TimeUnit.NANOSECONDS) + } + throw new TimeoutException(s"Future timed out after [$atMost]") + } + + @throws(classOf[Exception]) + override def result(atMost: Duration)(implicit permit: CanAwait): Nothing = { + ready(atMost) + throw new TimeoutException(s"Future timed out after [$atMost]") + } + + override def onSuccess[U](pf: PartialFunction[Nothing, U])(implicit executor: ExecutionContext): Unit = () + override def onFailure[U](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = () + override def onComplete[U](f: Try[Nothing] => U)(implicit executor: ExecutionContext): Unit = () + override def isCompleted: Boolean = false + override def value: Option[Try[Nothing]] = None + override def failed: Future[Throwable] = this + override def foreach[U](f: Nothing => U)(implicit executor: ExecutionContext): Unit = () + override def transform[S](s: Nothing => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = this + override def transform[S](f: Try[Nothing] => Try[S])(implicit executor: ExecutionContext): Future[S] = this + override def transformWith[S](f: Try[Nothing] => Future[S])(implicit executor: ExecutionContext): Future[S] = this + override def map[S](f: Nothing => S)(implicit executor: ExecutionContext): Future[S] = this + override def flatMap[S](f: Nothing => Future[S])(implicit executor: ExecutionContext): Future[S] = this + override def flatten[S](implicit ev: Nothing <:< Future[S]): Future[S] = this + override def filter(p: Nothing => Boolean)(implicit executor: ExecutionContext): Future[Nothing] = this + override def collect[S](pf: PartialFunction[Nothing, S])(implicit executor: ExecutionContext): Future[S] = this + override def recover[U >: Nothing](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = this + override def recoverWith[U >: Nothing](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = this + override def zip[U](that: Future[U]): Future[(Nothing, U)] = this + override def zipWith[U, R](that: Future[U])(f: (Nothing, U) => R)(implicit executor: ExecutionContext): Future[R] = this + override def fallbackTo[U >: Nothing](that: Future[U]): Future[U] = this + override def mapTo[S](implicit tag: ClassTag[S]): Future[S] = this + override def andThen[U](pf: PartialFunction[Try[Nothing], U])(implicit executor: ExecutionContext): Future[Nothing] = this + + override def toString: String = "Future(<never>)" + } + + /** A Future which is always completed with the Unit value. + */ + val unit: Future[Unit] = successful(()) + /** Creates an already completed Future with the specified exception. * - * @tparam T the type of the value in the future - * @return the newly created `Future` object + * @tparam T the type of the value in the future + * @param exception the non-null instance of `Throwable` + * @return the newly created `Future` instance */ def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future /** Creates an already completed Future with the specified result. * * @tparam T the type of the value in the future - * @return the newly created `Future` object + * @param result the given successful value + * @return the newly created `Future` instance */ def successful[T](result: T): Future[T] = Promise.successful(result).future /** Creates an already completed Future with the specified result or exception. * - * @tparam T the type of the value in the promise - * @return the newly created `Future` object + * @tparam T the type of the value in the `Future` + * @param result the result of the returned `Future` instance + * @return the newly created `Future` instance */ def fromTry[T](result: Try[T]): Future[T] = Promise.fromTry(result).future - /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + /** Starts an asynchronous computation and returns a `Future` instance with the result of that computation. + * + * The following expressions are equivalent: + * + * {{{ + * val f1 = Future(expr) + * val f2 = Future.unit.map(_ => expr) + * }}} * * The result becomes available once the asynchronous computation is completed. * - * @tparam T the type of the result - * @param body the asynchronous computation + * @tparam T the type of the result + * @param body the asynchronous computation * @param executor the execution context on which the future is run - * @return the `Future` holding the result of the computation + * @return the `Future` holding the result of the computation */ - def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body) + def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = + unit.map(_ => body) - /** Simple version of `Future.traverse`. Transforms a `TraversableOnce[Future[A]]` into a `Future[TraversableOnce[A]]`. - * Useful for reducing many `Future`s into a single `Future`. + /** Simple version of `Future.traverse`. Asynchronously and non-blockingly transforms a `TraversableOnce[Future[A]]` + * into a `Future[TraversableOnce[A]]`. Useful for reducing many `Future`s into a single `Future`. + * + * @tparam A the type of the value inside the Futures + * @tparam M the type of the `TraversableOnce` of Futures + * @param in the `TraversableOnce` of Futures which will be sequenced + * @return the `Future` of the `TraversableOnce` of results */ def sequence[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { in.foldLeft(successful(cbf(in))) { - (fr, fa) => for (r <- fr; a <- fa) yield (r += a) + (fr, fa) => fr.zipWith(fa)(_ += _) }.map(_.result())(InternalCallbackExecutor) } - /** Returns a new `Future` to the result of the first future in the list that is completed. + /** Asynchronously and non-blockingly returns a new `Future` to the result of the first future + * in the list that is completed. This means no matter if it is completed as a success or as a failure. + * + * @tparam T the type of the value in the future + * @param futures the `TraversableOnce` of Futures in which to find the first completed + * @return the `Future` holding the result of the future that is first to be completed */ def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() @@ -511,8 +680,15 @@ object Future { p.future } - /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate. + /** Asynchronously and non-blockingly returns a `Future` that will hold the optional result + * of the first `Future` with a result that matches the predicate. + * + * @tparam T the type of the value in the future + * @param futures the `TraversableOnce` of Futures to search + * @param p the predicate which indicates if it's a match + * @return the `Future` holding the optional result of the search */ + @deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0") def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { val futuresBuffer = futures.toBuffer if (futuresBuffer.isEmpty) successful[Option[T]](None) @@ -536,46 +712,133 @@ object Future { } } - /** A non-blocking fold over the specified futures, with the start value of the given zero. + + /** Asynchronously and non-blockingly returns a `Future` that will hold the optional result + * of the first `Future` with a result that matches the predicate, failed `Future`s will be ignored. + * + * @tparam T the type of the value in the future + * @param futures the `scala.collection.immutable.Iterable` of Futures to search + * @param p the predicate which indicates if it's a match + * @return the `Future` holding the optional result of the search + */ + def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { + def searchNext(i: Iterator[Future[T]]): Future[Option[T]] = + if (!i.hasNext) successful[Option[T]](None) + else { + i.next().transformWith { + case Success(r) if p(r) => successful(Some(r)) + case other => searchNext(i) + } + } + searchNext(futures.iterator) + } + + /** A non-blocking, asynchronous left fold over the specified futures, + * with the start value of the given zero. + * The fold is performed asynchronously in left-to-right order as the futures become completed. + * The result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + * + * Example: + * {{{ + * val futureSum = Future.foldLeft(futures)(0)(_ + _) + * }}} + * + * @tparam T the type of the value of the input Futures + * @tparam R the type of the value of the returned `Future` + * @param futures the `scala.collection.immutable.Iterable` of Futures to be folded + * @param zero the start value of the fold + * @param op the fold operation to be applied to the zero and futures + * @return the `Future` holding the result of the fold + */ + def foldLeft[T, R](futures: scala.collection.immutable.Iterable[Future[T]])(zero: R)(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = + foldNext(futures.iterator, zero, op) + + private[this] def foldNext[T, R](i: Iterator[Future[T]], prevValue: R, op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = + if (!i.hasNext) successful(prevValue) + else i.next().flatMap { value => foldNext(i, op(prevValue, value), op) } + + /** A non-blocking, asynchronous fold over the specified futures, with the start value of the given zero. * The fold is performed on the thread where the last future is completed, * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. * * Example: * {{{ - * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds) + * val futureSum = Future.fold(futures)(0)(_ + _) * }}} + * + * @tparam T the type of the value of the input Futures + * @tparam R the type of the value of the returned `Future` + * @param futures the `TraversableOnce` of Futures to be folded + * @param zero the start value of the fold + * @param op the fold operation to be applied to the zero and futures + * @return the `Future` holding the result of the fold */ + @deprecated("use Future.foldLeft instead", "2.12.0") def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(@deprecatedName('foldFun) op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) successful(zero) else sequence(futures).map(_.foldLeft(zero)(op)) } - /** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first. + /** Initiates a non-blocking, asynchronous, fold over the supplied futures + * where the fold-zero is the result value of the `Future` that's completed first. * * Example: * {{{ - * val result = Await.result(Future.reduce(futures)(_ + _), 5 seconds) + * val futureSum = Future.reduce(futures)(_ + _) * }}} + * @tparam T the type of the value of the input Futures + * @tparam R the type of the value of the returned `Future` + * @param futures the `TraversableOnce` of Futures to be reduced + * @param op the reduce operation which is applied to the results of the futures + * @return the `Future` holding the result of the reduce */ + @deprecated("use Future.reduceLeft instead", "2.12.0") def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) failed(new NoSuchElementException("reduce attempted on empty collection")) else sequence(futures).map(_ reduceLeft op) } - /** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`. + /** Initiates a non-blocking, asynchronous, left reduction over the supplied futures + * where the zero is the result value of the first `Future`. + * + * Example: + * {{{ + * val futureSum = Future.reduceLeft(futures)(_ + _) + * }}} + * @tparam T the type of the value of the input Futures + * @tparam R the type of the value of the returned `Future` + * @param futures the `scala.collection.immutable.Iterable` of Futures to be reduced + * @param op the reduce operation which is applied to the results of the futures + * @return the `Future` holding the result of the reduce + */ + def reduceLeft[T, R >: T](futures: scala.collection.immutable.Iterable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + val i = futures.iterator + if (!i.hasNext) failed(new NoSuchElementException("reduceLeft attempted on empty collection")) + else i.next() flatMap { v => foldNext(i, v, op) } + } + + /** Asynchronously and non-blockingly transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` + * using the provided function `A => Future[B]`. * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel: * * {{{ * val myFutureList = Future.traverse(myList)(x => Future(myFunc(x))) * }}} + * @tparam A the type of the value inside the Futures in the `TraversableOnce` + * @tparam B the type of the value of the returned `Future` + * @tparam M the type of the `TraversableOnce` of Futures + * @param in the `TraversableOnce` of Futures which will be sequenced + * @param fn the function to apply to the `TraversableOnce` of Futures to produce the results + * @return the `Future` of the `TraversableOnce` of results */ def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = - in.foldLeft(successful(cbf(in))) { (fr, a) => - val fb = fn(a) - for (r <- fr; b <- fb) yield (r += b) - }.map(_.result()) + in.foldLeft(successful(cbf(in))) { + (fr, a) => fr.zipWith(fn(a))(_ += _) + }.map(_.result())(InternalCallbackExecutor) + // This is used to run callbacks which are internal // to scala.concurrent; our own callbacks are only diff --git a/src/library/scala/concurrent/FutureTaskRunner.scala b/src/library/scala/concurrent/FutureTaskRunner.scala deleted file mode 100644 index 089e67cedd..0000000000 --- a/src/library/scala/concurrent/FutureTaskRunner.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2009-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent - -import scala.language.{implicitConversions, higherKinds} - -/** The `FutureTaskRunner` trait is a base trait of task runners - * that provide some sort of future abstraction. - * - * @author Philipp Haller - */ -@deprecated("Use `ExecutionContext` instead.", "2.10.0") -private[scala] trait FutureTaskRunner extends TaskRunner { - - /** The type of the futures that the underlying task runner supports. - */ - type Future[T] - - /** An implicit conversion from futures to zero-parameter functions. - */ - implicit def futureAsFunction[S](x: Future[S]): () => S - - /** Submits a task to run which returns its result in a future. - */ - def submit[S](task: Task[S]): Future[S] - - /* Possibly blocks the current thread, for example, waiting for - * a lock or condition. - */ - @deprecated("Use `blocking` instead.", "2.10.0") - def managedBlock(blocker: ManagedBlocker): Unit - -} diff --git a/src/library/scala/concurrent/Lock.scala b/src/library/scala/concurrent/Lock.scala index 8d18da2d38..757fb94cc7 100644 --- a/src/library/scala/concurrent/Lock.scala +++ b/src/library/scala/concurrent/Lock.scala @@ -15,7 +15,7 @@ package scala.concurrent * @author Martin Odersky * @version 1.0, 10/03/2003 */ -@deprecated("Use java.util.concurrent.locks.Lock", "2.11.2") +@deprecated("use java.util.concurrent.locks.Lock", "2.11.2") class Lock { var available = true diff --git a/src/library/scala/concurrent/ManagedBlocker.scala b/src/library/scala/concurrent/ManagedBlocker.scala deleted file mode 100644 index b5a6e21893..0000000000 --- a/src/library/scala/concurrent/ManagedBlocker.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent - -/** The `ManagedBlocker` trait... - * - * @author Philipp Haller - */ -@deprecated("Use `blocking` instead.", "2.10.0") -private[scala] trait ManagedBlocker { - - /** - * Possibly blocks the current thread, for example waiting for - * a lock or condition. - * - * @return true if no additional blocking is necessary (i.e., - * if `isReleasable` would return `true`). - * @throws InterruptedException if interrupted while waiting - * (the method is not required to do so, but is allowed to). - */ - def block(): Boolean - - /** - * Returns `true` if blocking is unnecessary. - */ - def isReleasable: Boolean - -} diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index 0f4e98db57..894b134e83 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -26,12 +26,6 @@ import scala.util.{ Try, Success, Failure } * Note: Using this method may result in non-deterministic concurrent programs. */ trait Promise[T] { - - // used for internal callbacks defined in - // the lexical scope of this trait; - // _never_ for application callbacks. - private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor - /** Future containing the value of this promise. */ def future: Future[T] @@ -73,7 +67,9 @@ trait Promise[T] { * @return This promise */ final def tryCompleteWith(other: Future[T]): this.type = { - other onComplete { this tryComplete _ } + if (other ne this.future) { // this tryCompleteWith this doesn't make much sense + other.onComplete(this tryComplete _)(Future.InternalCallbackExecutor) + } this } @@ -139,5 +135,5 @@ object Promise { * @tparam T the type of the value in the promise * @return the newly created `Promise` object */ - def fromTry[T](result: Try[T]): Promise[T] = new impl.Promise.KeptPromise[T](result) + def fromTry[T](result: Try[T]): Promise[T] = impl.Promise.KeptPromise[T](result) } diff --git a/src/library/scala/concurrent/SyncChannel.scala b/src/library/scala/concurrent/SyncChannel.scala index ec584b3eb0..735598935c 100644 --- a/src/library/scala/concurrent/SyncChannel.scala +++ b/src/library/scala/concurrent/SyncChannel.scala @@ -31,10 +31,10 @@ class SyncChannel[A] { pendingReads = pendingReads.tail // let reader continue - readReq set data + readReq put data // resolve write request - writeReq set true + writeReq put true } else { // enqueue write request @@ -57,10 +57,10 @@ class SyncChannel[A] { pendingWrites = pendingWrites.tail // let writer continue - writeReq set true + writeReq.put(true) // resolve read request - readReq set data + readReq.put (data) } else { // enqueue read request diff --git a/src/library/scala/concurrent/SyncVar.scala b/src/library/scala/concurrent/SyncVar.scala index 1ee27b0f36..77bfa95119 100644 --- a/src/library/scala/concurrent/SyncVar.scala +++ b/src/library/scala/concurrent/SyncVar.scala @@ -19,17 +19,17 @@ import java.util.concurrent.TimeUnit */ class SyncVar[A] { private var isDefined: Boolean = false - private var value: Option[A] = None + private var value: A = _ /** - * Waits for this SyncVar to become defined and returns - * the result, without modifying the stored value. + * Wait for this SyncVar to become defined and then get + * the stored value without modifying it. * * @return value that is held in this container */ def get: A = synchronized { while (!isDefined) wait() - value.get + value } /** Waits `timeout` millis. If `timeout <= 0` just returns 0. @@ -44,11 +44,10 @@ class SyncVar[A] { if (elapsed < 0) 0 else TimeUnit.NANOSECONDS.toMillis(elapsed) } - /** Waits for this SyncVar to become defined at least for - * `timeout` milliseconds (possibly more), and gets its - * value. + /** Wait at least `timeout` milliseconds (possibly more) for this `SyncVar` + * to become defined and then get its value. * - * @param timeout the amount of milliseconds to wait, 0 means forever + * @param timeout time in milliseconds to wait * @return `None` if variable is undefined after `timeout`, `Some(value)` otherwise */ def get(timeout: Long): Option[A] = synchronized { @@ -61,12 +60,12 @@ class SyncVar[A] { val elapsed = waitMeasuringElapsed(rest) rest -= elapsed } - value + if (isDefined) Some(value) else None } /** - * Waits for this SyncVar to become defined and returns - * the result, unsetting the stored value before returning. + * Wait for this SyncVar to become defined and then get + * the stored value, unsetting it as a side effect. * * @return value that was held in this container */ @@ -75,12 +74,11 @@ class SyncVar[A] { finally unsetVal() } - /** Waits for this SyncVar to become defined at least for - * `timeout` milliseconds (possibly more), and takes its - * value by first reading and then removing the value from - * the SyncVar. + /** Wait at least `timeout` milliseconds (possibly more) for this `SyncVar` + * to become defined and then get the stored value, unsetting it + * as a side effect. * - * @param timeout the amount of milliseconds to wait, 0 means forever + * @param timeout the amount of milliseconds to wait * @return the value or a throws an exception if the timeout occurs * @throws NoSuchElementException on timeout */ @@ -93,18 +91,18 @@ class SyncVar[A] { // [Heather] the reason why: it doesn't take into consideration // whether or not the SyncVar is already defined. So, set has been // deprecated in order to eventually be able to make "setting" private - @deprecated("Use `put` instead, as `set` is potentially error-prone", "2.10.0") - // NOTE: Used by SBT 0.13.0-M2 and below + @deprecated("use `put` to ensure a value cannot be overwritten without a corresponding `take`", "2.10.0") + // NOTE: Used by sbt 0.13.0-M2 and below def set(x: A): Unit = setVal(x) - /** Places a value in the SyncVar. If the SyncVar already has a stored value, - * it waits until another thread takes it */ + /** Place a value in the SyncVar. If the SyncVar already has a stored value, + * wait until another thread takes it. */ def put(x: A): Unit = synchronized { while (isDefined) wait() setVal(x) } - /** Checks whether a value is stored in the synchronized variable */ + /** Check whether a value is stored in the synchronized variable. */ def isSet: Boolean = synchronized { isDefined } @@ -113,11 +111,11 @@ class SyncVar[A] { // [Heather] the reason why: it doesn't take into consideration // whether or not the SyncVar is already defined. So, unset has been // deprecated in order to eventually be able to make "unsetting" private - @deprecated("Use `take` instead, as `unset` is potentially error-prone", "2.10.0") - // NOTE: Used by SBT 0.13.0-M2 and below + @deprecated("use `take` to ensure a value is never discarded", "2.10.0") + // NOTE: Used by sbt 0.13.0-M2 and below def unset(): Unit = synchronized { isDefined = false - value = None + value = null.asInstanceOf[A] notifyAll() } @@ -126,7 +124,7 @@ class SyncVar[A] { // implementation of `set` was moved to `setVal` to achieve this private def setVal(x: A): Unit = synchronized { isDefined = true - value = Some(x) + value = x notifyAll() } @@ -135,8 +133,7 @@ class SyncVar[A] { // implementation of `unset` was moved to `unsetVal` to achieve this private def unsetVal(): Unit = synchronized { isDefined = false - value = None + value = null.asInstanceOf[A] notifyAll() } - } diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala deleted file mode 100644 index 1ea23b35e8..0000000000 --- a/src/library/scala/concurrent/TaskRunner.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent - -import scala.language.{higherKinds, implicitConversions} - -/** The `TaskRunner` trait... - * - * @author Philipp Haller - */ -@deprecated("Use `ExecutionContext` instead.", "2.10.0") -private[scala] trait TaskRunner { - - type Task[T] - - implicit def functionAsTask[S](fun: () => S): Task[S] - - def execute[S](task: Task[S]): Unit - - def shutdown(): Unit -} diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala deleted file mode 100644 index 7784681f71..0000000000 --- a/src/library/scala/concurrent/ThreadPoolRunner.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent - -import java.util.concurrent.{ExecutorService, Callable, TimeUnit} -import scala.language.implicitConversions - -/** The `ThreadPoolRunner` trait uses a `java.util.concurrent.ExecutorService` - * to run submitted tasks. - * - * @author Philipp Haller - */ -@deprecated("Use `ExecutionContext` instead.", "2.10.0") -private[scala] trait ThreadPoolRunner extends FutureTaskRunner { - - type Task[T] = Callable[T] with Runnable - type Future[T] = java.util.concurrent.Future[T] - - private class RunCallable[S](fun: () => S) extends Runnable with Callable[S] { - def run() = fun() - def call() = fun() - } - - implicit def functionAsTask[S](fun: () => S): Task[S] = - new RunCallable(fun) - - implicit def futureAsFunction[S](x: Future[S]): () => S = - () => x.get() - - protected def executor: ExecutorService - - def submit[S](task: Task[S]): Future[S] = { - executor.submit[S](task) - } - - def execute[S](task: Task[S]) { - executor execute task - } - - @deprecated("Use `blocking` instead.", "2.10.0") - def managedBlock(blocker: ManagedBlocker) { - blocker.block() - } - -} diff --git a/src/library/scala/concurrent/duration/Duration.scala b/src/library/scala/concurrent/duration/Duration.scala index e68a897f82..d912f614c2 100644 --- a/src/library/scala/concurrent/duration/Duration.scala +++ b/src/library/scala/concurrent/duration/Duration.scala @@ -9,8 +9,6 @@ package scala.concurrent.duration import java.lang.{ Double => JDouble, Long => JLong } -import scala.language.implicitConversions -import scala.language.postfixOps object Duration { @@ -49,7 +47,7 @@ object Duration { * whitespace is allowed before, between and after the parts. Infinities are * designated by `"Inf"`, `"PlusInf"`, `"+Inf"` and `"-Inf"` or `"MinusInf"`. * - * @throws NumberFormatException if format is not parseable + * @throws NumberFormatException if format is not parsable */ def apply(s: String): Duration = { val s1: String = s filterNot (_.isWhitespace) @@ -57,7 +55,7 @@ object Duration { case "Inf" | "PlusInf" | "+Inf" => Inf case "MinusInf" | "-Inf" => MinusInf case _ => - val unitName = s1.reverse takeWhile (_.isLetter) reverse; + val unitName = s1.reverse.takeWhile(_.isLetter).reverse; timeUnit get unitName match { case Some(unit) => val valueStr = s1 dropRight unitName.length @@ -87,11 +85,11 @@ object Duration { // TimeUnit => standard label protected[duration] val timeUnitName: Map[TimeUnit, String] = - timeUnitLabels.toMap mapValues (s => words(s).last) toMap + timeUnitLabels.toMap.mapValues(s => words(s).last).toMap // Label => TimeUnit protected[duration] val timeUnit: Map[String, TimeUnit] = - timeUnitLabels flatMap { case (unit, names) => expandLabels(names) map (_ -> unit) } toMap + timeUnitLabels.flatMap{ case (unit, names) => expandLabels(names) map (_ -> unit) }.toMap /** * Extract length and time unit out of a string, where the format must match the description for [[Duration$.apply(s:String)* apply(String)]]. @@ -122,7 +120,7 @@ object Duration { def fromNanos(nanos: Double): Duration = { if (nanos.isInfinite) if (nanos > 0) Inf else MinusInf - else if (nanos.isNaN) + else if (JDouble.isNaN(nanos)) Undefined else if (nanos > Long.MaxValue || nanos < Long.MinValue) throw new IllegalArgumentException("trying to construct too large duration with " + nanos + "ns") @@ -198,11 +196,11 @@ object Duration { } def *(factor: Double): Duration = - if (factor == 0d || factor.isNaN) Undefined + if (factor == 0d || JDouble.isNaN(factor)) Undefined else if (factor < 0d) -this else this def /(divisor: Double): Duration = - if (divisor.isNaN || divisor.isInfinite) Undefined + if (JDouble.isNaN(divisor) || divisor.isInfinite) Undefined else if ((divisor compare 0d) < 0) -this else this def /(divisor: Duration): Double = divisor match { @@ -287,7 +285,7 @@ object Duration { * whitespace is allowed before, between and after the parts. Infinities are * designated by `"Inf"`, `"PlusInf"`, `"+Inf"` and `"-Inf"` or `"MinusInf"`. * - * @throws NumberFormatException if format is not parseable + * @throws NumberFormatException if format is not parsable */ def create(s: String): Duration = apply(s) @@ -629,13 +627,13 @@ final class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duratio def *(factor: Double) = if (!factor.isInfinite) fromNanos(toNanos * factor) - else if (factor.isNaN) Undefined + else if (JDouble.isNaN(factor)) Undefined else if ((factor > 0) ^ (this < Zero)) Inf else MinusInf def /(divisor: Double) = if (!divisor.isInfinite) fromNanos(toNanos / divisor) - else if (divisor.isNaN) Undefined + else if (JDouble.isNaN(divisor)) Undefined else Zero // if this is made a constant, then scalac will elide the conditional and always return +0.0, SI-6331 @@ -708,7 +706,7 @@ final class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duratio final def isFinite() = true - final def toCoarsest: Duration = { + final override def toCoarsest: FiniteDuration = { def loop(length: Long, unit: TimeUnit): FiniteDuration = { def coarserOrThis(coarser: TimeUnit, divider: Int) = if (length % divider == 0) loop(length / divider, coarser) diff --git a/src/library/scala/concurrent/forkjoin/package.scala b/src/library/scala/concurrent/forkjoin/package.scala new file mode 100644 index 0000000000..889890e30b --- /dev/null +++ b/src/library/scala/concurrent/forkjoin/package.scala @@ -0,0 +1,60 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2015, LAMP/EPFL and Typesafe, Inc. ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent +import java.util.{concurrent => juc} +import java.util.Collection + +package object forkjoin { + @deprecated("use java.util.concurrent.ForkJoinPool directly, instead of this alias", "2.12.0") + type ForkJoinPool = juc.ForkJoinPool + @deprecated("use java.util.concurrent.ForkJoinPool directly, instead of this alias", "2.12.0") + object ForkJoinPool { + type ForkJoinWorkerThreadFactory = juc.ForkJoinPool.ForkJoinWorkerThreadFactory + type ManagedBlocker = juc.ForkJoinPool.ManagedBlocker + + val defaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory = juc.ForkJoinPool.defaultForkJoinWorkerThreadFactory + def managedBlock(blocker: ManagedBlocker): Unit = juc.ForkJoinPool.managedBlock(blocker) + } + + @deprecated("use java.util.concurrent.ForkJoinTask directly, instead of this alias", "2.12.0") + type ForkJoinTask[T] = juc.ForkJoinTask[T] + @deprecated("use java.util.concurrent.ForkJoinTask directly, instead of this alias", "2.12.0") + object ForkJoinTask extends scala.Serializable { + def adapt(runnable: Runnable): ForkJoinTask[_] = juc.ForkJoinTask.adapt(runnable) + def adapt[T](callable: juc.Callable[_ <: T]): ForkJoinTask[T] = juc.ForkJoinTask.adapt(callable) + def adapt[T](runnable: Runnable, result: T): ForkJoinTask[T] = juc.ForkJoinTask.adapt(runnable, result) + def getPool(): ForkJoinPool = juc.ForkJoinTask.getPool + def getQueuedTaskCount(): Int = juc.ForkJoinTask.getQueuedTaskCount + def getSurplusQueuedTaskCount(): Int = juc.ForkJoinTask.getSurplusQueuedTaskCount + def helpQuiesce(): Unit = juc.ForkJoinTask.helpQuiesce + def inForkJoinPool(): Boolean = juc.ForkJoinTask.inForkJoinPool + def invokeAll[T <: ForkJoinTask[_]](tasks: Collection[T]): Collection[T] = juc.ForkJoinTask.invokeAll(tasks) + def invokeAll[T](t1: ForkJoinTask[T]): Unit = juc.ForkJoinTask.invokeAll(t1) + def invokeAll[T](tasks: ForkJoinTask[T]*): Unit = juc.ForkJoinTask.invokeAll(tasks: _*) + } + + @deprecated("use java.util.concurrent.ForkJoinWorkerThread directly, instead of this alias", "2.12.0") + type ForkJoinWorkerThread = juc.ForkJoinWorkerThread + @deprecated("use java.util.concurrent.LinkedTransferQueue directly, instead of this alias", "2.12.0") + type LinkedTransferQueue[T] = juc.LinkedTransferQueue[T] + @deprecated("use java.util.concurrent.RecursiveAction directly, instead of this alias", "2.12.0") + type RecursiveAction = juc.RecursiveAction + @deprecated("use java.util.concurrent.RecursiveTask directly, instead of this alias", "2.12.0") + type RecursiveTask[T] = juc.RecursiveTask[T] + + @deprecated("use java.util.concurrent.ThreadLocalRandom directly, instead of this alias", "2.12.0") + type ThreadLocalRandom = juc.ThreadLocalRandom + @deprecated("use java.util.concurrent.ThreadLocalRandom directly, instead of this alias", "2.12.0") + object ThreadLocalRandom extends scala.Serializable { + // For source compatibility, current must declare the empty argument list. + // Having no argument list makes more sense since it doesn't have any side effects, + // but existing callers will break if they invoked it as `current()`. + def current() = juc.ThreadLocalRandom.current + } +} diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java deleted file mode 100644 index c2520a1692..0000000000 --- a/src/library/scala/concurrent/impl/AbstractPromise.java +++ /dev/null @@ -1,17 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2015, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent.impl; - -import java.util.concurrent.atomic.AtomicReference; - -@Deprecated // Since 2.11.8. Extend java.util.concurrent.atomic.AtomicReference instead. -abstract class AbstractPromise extends AtomicReference<Object> { - protected final boolean updateState(Object oldState, Object newState) { return compareAndSet(oldState, newState); } - protected final Object getState() { return get(); } -} diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 479720287c..19233d7531 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -8,55 +8,87 @@ package scala.concurrent.impl - - -import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor } +import java.util.concurrent.{ ForkJoinPool, ForkJoinWorkerThread, ForkJoinTask, Callable, Executor, ExecutorService, ThreadFactory, TimeUnit } +import java.util.concurrent.atomic.AtomicInteger import java.util.Collection -import scala.concurrent.forkjoin._ -import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService } -import scala.util.control.NonFatal +import scala.concurrent.{ BlockContext, ExecutionContext, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService } +import scala.annotation.tailrec +private[scala] class ExecutionContextImpl private[impl] (val executor: Executor, val reporter: Throwable => Unit) extends ExecutionContextExecutor { + require(executor ne null, "Executor must not be null") + override def execute(runnable: Runnable) = executor execute runnable + override def reportFailure(t: Throwable) = reporter(t) +} -private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor { - // Placed here since the creation of the executor needs to read this val - private[this] val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler { - def uncaughtException(thread: Thread, cause: Throwable): Unit = reporter(cause) - } - val executor: Executor = es match { - case null => createExecutorService - case some => some - } +private[concurrent] object ExecutionContextImpl { // Implement BlockContext on FJP threads - class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { + final class DefaultThreadFactory( + daemonic: Boolean, + maxThreads: Int, + prefix: String, + uncaught: Thread.UncaughtExceptionHandler) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { + + require(prefix ne null, "DefaultThreadFactory.prefix must be non null") + require(maxThreads > 0, "DefaultThreadFactory.maxThreads must be greater than 0") + + private final val currentNumberOfThreads = new AtomicInteger(0) + + @tailrec private final def reserveThread(): Boolean = currentNumberOfThreads.get() match { + case `maxThreads` | Int.`MaxValue` => false + case other => currentNumberOfThreads.compareAndSet(other, other + 1) || reserveThread() + } + + @tailrec private final def deregisterThread(): Boolean = currentNumberOfThreads.get() match { + case 0 => false + case other => currentNumberOfThreads.compareAndSet(other, other - 1) || deregisterThread() + } + def wire[T <: Thread](thread: T): T = { thread.setDaemon(daemonic) - thread.setUncaughtExceptionHandler(uncaughtExceptionHandler) + thread.setUncaughtExceptionHandler(uncaught) + thread.setName(prefix + "-" + thread.getId()) thread } - def newThread(runnable: Runnable): Thread = wire(new Thread(runnable)) - - def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext { - override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { - var result: T = null.asInstanceOf[T] - ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { - @volatile var isdone = false - override def block(): Boolean = { - result = try thunk finally { isdone = true } - true + // As per ThreadFactory contract newThread should return `null` if cannot create new thread. + def newThread(runnable: Runnable): Thread = + if (reserveThread()) + wire(new Thread(new Runnable { + // We have to decrement the current thread count when the thread exits + override def run() = try runnable.run() finally deregisterThread() + })) else null + + def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = + if (reserveThread()) { + wire(new ForkJoinWorkerThread(fjp) with BlockContext { + // We have to decrement the current thread count when the thread exits + final override def onTermination(exception: Throwable): Unit = deregisterThread() + final override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { + var result: T = null.asInstanceOf[T] + ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { + @volatile var isdone = false + override def block(): Boolean = { + result = try { + // When we block, switch out the BlockContext temporarily so that nested blocking does not created N new Threads + BlockContext.withBlockContext(BlockContext.defaultBlockContext) { thunk } + } finally { + isdone = true + } + + true + } + override def isReleasable = isdone + }) + result } - override def isReleasable = isdone }) - result - } - }) + } else null } - def createExecutorService: ExecutorService = { - + def createDefaultExecutorService(reporter: Throwable => Unit): ExecutorService = { def getInt(name: String, default: String) = (try System.getProperty(name, default) catch { case e: SecurityException => default }) match { @@ -65,87 +97,79 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: } def range(floor: Int, desired: Int, ceiling: Int) = scala.math.min(scala.math.max(floor, desired), ceiling) + val numThreads = getInt("scala.concurrent.context.numThreads", "x1") + // The hard limit on the number of active threads that the thread factory will produce + // SI-8955 Deadlocks can happen if maxNoOfThreads is too low, although we're currently not sure + // about what the exact threshold is. numThreads + 256 is conservatively high. + val maxNoOfThreads = getInt("scala.concurrent.context.maxThreads", "x1") val desiredParallelism = range( getInt("scala.concurrent.context.minThreads", "1"), - getInt("scala.concurrent.context.numThreads", "x1"), - getInt("scala.concurrent.context.maxThreads", "x1")) - - val threadFactory = new DefaultThreadFactory(daemonic = true) - - try { - new ForkJoinPool( - desiredParallelism, - threadFactory, - uncaughtExceptionHandler, - true) // Async all the way baby - } catch { - case NonFatal(t) => - System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to ThreadPoolExecutor") - t.printStackTrace(System.err) - val exec = new ThreadPoolExecutor( - desiredParallelism, - desiredParallelism, - 5L, - TimeUnit.MINUTES, - new LinkedBlockingQueue[Runnable], - threadFactory - ) - exec.allowCoreThreadTimeOut(true) - exec - } - } + numThreads, + maxNoOfThreads) - def execute(runnable: Runnable): Unit = executor match { - case fj: ForkJoinPool => - val fjt: ForkJoinTask[_] = runnable match { - case t: ForkJoinTask[_] => t - case r => new ExecutionContextImpl.AdaptedForkJoinTask(r) - } - Thread.currentThread match { - case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork() - case _ => fj execute fjt - } - case generic => generic execute runnable - } + // The thread factory must provide additional threads to support managed blocking. + val maxExtraThreads = getInt("scala.concurrent.context.maxExtraThreads", "256") - def reportFailure(t: Throwable) = reporter(t) -} + val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler { + override def uncaughtException(thread: Thread, cause: Throwable): Unit = reporter(cause) + } + val threadFactory = new ExecutionContextImpl.DefaultThreadFactory(daemonic = true, + maxThreads = maxNoOfThreads + maxExtraThreads, + prefix = "scala-execution-context-global", + uncaught = uncaughtExceptionHandler) -private[concurrent] object ExecutionContextImpl { + new ForkJoinPool(desiredParallelism, threadFactory, uncaughtExceptionHandler, true) { + override def execute(runnable: Runnable): Unit = { + val fjt: ForkJoinTask[_] = runnable match { + case t: ForkJoinTask[_] => t + case r => new ExecutionContextImpl.AdaptedForkJoinTask(r) + } + Thread.currentThread match { + case fjw: ForkJoinWorkerThread if fjw.getPool eq this => fjt.fork() + case _ => super.execute(fjt) + } + } + } + } final class AdaptedForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] { - final override def setRawResult(u: Unit): Unit = () - final override def getRawResult(): Unit = () - final override def exec(): Boolean = try { runnable.run(); true } catch { - case anything: Throwable ⇒ - val t = Thread.currentThread - t.getUncaughtExceptionHandler match { - case null ⇒ - case some ⇒ some.uncaughtException(t, anything) - } - throw anything - } + final override def setRawResult(u: Unit): Unit = () + final override def getRawResult(): Unit = () + final override def exec(): Boolean = try { runnable.run(); true } catch { + case anything: Throwable => + val t = Thread.currentThread + t.getUncaughtExceptionHandler match { + case null => + case some => some.uncaughtException(t, anything) } + throw anything + } + } - def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter) - def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService = - new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService { - final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService] - override def execute(command: Runnable) = executor.execute(command) - override def shutdown() { asExecutorService.shutdown() } - override def shutdownNow() = asExecutorService.shutdownNow() - override def isShutdown = asExecutorService.isShutdown - override def isTerminated = asExecutorService.isTerminated - override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit) - override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable) - override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t) - override def submit(runnable: Runnable) = asExecutorService.submit(runnable) - override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables) - override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit) - override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables) - override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit) + def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = + new ExecutionContextImpl(Option(e).getOrElse(createDefaultExecutorService(reporter)), reporter) + + def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): + ExecutionContextImpl with ExecutionContextExecutorService = { + new ExecutionContextImpl(Option(es).getOrElse(createDefaultExecutorService(reporter)), reporter) + with ExecutionContextExecutorService { + final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService] + override def execute(command: Runnable) = executor.execute(command) + override def shutdown() { asExecutorService.shutdown() } + override def shutdownNow() = asExecutorService.shutdownNow() + override def isShutdown = asExecutorService.isShutdown + override def isTerminated = asExecutorService.isTerminated + override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit) + override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable) + override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t) + override def submit(runnable: Runnable) = asExecutorService.submit(runnable) + override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables) + override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit) + override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables) + override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit) + } } } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala deleted file mode 100644 index 042d32c234..0000000000 --- a/src/library/scala/concurrent/impl/Future.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent.impl - - - -import scala.concurrent.ExecutionContext -import scala.util.control.NonFatal -import scala.util.{ Success, Failure } - - -private[concurrent] object Future { - class PromiseCompletingRunnable[T](body: => T) extends Runnable { - val promise = new Promise.DefaultPromise[T]() - - override def run() = { - promise complete { - try Success(body) catch { case NonFatal(e) => Failure(e) } - } - } - } - - def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = { - val runnable = new PromiseCompletingRunnable(body) - executor.prepare.execute(runnable) - runnable.promise.future - } -} diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 6d2fc5c87c..7fcc8c9f2d 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -8,17 +8,41 @@ package scala.concurrent.impl -import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException, blocking } +import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException } import scala.concurrent.Future.InternalCallbackExecutor -import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration, NANOSECONDS } +import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.annotation.tailrec import scala.util.control.NonFatal import scala.util.{ Try, Success, Failure } -import java.io.ObjectInputStream + import java.util.concurrent.locks.AbstractQueuedSynchronizer +import java.util.concurrent.atomic.AtomicReference private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] { def future: this.type = this + + import scala.concurrent.Future + import scala.concurrent.impl.Promise.DefaultPromise + + override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { + val p = new DefaultPromise[S]() + onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } + p.future + } + + // If possible, link DefaultPromises to avoid space leaks + override def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] = { + val p = new DefaultPromise[S]() + onComplete { + v => try f(v) match { + case fut if fut eq this => p complete v.asInstanceOf[Try[S]] + case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p) + case fut => p completeWith fut + } catch { case NonFatal(t) => p failure t } + } + p.future + } + override def toString: String = value match { case Some(result) => "Future("+result+")" case None => "Future(<not completed>)" @@ -27,7 +51,7 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with sc /* Precondition: `executor` is prepared, i.e., `executor` has been returned from invocation of `prepare` on some other `ExecutionContext`. */ -private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { +private final class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { // must be filled in before running it var value: Try[T] = null @@ -93,7 +117,7 @@ private[concurrent] object Promise { * incomplete, or as complete with the same result value. * * A DefaultPromise stores its state entirely in the AnyRef cell exposed by - * AbstractPromise. The type of object stored in the cell fully describes the + * AtomicReference. The type of object stored in the cell fully describes the * current state of the promise. * * 1. List[CallbackRunnable] - The promise is incomplete and has zero or more callbacks @@ -154,8 +178,9 @@ private[concurrent] object Promise { * DefaultPromises, and `linkedRootOf` is currently only designed to be called * by Future.flatMap. */ - class DefaultPromise[T] extends AbstractPromise with Promise[T] { self => - updateState(null, Nil) // The promise is incomplete and has no callbacks + // Left non-final to enable addition of extra fields by Java/Scala converters + // in scala-java8-compat. + class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T] { /** Get the root promise for this promise, compressing the link chain to that * promise if necessary. @@ -171,14 +196,23 @@ private[concurrent] object Promise { * be garbage collected. Also, subsequent calls to this method should be * faster as the link chain will be shorter. */ - @tailrec - private def compressedRoot(): DefaultPromise[T] = { - getState match { - case linked: DefaultPromise[_] => - val target = linked.asInstanceOf[DefaultPromise[T]].root - if (linked eq target) target else if (updateState(linked, target)) target else compressedRoot() + private def compressedRoot(): DefaultPromise[T] = + get() match { + case linked: DefaultPromise[_] => compressedRoot(linked) case _ => this } + + @tailrec + private[this] final def compressedRoot(linked: DefaultPromise[_]): DefaultPromise[T] = { + val target = linked.asInstanceOf[DefaultPromise[T]].root + if (linked eq target) target + else if (compareAndSet(linked, target)) target + else { + get() match { + case newLinked: DefaultPromise[_] => compressedRoot(newLinked) + case _ => this + } + } } /** Get the promise at the root of the chain of linked promises. Used by `compressedRoot()`. @@ -186,18 +220,16 @@ private[concurrent] object Promise { * to compress the link chain whenever possible. */ @tailrec - private def root: DefaultPromise[T] = { - getState match { + private def root: DefaultPromise[T] = + get() match { case linked: DefaultPromise[_] => linked.asInstanceOf[DefaultPromise[T]].root case _ => this } - } /** Try waiting for this promise to be completed. */ protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) { import Duration.Undefined - import scala.concurrent.Future.InternalCallbackExecutor atMost match { case e if e eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period") case Duration.Inf => @@ -218,33 +250,33 @@ private[concurrent] object Promise { @throws(classOf[TimeoutException]) @throws(classOf[InterruptedException]) - def ready(atMost: Duration)(implicit permit: CanAwait): this.type = + final def ready(atMost: Duration)(implicit permit: CanAwait): this.type = if (tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost + "]") @throws(classOf[Exception]) - def result(atMost: Duration)(implicit permit: CanAwait): T = + final def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get.get // ready throws TimeoutException if timeout so value.get is safe here def value: Option[Try[T]] = value0 @tailrec - private def value0: Option[Try[T]] = getState match { + private def value0: Option[Try[T]] = get() match { case c: Try[_] => Some(c.asInstanceOf[Try[T]]) - case _: DefaultPromise[_] => compressedRoot().value0 + case dp: DefaultPromise[_] => compressedRoot(dp).value0 case _ => None } - override def isCompleted: Boolean = isCompleted0 + override final def isCompleted: Boolean = isCompleted0 @tailrec - private def isCompleted0: Boolean = getState match { + private def isCompleted0: Boolean = get() match { case _: Try[_] => true - case _: DefaultPromise[_] => compressedRoot().isCompleted0 + case dp: DefaultPromise[_] => compressedRoot(dp).isCompleted0 case _ => false } - def tryComplete(value: Try[T]): Boolean = { + final def tryComplete(value: Try[T]): Boolean = { val resolved = resolveTry(value) tryCompleteAndGetListeners(resolved) match { case null => false @@ -258,37 +290,34 @@ private[concurrent] object Promise { */ @tailrec private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = { - getState match { + get() match { case raw: List[_] => val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] - if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v) - case _: DefaultPromise[_] => - compressedRoot().tryCompleteAndGetListeners(v) + if (compareAndSet(cur, v)) cur else tryCompleteAndGetListeners(v) + case dp: DefaultPromise[_] => compressedRoot(dp).tryCompleteAndGetListeners(v) case _ => null } } - def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { - val preparedEC = executor.prepare() - val runnable = new CallbackRunnable[T](preparedEC, func) - dispatchOrAddCallback(runnable) - } + final def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = + dispatchOrAddCallback(new CallbackRunnable[T](executor.prepare(), func)) /** Tries to add the callback, if already completed, it dispatches the callback to be executed. * Used by `onComplete()` to add callbacks to a promise and by `link()` to transfer callbacks - * to the root promise when linking two promises togehter. + * to the root promise when linking two promises together. */ @tailrec private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = { - getState match { + get() match { case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) - case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable) - case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable) + case dp: DefaultPromise[_] => compressedRoot(dp).dispatchOrAddCallback(runnable) + case listeners: List[_] => if (compareAndSet(listeners, runnable :: listeners)) () + else dispatchOrAddCallback(runnable) } } /** Link this promise to the root of another promise using `link()`. Should only be - * be called by Future.flatMap. + * be called by transformWith. */ protected[concurrent] final def linkRootOf(target: DefaultPromise[T]): Unit = link(target.compressedRoot()) @@ -303,18 +332,17 @@ private[concurrent] object Promise { */ @tailrec private def link(target: DefaultPromise[T]): Unit = if (this ne target) { - getState match { + get() match { case r: Try[_] => - if (!target.tryComplete(r.asInstanceOf[Try[T]])) { - // Currently linking is done from Future.flatMap, which should ensure only - // one promise can be completed. Therefore this situation is unexpected. + if (!target.tryComplete(r.asInstanceOf[Try[T]])) throw new IllegalStateException("Cannot link completed promises together") - } - case _: DefaultPromise[_] => - compressedRoot().link(target) - case listeners: List[_] => if (updateState(listeners, target)) { - if (!listeners.isEmpty) listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_)) - } else link(target) + case dp: DefaultPromise[_] => + compressedRoot(dp).link(target) + case listeners: List[_] if compareAndSet(listeners, target) => + if (listeners.nonEmpty) + listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_)) + case _ => + link(target) } } } @@ -323,23 +351,58 @@ private[concurrent] object Promise { * * Useful in Future-composition when a value to contribute is already available. */ - final class KeptPromise[T](suppliedValue: Try[T]) extends Promise[T] { + object KeptPromise { + import scala.concurrent.Future + import scala.reflect.ClassTag + + private[this] sealed trait Kept[T] extends Promise[T] { + def result: Try[T] + + override def value: Option[Try[T]] = Some(result) - val value = Some(resolveTry(suppliedValue)) + override def isCompleted: Boolean = true - override def isCompleted: Boolean = true + override def tryComplete(value: Try[T]): Boolean = false - def tryComplete(value: Try[T]): Boolean = false + override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = + (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result) - def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { - val completedAs = value.get - val preparedEC = executor.prepare() - (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs) + override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + + override def result(atMost: Duration)(implicit permit: CanAwait): T = result.get } - def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + private[this] final class Successful[T](val result: Success[T]) extends Kept[T] { + override def onFailure[U](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = () + override def failed: Future[Throwable] = KeptPromise(Failure(new NoSuchElementException("Future.failed not completed with a throwable."))).future + override def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = this + override def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = this + override def fallbackTo[U >: T](that: Future[U]): Future[U] = this + } - def result(atMost: Duration)(implicit permit: CanAwait): T = value.get.get + private[this] final class Failed[T](val result: Failure[T]) extends Kept[T] { + private[this] final def thisAs[S]: Future[S] = future.asInstanceOf[Future[S]] + + override def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = () + override def failed: Future[Throwable] = KeptPromise(Success(result.exception)).future + override def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = () + override def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = thisAs[S] + override def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = thisAs[S] + override def flatten[S](implicit ev: T <:< Future[S]): Future[S] = thisAs[S] + override def filter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = this + override def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = thisAs[S] + override def zip[U](that: Future[U]): Future[(T, U)] = thisAs[(T,U)] + override def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = thisAs[R] + override def fallbackTo[U >: T](that: Future[U]): Future[U] = + if (this eq that) this else that.recoverWith({ case _ => this })(InternalCallbackExecutor) + override def mapTo[S](implicit tag: ClassTag[S]): Future[S] = thisAs[S] + } + + def apply[T](result: Try[T]): scala.concurrent.Promise[T] = + resolveTry(result) match { + case s @ Success(_) => new Successful(s) + case f @ Failure(_) => new Failed(f) + } } } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index d159dda414..0695ee3351 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -20,14 +20,33 @@ import scala.annotation.implicitNotFound * [[http://docs.scala-lang.org/overviews/core/futures.html]]. * * == Common Imports == - * + * * When working with Futures, you will often find that importing the whole concurrent - * package is convenient, furthermore you are likely to need an implicit ExecutionContext - * in scope for many operations involving Futures and Promises: - * + * package is convenient: + * * {{{ * import scala.concurrent._ - * import ExecutionContext.Implicits.global + * }}} + * + * When using things like `Future`s, it is often required to have an implicit `ExecutionContext` + * in scope. The general advice for these implicits are as follows. + * + * If the code in question is a class or method definition, and no `ExecutionContext` is available, + * request one from the caller by adding an implicit parameter list: + * + * {{{ + * def myMethod(myParam: MyType)(implicit ec: ExecutionContext) = … + * //Or + * class MyClass(myParam: MyType)(implicit ec: ExecutionContext) { … } + * }}} + * + * This allows the caller of the method, or creator of the instance of the class, to decide which + * `ExecutionContext` should be used. + * + * For typical REPL usage and experimentation, importing the global `ExecutionContext` is often desired. + * + * {{{ + * import scala.concurrent.ExcutionContext.Implicits.global * }}} * * == Specifying Durations == @@ -41,7 +60,7 @@ import scala.annotation.implicitNotFound * }}} * * == Using Futures For Non-blocking Computation == - * + * * Basic use of futures is easy with the factory method on Future, which executes a * provided function asynchronously, handing you back a future result of that function * without blocking the current thread. In order to create the Future you will need @@ -50,7 +69,7 @@ import scala.annotation.implicitNotFound * {{{ * import scala.concurrent._ * import ExecutionContext.Implicits.global // implicit execution context - * + * * val firstZebra: Future[Int] = Future { * val source = scala.io.Source.fromFile("/etc/dictionaries-common/words") * source.toSeq.indexOfSlice("zebra") @@ -80,7 +99,7 @@ import scala.annotation.implicitNotFound * animalRange.onSuccess { * case x if x > 500000 => println("It's a long way from Aardvark to Zebra") * } - * }}} + * }}} */ package object concurrent { type ExecutionException = java.util.concurrent.ExecutionException @@ -96,7 +115,7 @@ package object concurrent { * @param executor the execution context on which the future is run * @return the `Future` holding the result of the computation */ - @deprecated("Use `Future { ... }` instead.", "2.11.0") + @deprecated("use `Future { ... }` instead", "2.11.0") // removal planned for 2.13.0 def future[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = Future[T](body) @@ -105,7 +124,7 @@ package object concurrent { * @tparam T the type of the value in the promise * @return the newly created `Promise` object */ - @deprecated("Use `Promise[T]()` instead.", "2.11.0") + @deprecated("use `Promise[T]()` instead", "2.11.0") // removal planned for 2.13.0 def promise[T](): Promise[T] = Promise[T]() @@ -140,17 +159,20 @@ package concurrent { /** * `Await` is what is used to ensure proper handling of blocking for `Awaitable` instances. * - * While occasionally useful, e.g. for testing, it is recommended that you avoid Await - * when possible in favor of callbacks and combinators like onComplete and use in - * for comprehensions. Await will block the thread on which it runs, and could cause - * performance and deadlock issues. + * While occasionally useful, e.g. for testing, it is recommended that you avoid Await whenever possible— + * instead favoring combinators and/or callbacks. + * Await's `result` and `ready` methods will block the calling thread's execution until they return, + * which will cause performance degradation, and possibly, deadlock issues. */ object Await { /** * Await the "completed" state of an `Awaitable`. * * Although this method is blocking, the internal use of [[scala.concurrent.blocking blocking]] ensures that - * the underlying [[ExecutionContext]] is prepared to properly manage the blocking. + * the underlying [[ExecutionContext]] is given an opportunity to properly manage the blocking. + * + * WARNING: It is strongly discouraged to supply lengthy timeouts since the progress of the calling thread will be + * suspended—blocked—until either the `Awaitable` becomes ready or the timeout expires. * * @param awaitable * the `Awaitable` to be awaited @@ -172,7 +194,10 @@ package concurrent { * Await and return the result (of type `T`) of an `Awaitable`. * * Although this method is blocking, the internal use of [[scala.concurrent.blocking blocking]] ensures that - * the underlying [[ExecutionContext]] to properly detect blocking and ensure that there are no deadlocks. + * the underlying [[ExecutionContext]] is given an opportunity to properly manage the blocking. + * + * WARNING: It is strongly discouraged to supply lengthy timeouts since the progress of the calling thread will be + * suspended—blocked—until either the `Awaitable` has a result or the timeout expires. * * @param awaitable * the `Awaitable` to be awaited |