From 08a74e55c15a8102d2b9184b29c5bc7474a4dd91 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 14 Jan 2013 11:53:55 +0100 Subject: Fix SI-6932 by enabling linearization of callback execution for the internal execution context of Future --- .../scala/concurrent/BatchingExecutor.scala | 126 +++++++++++++++++++++ src/library/scala/concurrent/Future.scala | 6 +- test/files/jvm/scala-concurrent-tck.scala | 7 ++ 3 files changed, 136 insertions(+), 3 deletions(-) create mode 100644 src/library/scala/concurrent/BatchingExecutor.scala diff --git a/src/library/scala/concurrent/BatchingExecutor.scala b/src/library/scala/concurrent/BatchingExecutor.scala new file mode 100644 index 0000000000..3ab4bd88c5 --- /dev/null +++ b/src/library/scala/concurrent/BatchingExecutor.scala @@ -0,0 +1,126 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + +import java.util.concurrent.{ Executor } +import scala.concurrent._ +import scala.annotation.tailrec + +/** + * All Batchables are automatically batched when submitted to a BatchingExecutor + */ +private[concurrent] trait Batchable extends Runnable { + def isBatchable: Boolean +} + +/** + * Mixin trait for an Executor + * which groups multiple nested `Runnable.run()` calls + * into a single Runnable passed to the original + * Executor. This can be a useful optimization + * because it bypasses the original context's task + * queue and keeps related (nested) code on a single + * thread which may improve CPU affinity. However, + * if tasks passed to the Executor are blocking + * or expensive, this optimization can prevent work-stealing + * and make performance worse. Also, some ExecutionContext + * may be fast enough natively that this optimization just + * adds overhead. + * The default ExecutionContext.global is already batching + * or fast enough not to benefit from it; while + * `fromExecutor` and `fromExecutorService` do NOT add + * this optimization since they don't know whether the underlying + * executor will benefit from it. + * A batching executor can create deadlocks if code does + * not use `scala.concurrent.blocking` when it should, + * because tasks created within other tasks will block + * on the outer task completing. + * This executor may run tasks in any order, including LIFO order. + * There are no ordering guarantees. + * + * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable + * in the calling thread synchronously. It must enqueue/handoff the Runnable. + */ +private[concurrent] trait BatchingExecutor extends Executor { + + // invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside + private val _tasksLocal = new ThreadLocal[List[Runnable]]() + + private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext { + private var parentBlockContext: BlockContext = _ + // this method runs in the delegate ExecutionContext's thread + override def run(): Unit = { + require(_tasksLocal.get eq null) + + val prevBlockContext = BlockContext.current + BlockContext.withBlockContext(this) { + try { + parentBlockContext = prevBlockContext + + @tailrec def processBatch(batch: List[Runnable]): Unit = batch match { + case Nil ⇒ () + case head :: tail ⇒ + _tasksLocal set tail + try { + head.run() + } catch { + case t: Throwable ⇒ + // if one task throws, move the + // remaining tasks to another thread + // so we can throw the exception + // up to the invoking executor + val remaining = _tasksLocal.get + _tasksLocal set Nil + unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails? + throw t // rethrow + } + processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here + } + + processBatch(initial) + } finally { + _tasksLocal.remove() + parentBlockContext = null + } + } + } + + override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = { + // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock. + { + val tasks = _tasksLocal.get + _tasksLocal set Nil + if ((tasks ne null) && tasks.nonEmpty) + unbatchedExecute(new Batch(tasks)) + } + + // now delegate the blocking to the previous BC + require(parentBlockContext ne null) + parentBlockContext.blockOn(thunk) + } + } + + protected def unbatchedExecute(r: Runnable): Unit + + override def execute(runnable: Runnable): Unit = { + if (batchable(runnable)) { // If we can batch the runnable + _tasksLocal.get match { + case null ⇒ unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch + case some ⇒ _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch + } + } else unbatchedExecute(runnable) // If not batchable, just delegate to underlying + } + + /** Override this to define which runnables will be batched. */ + def batchable(runnable: Runnable): Boolean = runnable match { + case b: Batchable ⇒ b.isBatchable + case _: scala.concurrent.OnCompleteRunnable ⇒ true + case _ ⇒ false + } +} diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 4b9e74708d..36f3be341f 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -675,9 +675,9 @@ object Future { // by just not ever using it itself. scala.concurrent // doesn't need to create defaultExecutionContext as // a side effect. - private[concurrent] object InternalCallbackExecutor extends ExecutionContext { - override def execute(runnable: Runnable): Unit = - runnable.run() + private[concurrent] object InternalCallbackExecutor extends ExecutionContext with BatchingExecutor { + override protected def unbatchedExecute(r: Runnable): Unit = + r.run() override def reportFailure(t: Throwable): Unit = throw new IllegalStateException("problem in scala.concurrent internal callback", t) } diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index b529bca38a..b2b4183564 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -134,6 +134,12 @@ trait FutureCallbacks extends TestBase { assert(false) } } + + def testThatNestedCallbacksDoNotYieldStackOverflow(): Unit = { + val promise = Promise[Int] + (0 to 10000).map(Future(_)).foldLeft(promise.future)((f1, f2) => f2.flatMap(i => f1)) + promise.success(-1) + } testOnSuccess() testOnSuccessWhenCompleted() @@ -143,6 +149,7 @@ trait FutureCallbacks extends TestBase { // testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) //TODO: this test is currently problematic, because NonFatal does not match InterruptedException //testOnFailureWhenSpecialThrowable(7, new InterruptedException) + testThatNestedCallbacksDoNotYieldStackOverflow() testOnFailureWhenTimeoutException() } -- cgit v1.2.3