summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdriaan Moors <adriaan.moors@typesafe.com>2013-02-08 17:10:20 -0800
committerAdriaan Moors <adriaan.moors@typesafe.com>2013-02-09 18:11:12 -0800
commit56cbf233c1b3d62def022f09702f525c63141a0f (patch)
tree36322ac6c9c4f0cb75b3f44ba6c572469e7164ab
parent549a1fea9366af0c0a4697bc39e3a75bd8f066f5 (diff)
downloadscala-56cbf233c1b3d62def022f09702f525c63141a0f.tar.gz
scala-56cbf233c1b3d62def022f09702f525c63141a0f.tar.bz2
scala-56cbf233c1b3d62def022f09702f525c63141a0f.zip
[nomaster] can't add new class BatchingExecutor
This is necessary to maintain binary compatibility with 2.10.0.
-rw-r--r--bincompat-forward.whitelist.conf20
-rw-r--r--src/library/scala/concurrent/BatchingExecutor.scala117
-rw-r--r--src/library/scala/concurrent/Future.scala106
3 files changed, 113 insertions, 130 deletions
diff --git a/bincompat-forward.whitelist.conf b/bincompat-forward.whitelist.conf
index cb8191fbcc..ab43b3015f 100644
--- a/bincompat-forward.whitelist.conf
+++ b/bincompat-forward.whitelist.conf
@@ -34,16 +34,16 @@ filter {
# matchName="scala.util.Random$"
# problemName=MissingTypesProblem
# },
- {
- # private[concurrent]
- matchName="scala.concurrent.BatchingExecutor$Batch"
- problemName=MissingClassProblem
- },
- {
- # private[concurrent]
- matchName="scala.concurrent.BatchingExecutor"
- problemName=MissingClassProblem
- },
+ # {
+ # # private[concurrent]
+ # matchName="scala.concurrent.BatchingExecutor$Batch"
+ # problemName=MissingClassProblem
+ # },
+ # {
+ # # private[concurrent]
+ # matchName="scala.concurrent.BatchingExecutor"
+ # problemName=MissingClassProblem
+ # },
{
# private[concurrent]
matchName="scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask"
diff --git a/src/library/scala/concurrent/BatchingExecutor.scala b/src/library/scala/concurrent/BatchingExecutor.scala
deleted file mode 100644
index a0d7aaea47..0000000000
--- a/src/library/scala/concurrent/BatchingExecutor.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-package scala.concurrent
-
-import java.util.concurrent.Executor
-import scala.annotation.tailrec
-
-/**
- * Mixin trait for an Executor
- * which groups multiple nested `Runnable.run()` calls
- * into a single Runnable passed to the original
- * Executor. This can be a useful optimization
- * because it bypasses the original context's task
- * queue and keeps related (nested) code on a single
- * thread which may improve CPU affinity. However,
- * if tasks passed to the Executor are blocking
- * or expensive, this optimization can prevent work-stealing
- * and make performance worse. Also, some ExecutionContext
- * may be fast enough natively that this optimization just
- * adds overhead.
- * The default ExecutionContext.global is already batching
- * or fast enough not to benefit from it; while
- * `fromExecutor` and `fromExecutorService` do NOT add
- * this optimization since they don't know whether the underlying
- * executor will benefit from it.
- * A batching executor can create deadlocks if code does
- * not use `scala.concurrent.blocking` when it should,
- * because tasks created within other tasks will block
- * on the outer task completing.
- * This executor may run tasks in any order, including LIFO order.
- * There are no ordering guarantees.
- *
- * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
- * in the calling thread synchronously. It must enqueue/handoff the Runnable.
- */
-private[concurrent] trait BatchingExecutor extends Executor {
-
- // invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside
- private val _tasksLocal = new ThreadLocal[List[Runnable]]()
-
- private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext {
- private var parentBlockContext: BlockContext = _
- // this method runs in the delegate ExecutionContext's thread
- override def run(): Unit = {
- require(_tasksLocal.get eq null)
-
- val prevBlockContext = BlockContext.current
- BlockContext.withBlockContext(this) {
- try {
- parentBlockContext = prevBlockContext
-
- @tailrec def processBatch(batch: List[Runnable]): Unit = batch match {
- case Nil => ()
- case head :: tail =>
- _tasksLocal set tail
- try {
- head.run()
- } catch {
- case t: Throwable =>
- // if one task throws, move the
- // remaining tasks to another thread
- // so we can throw the exception
- // up to the invoking executor
- val remaining = _tasksLocal.get
- _tasksLocal set Nil
- unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails?
- throw t // rethrow
- }
- processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here
- }
-
- processBatch(initial)
- } finally {
- _tasksLocal.remove()
- parentBlockContext = null
- }
- }
- }
-
- override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
- // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
- {
- val tasks = _tasksLocal.get
- _tasksLocal set Nil
- if ((tasks ne null) && tasks.nonEmpty)
- unbatchedExecute(new Batch(tasks))
- }
-
- // now delegate the blocking to the previous BC
- require(parentBlockContext ne null)
- parentBlockContext.blockOn(thunk)
- }
- }
-
- protected def unbatchedExecute(r: Runnable): Unit
-
- override def execute(runnable: Runnable): Unit = {
- if (batchable(runnable)) { // If we can batch the runnable
- _tasksLocal.get match {
- case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
- case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch
- }
- } else unbatchedExecute(runnable) // If not batchable, just delegate to underlying
- }
-
- /** Override this to define which runnables will be batched. */
- def batchable(runnable: Runnable): Boolean = runnable match {
- case _: OnCompleteRunnable => true
- case _ => false
- }
-}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 36f3be341f..5a51e97072 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -675,11 +675,111 @@ object Future {
// by just not ever using it itself. scala.concurrent
// doesn't need to create defaultExecutionContext as
// a side effect.
- private[concurrent] object InternalCallbackExecutor extends ExecutionContext with BatchingExecutor {
- override protected def unbatchedExecute(r: Runnable): Unit =
- r.run()
+ private[concurrent] object InternalCallbackExecutor extends ExecutionContext with java.util.concurrent.Executor {
override def reportFailure(t: Throwable): Unit =
throw new IllegalStateException("problem in scala.concurrent internal callback", t)
+
+ /**
+ * The BatchingExecutor trait had to be inlined into InternalCallbackExecutor for binary compatibility.
+ *
+ * BatchingExecutor is a trait for an Executor
+ * which groups multiple nested `Runnable.run()` calls
+ * into a single Runnable passed to the original
+ * Executor. This can be a useful optimization
+ * because it bypasses the original context's task
+ * queue and keeps related (nested) code on a single
+ * thread which may improve CPU affinity. However,
+ * if tasks passed to the Executor are blocking
+ * or expensive, this optimization can prevent work-stealing
+ * and make performance worse. Also, some ExecutionContext
+ * may be fast enough natively that this optimization just
+ * adds overhead.
+ * The default ExecutionContext.global is already batching
+ * or fast enough not to benefit from it; while
+ * `fromExecutor` and `fromExecutorService` do NOT add
+ * this optimization since they don't know whether the underlying
+ * executor will benefit from it.
+ * A batching executor can create deadlocks if code does
+ * not use `scala.concurrent.blocking` when it should,
+ * because tasks created within other tasks will block
+ * on the outer task completing.
+ * This executor may run tasks in any order, including LIFO order.
+ * There are no ordering guarantees.
+ *
+ * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
+ * in the calling thread synchronously. It must enqueue/handoff the Runnable.
+ */
+ // invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside
+ private val _tasksLocal = new ThreadLocal[List[Runnable]]()
+
+ private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext {
+ private[this] var parentBlockContext: BlockContext = _
+ // this method runs in the delegate ExecutionContext's thread
+ override def run(): Unit = {
+ require(_tasksLocal.get eq null)
+
+ val prevBlockContext = BlockContext.current
+ BlockContext.withBlockContext(this) {
+ try {
+ parentBlockContext = prevBlockContext
+
+ @tailrec def processBatch(batch: List[Runnable]): Unit = batch match {
+ case Nil => ()
+ case head :: tail =>
+ _tasksLocal set tail
+ try {
+ head.run()
+ } catch {
+ case t: Throwable =>
+ // if one task throws, move the
+ // remaining tasks to another thread
+ // so we can throw the exception
+ // up to the invoking executor
+ val remaining = _tasksLocal.get
+ _tasksLocal set Nil
+ unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails?
+ throw t // rethrow
+ }
+ processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here
+ }
+
+ processBatch(initial)
+ } finally {
+ _tasksLocal.remove()
+ parentBlockContext = null
+ }
+ }
+ }
+
+ override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
+ // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
+ {
+ val tasks = _tasksLocal.get
+ _tasksLocal set Nil
+ if ((tasks ne null) && tasks.nonEmpty)
+ unbatchedExecute(new Batch(tasks))
+ }
+
+ // now delegate the blocking to the previous BC
+ require(parentBlockContext ne null)
+ parentBlockContext.blockOn(thunk)
+ }
+ }
+
+ override def execute(runnable: Runnable): Unit = runnable match {
+ // If we can batch the runnable
+ case _: OnCompleteRunnable =>
+ _tasksLocal.get match {
+ case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
+ case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch
+ }
+
+ // If not batchable, just delegate to underlying
+ case _ =>
+ unbatchedExecute(runnable)
+ }
+
+ private def unbatchedExecute(r: Runnable): Unit = r.run()
}
}