From 1dfce90246f7d334e34d110afb8b1517180995fc Mon Sep 17 00:00:00 2001 From: phaller Date: Tue, 22 May 2012 13:52:18 +0200 Subject: Move implicit ExecutionContext to be determined by lexical scope Port of a pull request originally submitted by @havocp. - declare the invariant that all app callbacks have an associated ExecutionContext provided at the place the callback is passed to a method on Future - always run callbacks in their associated EC - since all callbacks have their own EC, Promise does not need one - "internal" callbacks don't need to defer execution either since we know the ultimate app callback will do so, therefore we can use an immediate executor for these --- src/library/scala/concurrent/Future.scala | 102 ++++++++++++++++++------ src/library/scala/concurrent/Promise.scala | 14 ++-- src/library/scala/concurrent/impl/Future.scala | 2 - src/library/scala/concurrent/impl/Promise.scala | 21 +++-- 4 files changed, 97 insertions(+), 42 deletions(-) (limited to 'src/library') diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 4df2bb63af..c42393eee2 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -82,9 +82,29 @@ import language.higherKinds * {{{ * f flatMap { (x: Int) => g map { (y: Int) => x + y } } * }}} + * + * @define callbackInContext + * The provided callback always runs in the provided implicit + *`ExecutionContext`, though there is no guarantee that the + * `execute()` method on the `ExecutionContext` will be called once + * per callback or that `execute()` will be called in the current + * thread. That is, the implementation may run multiple callbacks + * in a batch within a single `execute()` and it may run + * `execute()` either immediately or asynchronously. */ trait Future[+T] extends Awaitable[T] { + // The executor within the lexical scope + // of the Future trait. Note that this will + // (modulo bugs) _never_ execute a callback + // other than those below in this same file. + // As a nice side benefit, having this implicit + // here forces an ambiguity in those methods + // that also have an executor parameter, which + // keeps us from accidentally forgetting to use + // the executor parameter. + private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor + /* Callbacks */ /** When this future is completed successfully (i.e. with a value), @@ -95,11 +115,12 @@ trait Future[+T] extends Awaitable[T] { * this will either be applied immediately or be scheduled asynchronously. * * $multipleCallbacks + * $callbackInContext */ - def onSuccess[U](pf: PartialFunction[T, U]): Unit = onComplete { + def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete { case Right(v) if pf isDefinedAt v => pf(v) case _ => - } + }(executor) /** When this future is completed with a failure (i.e. with a throwable), * apply the provided callback to the throwable. @@ -112,11 +133,12 @@ trait Future[+T] extends Awaitable[T] { * Will not be called in case that the future is completed with a value. * * $multipleCallbacks + * $callbackInContext */ - def onFailure[U](callback: PartialFunction[Throwable, U]): Unit = onComplete { + def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete { case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t) case _ => - } + }(executor) /** When this future is completed, either through an exception, or a value, * apply the provided function. @@ -125,8 +147,9 @@ trait Future[+T] extends Awaitable[T] { * this will either be applied immediately or be scheduled asynchronously. * * $multipleCallbacks + * $callbackInContext */ - def onComplete[U](func: Either[Throwable, T] => U): Unit + def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit /* Miscellaneous */ @@ -182,10 +205,10 @@ trait Future[+T] extends Awaitable[T] { * * Will not be called if the future fails. */ - def foreach[U](f: T => U): Unit = onComplete { + def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { case Right(r) => f(r) case _ => // do nothing - } + }(executor) /** Creates a new future by applying the 's' function to the successful result of * this future, or the 'f' function to the failed result. If there is any non-fatal @@ -198,7 +221,7 @@ trait Future[+T] extends Awaitable[T] { * the returned future * @return a future that will be completed with the transformed value */ - def transform[S](s: T => S, f: Throwable => Throwable): Future[S] = { + def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = { val p = Promise[S]() onComplete { @@ -211,7 +234,7 @@ trait Future[+T] extends Awaitable[T] { } catch { case NonFatal(t) => p failure t } - } + }(executor) p.future } @@ -222,7 +245,7 @@ trait Future[+T] extends Awaitable[T] { * * $forComprehensionExamples */ - def map[S](f: T => S): Future[S] = { // transform(f, identity) + def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity) val p = Promise[S]() onComplete { @@ -235,7 +258,7 @@ trait Future[+T] extends Awaitable[T] { } catch { case NonFatal(t) => p failure t } - } + }(executor) p.future } @@ -247,21 +270,21 @@ trait Future[+T] extends Awaitable[T] { * * $forComprehensionExamples */ - def flatMap[S](f: T => Future[S]): Future[S] = { + def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = { val p = Promise[S]() onComplete { case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] case Right(v) => try { - f(v) onComplete { + f(v).onComplete({ case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] case Right(v) => p success v - } + })(internalExecutor) } catch { case NonFatal(t) => p failure t } - } + }(executor) p.future } @@ -282,7 +305,7 @@ trait Future[+T] extends Awaitable[T] { * await(h, 0) // throw a NoSuchElementException * }}} */ - def filter(pred: T => Boolean): Future[T] = { + def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() onComplete { @@ -294,14 +317,14 @@ trait Future[+T] extends Awaitable[T] { } catch { case NonFatal(t) => p failure t } - } + }(executor) p.future } /** Used by for-comprehensions. */ - final def withFilter(p: T => Boolean): Future[T] = filter(p) + final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor) // final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p) // final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) { @@ -331,7 +354,7 @@ trait Future[+T] extends Awaitable[T] { * await(h, 0) // throw a NoSuchElementException * }}} */ - def collect[S](pf: PartialFunction[T, S]): Future[S] = { + def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = { val p = Promise[S]() onComplete { @@ -343,7 +366,7 @@ trait Future[+T] extends Awaitable[T] { } catch { case NonFatal(t) => p failure t } - } + }(executor) p.future } @@ -360,7 +383,7 @@ trait Future[+T] extends Awaitable[T] { * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3 * }}} */ - def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = { + def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = { val p = Promise[U]() onComplete { @@ -370,7 +393,7 @@ trait Future[+T] extends Awaitable[T] { case NonFatal(t) => p failure t } case otherwise => p complete otherwise - } + }(executor) p.future } @@ -388,7 +411,7 @@ trait Future[+T] extends Awaitable[T] { * future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue * }}} */ - def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = { + def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = { val p = Promise[U]() onComplete { @@ -399,7 +422,7 @@ trait Future[+T] extends Awaitable[T] { case NonFatal(t) => p failure t } case otherwise => p complete otherwise - } + }(executor) p.future } @@ -498,12 +521,12 @@ trait Future[+T] extends Awaitable[T] { * } * }}} */ - def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = { + def andThen[U](pf: PartialFunction[Either[Throwable, T], U])(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() onComplete { case r => try if (pf isDefinedAt r) pf(r) finally p complete r - } + }(executor) p.future } @@ -656,6 +679,33 @@ object Future { for (r <- fr; b <- fb) yield (r += b) }.map(_.result) + // This is used to run callbacks which are internal + // to scala.concurrent; our own callbacks are only + // ever used to eventually run another callback, + // and that other callback will have its own + // executor because all callbacks come with + // an executor. Our own callbacks never block + // and have no "expected" exceptions. + // As a result, this executor can do nothing; + // some other executor will always come after + // it (and sometimes one will be before it), + // and those will be performing the "real" + // dispatch to code outside scala.concurrent. + // Because this exists, ExecutionContext.defaultExecutionContext + // isn't instantiated by Future internals, so + // if some code for some reason wants to avoid + // ever starting up the default context, it can do so + // by just not ever using it itself. scala.concurrent + // doesn't need to create defaultExecutionContext as + // a side effect. + private[concurrent] object InternalCallbackExecutor extends ExecutionContext { + def execute(runnable: Runnable): Unit = + runnable.run() + def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = + throw new IllegalStateException("bug in scala.concurrent, called blocking() from internal callback") + def reportFailure(t: Throwable): Unit = + throw new IllegalStateException("problem in scala.concurrent internal callback", t) + } } diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index 2e9de4a0d0..578642966f 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -25,6 +25,11 @@ package scala.concurrent */ trait Promise[T] { + // used for internal callbacks defined in + // the lexical scope of this trait; + // _never_ for application callbacks. + private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor + /** Future containing the value of this promise. */ def future: Future[T] @@ -106,26 +111,23 @@ object Promise { /** Creates a promise object which can be completed with a value. * * @tparam T the type of the value in the promise - * @param executor the execution context on which the promise is created on * @return the newly created `Promise` object */ - def apply[T]()(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.DefaultPromise[T]() + def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]() /** Creates an already completed Promise with the specified exception. * * @tparam T the type of the value in the promise - * @param executor the execution context on which the promise is created on * @return the newly created `Promise` object */ - def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception)) + def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception)) /** Creates an already completed Promise with the specified result. * * @tparam T the type of the value in the promise - * @param executor the execution context on which the promise is created on * @return the newly created `Promise` object */ - def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Right(result)) + def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Right(result)) } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index a54e81bd05..47534e398b 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -17,8 +17,6 @@ import scala.collection.mutable.Stack private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { - implicit def executor: ExecutionContext - } private[concurrent] object Future { diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 78053f5117..1d573ef818 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -42,7 +42,7 @@ object Promise { /** Default promise implementation. */ - class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self => + class DefaultPromise[T] extends AbstractPromise with Promise[T] { self => updateState(null, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU protected final def tryAwait(atMost: Duration): Boolean = { @@ -108,21 +108,26 @@ object Promise { }) match { case null => false case cs if cs.isEmpty => true - case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, resolved))); true + // this assumes that f(resolved) will go via dispatchFuture + // and notifyCompleted (see onComplete below) + case cs => cs.foreach(f => f(resolved)); true } } - def onComplete[U](func: Either[Throwable, T] => U): Unit = { + def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { + val bound: Either[Throwable, T] => Unit = (either: Either[Throwable, T]) => + Future.dispatchFuture(executor, () => notifyCompleted(func, either)) + @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = getState match { - case r: Either[_, _] => Future.dispatchFuture(executor, () => notifyCompleted(func, r.asInstanceOf[Either[Throwable, T]])) - case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback() + case r: Either[_, _] => bound(r.asInstanceOf[Either[Throwable, T]]) + case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback() } dispatchOrAddCallback() } - private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { + private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T])(implicit executor: ExecutionContext) { try { func(result) } catch { @@ -135,7 +140,7 @@ object Promise { * * Useful in Future-composition when a value to contribute is already available. */ - final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] { + final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { val value = Some(resolveEither(suppliedValue)) @@ -143,7 +148,7 @@ object Promise { def tryComplete(value: Either[Throwable, T]): Boolean = false - def onComplete[U](func: Either[Throwable, T] => U): Unit = { + def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { val completedAs = value.get // Avoid closing over "this" Future.dispatchFuture(executor, () => func(completedAs)) } -- cgit v1.2.3