diff options
author | Adriaan Moors <adriaan.moors@typesafe.com> | 2013-02-08 17:10:20 -0800 |
---|---|---|
committer | Adriaan Moors <adriaan.moors@typesafe.com> | 2013-02-09 18:11:12 -0800 |
commit | 56cbf233c1b3d62def022f09702f525c63141a0f (patch) | |
tree | 36322ac6c9c4f0cb75b3f44ba6c572469e7164ab | |
parent | 549a1fea9366af0c0a4697bc39e3a75bd8f066f5 (diff) | |
download | scala-56cbf233c1b3d62def022f09702f525c63141a0f.tar.gz scala-56cbf233c1b3d62def022f09702f525c63141a0f.tar.bz2 scala-56cbf233c1b3d62def022f09702f525c63141a0f.zip |
[nomaster] can't add new class BatchingExecutor
This is necessary to maintain binary compatibility with 2.10.0.
-rw-r--r-- | bincompat-forward.whitelist.conf | 20 | ||||
-rw-r--r-- | src/library/scala/concurrent/BatchingExecutor.scala | 117 | ||||
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 106 |
3 files changed, 113 insertions, 130 deletions
diff --git a/bincompat-forward.whitelist.conf b/bincompat-forward.whitelist.conf index cb8191fbcc..ab43b3015f 100644 --- a/bincompat-forward.whitelist.conf +++ b/bincompat-forward.whitelist.conf @@ -34,16 +34,16 @@ filter { # matchName="scala.util.Random$" # problemName=MissingTypesProblem # }, - { - # private[concurrent] - matchName="scala.concurrent.BatchingExecutor$Batch" - problemName=MissingClassProblem - }, - { - # private[concurrent] - matchName="scala.concurrent.BatchingExecutor" - problemName=MissingClassProblem - }, + # { + # # private[concurrent] + # matchName="scala.concurrent.BatchingExecutor$Batch" + # problemName=MissingClassProblem + # }, + # { + # # private[concurrent] + # matchName="scala.concurrent.BatchingExecutor" + # problemName=MissingClassProblem + # }, { # private[concurrent] matchName="scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask" diff --git a/src/library/scala/concurrent/BatchingExecutor.scala b/src/library/scala/concurrent/BatchingExecutor.scala deleted file mode 100644 index a0d7aaea47..0000000000 --- a/src/library/scala/concurrent/BatchingExecutor.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent - -import java.util.concurrent.Executor -import scala.annotation.tailrec - -/** - * Mixin trait for an Executor - * which groups multiple nested `Runnable.run()` calls - * into a single Runnable passed to the original - * Executor. This can be a useful optimization - * because it bypasses the original context's task - * queue and keeps related (nested) code on a single - * thread which may improve CPU affinity. However, - * if tasks passed to the Executor are blocking - * or expensive, this optimization can prevent work-stealing - * and make performance worse. Also, some ExecutionContext - * may be fast enough natively that this optimization just - * adds overhead. - * The default ExecutionContext.global is already batching - * or fast enough not to benefit from it; while - * `fromExecutor` and `fromExecutorService` do NOT add - * this optimization since they don't know whether the underlying - * executor will benefit from it. - * A batching executor can create deadlocks if code does - * not use `scala.concurrent.blocking` when it should, - * because tasks created within other tasks will block - * on the outer task completing. - * This executor may run tasks in any order, including LIFO order. - * There are no ordering guarantees. - * - * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable - * in the calling thread synchronously. It must enqueue/handoff the Runnable. - */ -private[concurrent] trait BatchingExecutor extends Executor { - - // invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside - private val _tasksLocal = new ThreadLocal[List[Runnable]]() - - private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext { - private var parentBlockContext: BlockContext = _ - // this method runs in the delegate ExecutionContext's thread - override def run(): Unit = { - require(_tasksLocal.get eq null) - - val prevBlockContext = BlockContext.current - BlockContext.withBlockContext(this) { - try { - parentBlockContext = prevBlockContext - - @tailrec def processBatch(batch: List[Runnable]): Unit = batch match { - case Nil => () - case head :: tail => - _tasksLocal set tail - try { - head.run() - } catch { - case t: Throwable => - // if one task throws, move the - // remaining tasks to another thread - // so we can throw the exception - // up to the invoking executor - val remaining = _tasksLocal.get - _tasksLocal set Nil - unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails? - throw t // rethrow - } - processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here - } - - processBatch(initial) - } finally { - _tasksLocal.remove() - parentBlockContext = null - } - } - } - - override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { - // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock. - { - val tasks = _tasksLocal.get - _tasksLocal set Nil - if ((tasks ne null) && tasks.nonEmpty) - unbatchedExecute(new Batch(tasks)) - } - - // now delegate the blocking to the previous BC - require(parentBlockContext ne null) - parentBlockContext.blockOn(thunk) - } - } - - protected def unbatchedExecute(r: Runnable): Unit - - override def execute(runnable: Runnable): Unit = { - if (batchable(runnable)) { // If we can batch the runnable - _tasksLocal.get match { - case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch - case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch - } - } else unbatchedExecute(runnable) // If not batchable, just delegate to underlying - } - - /** Override this to define which runnables will be batched. */ - def batchable(runnable: Runnable): Boolean = runnable match { - case _: OnCompleteRunnable => true - case _ => false - } -} diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 36f3be341f..5a51e97072 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -675,11 +675,111 @@ object Future { // by just not ever using it itself. scala.concurrent // doesn't need to create defaultExecutionContext as // a side effect. - private[concurrent] object InternalCallbackExecutor extends ExecutionContext with BatchingExecutor { - override protected def unbatchedExecute(r: Runnable): Unit = - r.run() + private[concurrent] object InternalCallbackExecutor extends ExecutionContext with java.util.concurrent.Executor { override def reportFailure(t: Throwable): Unit = throw new IllegalStateException("problem in scala.concurrent internal callback", t) + + /** + * The BatchingExecutor trait had to be inlined into InternalCallbackExecutor for binary compatibility. + * + * BatchingExecutor is a trait for an Executor + * which groups multiple nested `Runnable.run()` calls + * into a single Runnable passed to the original + * Executor. This can be a useful optimization + * because it bypasses the original context's task + * queue and keeps related (nested) code on a single + * thread which may improve CPU affinity. However, + * if tasks passed to the Executor are blocking + * or expensive, this optimization can prevent work-stealing + * and make performance worse. Also, some ExecutionContext + * may be fast enough natively that this optimization just + * adds overhead. + * The default ExecutionContext.global is already batching + * or fast enough not to benefit from it; while + * `fromExecutor` and `fromExecutorService` do NOT add + * this optimization since they don't know whether the underlying + * executor will benefit from it. + * A batching executor can create deadlocks if code does + * not use `scala.concurrent.blocking` when it should, + * because tasks created within other tasks will block + * on the outer task completing. + * This executor may run tasks in any order, including LIFO order. + * There are no ordering guarantees. + * + * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable + * in the calling thread synchronously. It must enqueue/handoff the Runnable. + */ + // invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside + private val _tasksLocal = new ThreadLocal[List[Runnable]]() + + private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext { + private[this] var parentBlockContext: BlockContext = _ + // this method runs in the delegate ExecutionContext's thread + override def run(): Unit = { + require(_tasksLocal.get eq null) + + val prevBlockContext = BlockContext.current + BlockContext.withBlockContext(this) { + try { + parentBlockContext = prevBlockContext + + @tailrec def processBatch(batch: List[Runnable]): Unit = batch match { + case Nil => () + case head :: tail => + _tasksLocal set tail + try { + head.run() + } catch { + case t: Throwable => + // if one task throws, move the + // remaining tasks to another thread + // so we can throw the exception + // up to the invoking executor + val remaining = _tasksLocal.get + _tasksLocal set Nil + unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails? + throw t // rethrow + } + processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here + } + + processBatch(initial) + } finally { + _tasksLocal.remove() + parentBlockContext = null + } + } + } + + override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock. + { + val tasks = _tasksLocal.get + _tasksLocal set Nil + if ((tasks ne null) && tasks.nonEmpty) + unbatchedExecute(new Batch(tasks)) + } + + // now delegate the blocking to the previous BC + require(parentBlockContext ne null) + parentBlockContext.blockOn(thunk) + } + } + + override def execute(runnable: Runnable): Unit = runnable match { + // If we can batch the runnable + case _: OnCompleteRunnable => + _tasksLocal.get match { + case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch + case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch + } + + // If not batchable, just delegate to underlying + case _ => + unbatchedExecute(runnable) + } + + private def unbatchedExecute(r: Runnable): Unit = r.run() } } |