blob: a5b878c546730f0fd27d13d0419260bab90c9495 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
package scala.concurrent
import java.lang.Thread
import scala.concurrent.util.Duration
/**
* A context to be notified by `scala.concurrent.blocking()` when
* a thread is about to block. In effect this trait provides
* the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()`
* locates an instance of `BlockContext` by first looking for one
* provided through `BlockContext.withBlockContext()` and failing that,
* checking whether `Thread.currentThread` is an instance of `BlockContext`.
* So a thread pool can have its `java.lang.Thread` instances implement
* `BlockContext`. There's a default `BlockContext` used if the thread
* doesn't implement `BlockContext`.
*
* Typically, you'll want to chain to the previous `BlockContext`,
* like this:
* {{{
* val oldContext = BlockContext.current
* val myContext = new BlockContext {
* override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
* // you'd have code here doing whatever you need to do
* // when the thread is about to block.
* // Then you'd chain to the previous context:
* oldContext.internalBlockingCall(awaitable, atMost)
* }
* }
* BlockContext.withBlockContext(myContext) {
* // then this block runs with myContext as the handler
* // for scala.concurrent.blocking
* }
* }}}
*/
trait BlockContext {
  /** Used internally by the framework; blocks execution for at most
   * `atMost` time while waiting for an `awaitable` object to become ready.
   *
   * Clients should use `scala.concurrent.blocking` instead; this is
   * the implementation of `scala.concurrent.blocking`, generally
   * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`.
   *
   * @tparam T        the type of value produced by `awaitable`
   * @param awaitable the object to wait on
   * @param atMost    the longest time the call is allowed to wait
   * @return          the value of type `T` obtained from `awaitable`
   */
  def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
}
object BlockContext {

  /** Fallback context used when nothing else has been installed for the
   * thread: it just asks the awaitable for its result, waiting at most
   * `atMost`.
   */
  private object DefaultBlockContext extends BlockContext {
    override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
      awaitable.result(atMost)(Await.canAwaitEvidence)
    }
  }

  /** Per-thread slot holding the active `BlockContext`.
   *
   * The first read on a thread yields the thread itself when it implements
   * `BlockContext`, and `DefaultBlockContext` otherwise.
   */
  private val contextLocal = new ThreadLocal[BlockContext]() {
    override def initialValue: BlockContext = {
      val thread = Thread.currentThread()
      thread match {
        case selfContext: BlockContext => selfContext
        case _ => DefaultBlockContext
      }
    }
  }

  /** Returns the `BlockContext` currently in effect for the calling thread. */
  def current: BlockContext = contextLocal.get()

  /** Runs `body` with `blockContext` installed as the calling thread's
   * current `BlockContext`.
   *
   * The context that was active beforehand is put back once `body` finishes,
   * whether it returns normally or throws.
   *
   * @tparam T           the result type of `body`
   * @param blockContext the context to make current while `body` runs
   * @param body         the code to evaluate under `blockContext`
   * @return             whatever `body` evaluates to
   */
  def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
    val previous = current
    try {
      contextLocal.set(blockContext)
      body
    } finally contextLocal.set(previous)
  }
}
|