1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
package scala.concurrent
import java.util.concurrent.Executor
import scala.annotation.tailrec
/**
* Mixin trait for an Executor
* which groups multiple nested `Runnable.run()` calls
* into a single Runnable passed to the original
* Executor. This can be a useful optimization
* because it bypasses the original context's task
* queue and keeps related (nested) code on a single
* thread which may improve CPU affinity. However,
* if tasks passed to the Executor are blocking
* or expensive, this optimization can prevent work-stealing
* and make performance worse. Also, some ExecutionContexts
* may be fast enough natively that this optimization just
* adds overhead.
* The default ExecutionContext.global is already batching
* or fast enough not to benefit from it; while
* `fromExecutor` and `fromExecutorService` do NOT add
* this optimization since they don't know whether the underlying
* executor will benefit from it.
* A batching executor can create deadlocks if code does
* not use `scala.concurrent.blocking` when it should,
* because tasks created within other tasks will block
* on the outer task completing.
* This executor may run tasks in any order, including LIFO order.
* There are no ordering guarantees.
*
* WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
* in the calling thread synchronously. It must enqueue/handoff the Runnable.
*/
private[concurrent] trait BatchingExecutor extends Executor {

  // Invariant: `_tasksLocal.get ne null` exactly while the current thread is inside `Batch.run`;
  // it is null whenever the thread is outside of a batch. `execute` relies on this to decide
  // between "append to the running batch" and "submit a fresh batch".
  private val _tasksLocal = new ThreadLocal[List[Runnable]]()

  /**
   * A group of tasks that is run back-to-back by a single call to `run`.
   * While running, the batch installs itself as the current `BlockContext`
   * so that queued tasks can be handed off before the thread blocks.
   *
   * @param initial the tasks this batch starts out with
   */
  private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext {
    // The BlockContext that was current when `run` started; non-null only while `run` is executing.
    private var parentBlockContext: BlockContext = _

    // this method runs in the delegate ExecutionContext's thread
    override def run(): Unit = {
      // A batch must never be started on a thread that is already running one.
      require(_tasksLocal.get eq null)

      val prevBlockContext = BlockContext.current
      BlockContext.withBlockContext(this) {
        try {
          parentBlockContext = prevBlockContext

          @tailrec def processBatch(batch: List[Runnable]): Unit = batch match {
            case Nil => ()
            case head :: tail =>
              // Publish the rest of the batch so that `execute` (called from head.run())
              // can prepend to it and `blockOn` can hand it off.
              _tasksLocal set tail
              try {
                head.run()
              } catch {
                case t: Throwable =>
                  // if one task throws, move the
                  // remaining tasks to another thread
                  // so we can throw the exception
                  // up to the invoking executor
                  val remaining = _tasksLocal.get
                  _tasksLocal set Nil
                  // Only resubmit when there is actually something left to run; an empty
                  // batch would just be a no-op Runnable on the underlying executor.
                  if ((remaining ne null) && remaining.nonEmpty)
                    unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails? (it would mask `t`)
                  throw t // rethrow
              }
              processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here
          }

          processBatch(initial)
        } finally {
          // Restore the "outside of a batch" state, whether we finished normally or by throwing.
          _tasksLocal.remove()
          parentBlockContext = null
        }
      }
    }

    /**
     * Hands any still-queued tasks of the running batch to the underlying executor and then
     * delegates the blocking call to the `BlockContext` that was current before this batch started.
     *
     * @throws IllegalArgumentException if this batch is not currently running
     */
    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
      // Check the precondition BEFORE touching any thread-local state: a stale batch used
      // as BlockContext must not leave the calling thread looking as if it were inside a batch.
      val parent = parentBlockContext
      require(parent ne null, "blockOn called on a Batch that is not currently running")

      // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
      val tasks = _tasksLocal.get
      if (tasks ne null) { // null means this thread is not inside a batch: leave the invariant intact
        _tasksLocal set Nil
        if (tasks.nonEmpty)
          unbatchedExecute(new Batch(tasks))
      }

      // now delegate the blocking to the previous BC
      parent.blockOn(thunk)
    }
  }

  /** Submits `r` to the underlying executor without any batching. */
  protected def unbatchedExecute(r: Runnable): Unit

  /**
   * Executes `runnable`. A batchable runnable is added to the batch running on the current
   * thread if there is one, otherwise it is submitted as a new single-element batch.
   * A non-batchable runnable is passed straight to `unbatchedExecute`.
   */
  override def execute(runnable: Runnable): Unit = {
    if (batchable(runnable)) { // If we can batch the runnable
      _tasksLocal.get match {
        case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
        case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch
      }
    } else unbatchedExecute(runnable) // If not batchable, just delegate to underlying
  }

  /** Override this to define which runnables will be batched. */
  def batchable(runnable: Runnable): Boolean = runnable match {
    case _: OnCompleteRunnable => true
    case _ => false
  }
}
|