diff options
author | Havoc Pennington <hp@pobox.com> | 2012-07-09 12:59:29 -0400 |
---|---|---|
committer | Havoc Pennington <hp@pobox.com> | 2012-07-09 16:03:00 -0400 |
commit | 4496c5daa9c573df6d4e2c85a34c37eb9933d1bd (patch) | |
tree | 6a1b460de6c86d3722c40c5a71861a180ba7137d /src/library/scala/concurrent/BlockContext.scala | |
parent | e9afc228fd25a9dbdcf4c39e6187fcac7f26f969 (diff) | |
download | scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.gz scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.bz2 scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.zip |
Collection of updates to SIP-14 (scala.concurrent)
Developed by Viktor Klang and Havoc Pennington
- add Promise.isCompleted
- add Future.successful and Future.failed
- add ExecutionContextExecutor and ExecutionContextExecutorService for Java interop
- remove defaultExecutionContext as default parameter value from promise and future
- add ExecutionContext.Implicits.global which must be explicitly imported, rather
than the previous always-available value for the implicit EC
- remove currentExecutionContext, since it could create bugs by being
out of sync with the implicit ExecutionContext
- remove Future task batching (_taskStack) and Future.releaseStack
This optimization should instead be implemented either in
a specific thread pool or in a specific ExecutionContext.
Some pools or ExecutionContexts may not want or need it.
In this patch, the defaultExecutionContext does not
keep the batching optimization. Whether it should
have it should perhaps be determined through benchmarking.
- move internalBlockingCall to BlockContext and remove currentExecutionContext
In this patch, BlockContext must be implemented by Thread.currentThread,
so the thread pool is the only place you can add custom hooks
to be run when blocking.
We implement BlockContext for the default ForkJoinWorkerThread in terms of
ForkJoinPool.ManagedBlocker.
- add public BlockContext.current and BlockContext.withBlockContext
These allow an ExecutionContext or other code to override
the BlockContext for the current thread. With this
API, the BlockContext is customizable without
creating a new pool of threads.
BlockContext.current is needed to obtain the previous
BlockContext before you push, so you can "chain up" to
it if desired.
BlockContext.withBlockContext is used to override the context
for a given piece of code.
- move isFutureThrowable into impl.Future
- add implicitNotFound to ExecutionContext
- remove default global EC from future {} and promise {}
- add ExecutionContext.global for explicit use of the global default EC,
replaces defaultExecutionContext
- add a timeout to scala-concurrent-tck tests that block on SyncVar
(so tests time out rather than hang)
- insert blocking{} calls into concurrent tck to fix deadlocking
- add NonFatal.apply and tests for NonFatal
- add OnCompleteRunnable marker trait
This would allow an ExecutionContext to distinguish a Runnable originating
from Future.onComplete (all callbacks on Future end up going through
onComplete).
- rename ListenerRunnable to CallbackRunnable and use for KeptPromise too
Just adds some clarity and consistency.
Diffstat (limited to 'src/library/scala/concurrent/BlockContext.scala')
-rw-r--r-- | src/library/scala/concurrent/BlockContext.scala | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala new file mode 100644 index 0000000000..a5b878c546 --- /dev/null +++ b/src/library/scala/concurrent/BlockContext.scala @@ -0,0 +1,81 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + +import java.lang.Thread +import scala.concurrent.util.Duration + +/** + * A context to be notified by `scala.concurrent.blocking()` when + * a thread is about to block. In effect this trait provides + * the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()` + * locates an instance of `BlockContext` by first looking for one + * provided through `BlockContext.withBlockContext()` and failing that, + * checking whether `Thread.currentThread` is an instance of `BlockContext`. + * So a thread pool can have its `java.lang.Thread` instances implement + * `BlockContext`. There's a default `BlockContext` used if the thread + * doesn't implement `BlockContext`. + * + * Typically, you'll want to chain to the previous `BlockContext`, + * like this: + * {{{ + * val oldContext = BlockContext.current + * val myContext = new BlockContext { + * override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { + * // you'd have code here doing whatever you need to do + * // when the thread is about to block. + * // Then you'd chain to the previous context: + * oldContext.internalBlockingCall(awaitable, atMost) + * } + * } + * BlockContext.withBlockContext(myContext) { + * // then this block runs with myContext as the handler + * // for scala.concurrent.blocking + * } + * }}} + */ +trait BlockContext { + + /** Used internally by the framework; blocks execution for at most + * `atMost` time while waiting for an `awaitable` object to become ready. + * + * Clients should use `scala.concurrent.blocking` instead; this is + * the implementation of `scala.concurrent.blocking`, generally + * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`. + */ + def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T +} + +object BlockContext { + private object DefaultBlockContext extends BlockContext { + override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = + awaitable.result(atMost)(Await.canAwaitEvidence) + } + + private val contextLocal = new ThreadLocal[BlockContext]() { + override def initialValue = Thread.currentThread match { + case ctx: BlockContext => ctx + case _ => DefaultBlockContext + } + } + + /** Obtain the current thread's current `BlockContext`. */ + def current: BlockContext = contextLocal.get + + /** Pushes a current `BlockContext` while executing `body`. */ + def withBlockContext[T](blockContext: BlockContext)(body: => T): T = { + val old = contextLocal.get + try { + contextLocal.set(blockContext) + body + } finally { + contextLocal.set(old) + } + } +} |