diff options
author | Adriaan Moors <adriaan.moors@typesafe.com> | 2013-02-27 15:05:16 -0800 |
---|---|---|
committer | Adriaan Moors <adriaan.moors@typesafe.com> | 2013-02-27 15:06:16 -0800 |
commit | 234d05d52b5a2985e53f9cb6877001b3c8fc780e (patch) | |
tree | 9ddac0344649cecde3504fb8f9834e782f8fc756 /src/library/scala/concurrent/Future.scala | |
parent | 031c5be557ed49f02ab365d64e55f30c616a5939 (diff) | |
parent | 4d1a1f7ee596fcec4fe51a505aceb323710d893f (diff) | |
download | scala-234d05d52b5a2985e53f9cb6877001b3c8fc780e.tar.gz scala-234d05d52b5a2985e53f9cb6877001b3c8fc780e.tar.bz2 scala-234d05d52b5a2985e53f9cb6877001b3c8fc780e.zip |
Merge 2.10.1 into 2.10.x.
Diffstat (limited to 'src/library/scala/concurrent/Future.scala')
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 106 |
1 files changed, 103 insertions, 3 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 0670da137c..6b6ad29074 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -675,11 +675,111 @@ object Future { // by just not ever using it itself. scala.concurrent // doesn't need to create defaultExecutionContext as // a side effect. - private[concurrent] object InternalCallbackExecutor extends ExecutionContext with BatchingExecutor { - override protected def unbatchedExecute(r: Runnable): Unit = - r.run() + private[concurrent] object InternalCallbackExecutor extends ExecutionContext with java.util.concurrent.Executor { override def reportFailure(t: Throwable): Unit = throw new IllegalStateException("problem in scala.concurrent internal callback", t) + + /** + * The BatchingExecutor trait had to be inlined into InternalCallbackExecutor for binary compatibility. + * + * BatchingExecutor is a trait for an Executor + * which groups multiple nested `Runnable.run()` calls + * into a single Runnable passed to the original + * Executor. This can be a useful optimization + * because it bypasses the original context's task + * queue and keeps related (nested) code on a single + * thread which may improve CPU affinity. However, + * if tasks passed to the Executor are blocking + * or expensive, this optimization can prevent work-stealing + * and make performance worse. Also, some ExecutionContext + * may be fast enough natively that this optimization just + * adds overhead. + * The default ExecutionContext.global is already batching + * or fast enough not to benefit from it; while + * `fromExecutor` and `fromExecutorService` do NOT add + * this optimization since they don't know whether the underlying + * executor will benefit from it. + * A batching executor can create deadlocks if code does + * not use `scala.concurrent.blocking` when it should, + * because tasks created within other tasks will block + * on the outer task completing. + * This executor may run tasks in any order, including LIFO order. + * There are no ordering guarantees. + * + * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable + * in the calling thread synchronously. It must enqueue/handoff the Runnable. + */ + // invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside + private val _tasksLocal = new ThreadLocal[List[Runnable]]() + + private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext { + private[this] var parentBlockContext: BlockContext = _ + // this method runs in the delegate ExecutionContext's thread + override def run(): Unit = { + require(_tasksLocal.get eq null) + + val prevBlockContext = BlockContext.current + BlockContext.withBlockContext(this) { + try { + parentBlockContext = prevBlockContext + + @tailrec def processBatch(batch: List[Runnable]): Unit = batch match { + case Nil => () + case head :: tail => + _tasksLocal set tail + try { + head.run() + } catch { + case t: Throwable => + // if one task throws, move the + // remaining tasks to another thread + // so we can throw the exception + // up to the invoking executor + val remaining = _tasksLocal.get + _tasksLocal set Nil + unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails? + throw t // rethrow + } + processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here + } + + processBatch(initial) + } finally { + _tasksLocal.remove() + parentBlockContext = null + } + } + } + + override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock. + { + val tasks = _tasksLocal.get + _tasksLocal set Nil + if ((tasks ne null) && tasks.nonEmpty) + unbatchedExecute(new Batch(tasks)) + } + + // now delegate the blocking to the previous BC + require(parentBlockContext ne null) + parentBlockContext.blockOn(thunk) + } + } + + override def execute(runnable: Runnable): Unit = runnable match { + // If we can batch the runnable + case _: OnCompleteRunnable => + _tasksLocal.get match { + case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch + case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch + } + + // If not batchable, just delegate to underlying + case _ => + unbatchedExecute(runnable) + } + + private def unbatchedExecute(r: Runnable): Unit = r.run() } } |