diff options
author | Adriaan Moors <adriaan.moors@typesafe.com> | 2013-01-27 22:36:11 -0800 |
---|---|---|
committer | Adriaan Moors <adriaan.moors@typesafe.com> | 2013-01-27 22:36:11 -0800 |
commit | 02963d724c512251ce66502226408091686989ee (patch) | |
tree | c4cd2e4337a4986b48e15dc8bc1e80c11a2b27e7 /src | |
parent | 1f24a11d3c3d450f8d27dd34952b8912ed9ced12 (diff) | |
parent | 262d7ec854ee32327188c292b54dd94ba2c7010c (diff) | |
download | scala-02963d724c512251ce66502226408091686989ee.tar.gz scala-02963d724c512251ce66502226408091686989ee.tar.bz2 scala-02963d724c512251ce66502226408091686989ee.zip |
Merge pull request #1941 from phaller/issue/6932-futures-internal-callbacks
SI-6932 StackOverflowError in chained Future.flatMap calls
Diffstat (limited to 'src')
-rw-r--r-- | src/library/scala/concurrent/BatchingExecutor.scala | 117 | ||||
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 6 |
2 files changed, 120 insertions, 3 deletions
diff --git a/src/library/scala/concurrent/BatchingExecutor.scala b/src/library/scala/concurrent/BatchingExecutor.scala new file mode 100644 index 0000000000..a0d7aaea47 --- /dev/null +++ b/src/library/scala/concurrent/BatchingExecutor.scala @@ -0,0 +1,117 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + +import java.util.concurrent.Executor +import scala.annotation.tailrec + +/** + * Mixin trait for an Executor + * which groups multiple nested `Runnable.run()` calls + * into a single Runnable passed to the original + * Executor. This can be a useful optimization + * because it bypasses the original context's task + * queue and keeps related (nested) code on a single + * thread which may improve CPU affinity. However, + * if tasks passed to the Executor are blocking + * or expensive, this optimization can prevent work-stealing + * and make performance worse. Also, some ExecutionContext + * may be fast enough natively that this optimization just + * adds overhead. + * The default ExecutionContext.global is already batching + * or fast enough not to benefit from it; while + * `fromExecutor` and `fromExecutorService` do NOT add + * this optimization since they don't know whether the underlying + * executor will benefit from it. + * A batching executor can create deadlocks if code does + * not use `scala.concurrent.blocking` when it should, + * because tasks created within other tasks will block + * on the outer task completing. + * This executor may run tasks in any order, including LIFO order. + * There are no ordering guarantees. + * + * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable + * in the calling thread synchronously. It must enqueue/handoff the Runnable. + */ +private[concurrent] trait BatchingExecutor extends Executor { + + // invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside + private val _tasksLocal = new ThreadLocal[List[Runnable]]() + + private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext { + private var parentBlockContext: BlockContext = _ + // this method runs in the delegate ExecutionContext's thread + override def run(): Unit = { + require(_tasksLocal.get eq null) + + val prevBlockContext = BlockContext.current + BlockContext.withBlockContext(this) { + try { + parentBlockContext = prevBlockContext + + @tailrec def processBatch(batch: List[Runnable]): Unit = batch match { + case Nil => () + case head :: tail => + _tasksLocal set tail + try { + head.run() + } catch { + case t: Throwable => + // if one task throws, move the + // remaining tasks to another thread + // so we can throw the exception + // up to the invoking executor + val remaining = _tasksLocal.get + _tasksLocal set Nil + unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails? + throw t // rethrow + } + processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here + } + + processBatch(initial) + } finally { + _tasksLocal.remove() + parentBlockContext = null + } + } + } + + override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock. + { + val tasks = _tasksLocal.get + _tasksLocal set Nil + if ((tasks ne null) && tasks.nonEmpty) + unbatchedExecute(new Batch(tasks)) + } + + // now delegate the blocking to the previous BC + require(parentBlockContext ne null) + parentBlockContext.blockOn(thunk) + } + } + + protected def unbatchedExecute(r: Runnable): Unit + + override def execute(runnable: Runnable): Unit = { + if (batchable(runnable)) { // If we can batch the runnable + _tasksLocal.get match { + case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch + case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch + } + } else unbatchedExecute(runnable) // If not batchable, just delegate to underlying + } + + /** Override this to define which runnables will be batched. */ + def batchable(runnable: Runnable): Boolean = runnable match { + case _: OnCompleteRunnable => true + case _ => false + } +} diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 4b9e74708d..36f3be341f 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -675,9 +675,9 @@ object Future { // by just not ever using it itself. scala.concurrent // doesn't need to create defaultExecutionContext as // a side effect. - private[concurrent] object InternalCallbackExecutor extends ExecutionContext { - override def execute(runnable: Runnable): Unit = - runnable.run() + private[concurrent] object InternalCallbackExecutor extends ExecutionContext with BatchingExecutor { + override protected def unbatchedExecute(r: Runnable): Unit = + r.run() override def reportFailure(t: Throwable): Unit = throw new IllegalStateException("problem in scala.concurrent internal callback", t) } |