diff options
author | Havoc Pennington <hp@pobox.com> | 2012-07-09 12:59:29 -0400 |
---|---|---|
committer | Havoc Pennington <hp@pobox.com> | 2012-07-09 16:03:00 -0400 |
commit | 4496c5daa9c573df6d4e2c85a34c37eb9933d1bd (patch) | |
tree | 6a1b460de6c86d3722c40c5a71861a180ba7137d /test/files/jvm/scala-concurrent-tck.scala | |
parent | e9afc228fd25a9dbdcf4c39e6187fcac7f26f969 (diff) | |
download | scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.gz scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.bz2 scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.zip |
Collection of updates to SIP-14 (scala.concurrent)
Developed by Viktor Klang and Havoc Pennington
- add Promise.isCompleted
- add Future.successful and Future.failed
- add ExecutionContextExecutor and ExecutionContextExecutorService for Java interop
- remove defaultExecutionContext as default parameter value from promise and future
- add ExecutionContext.Implicits.global which must be explicitly imported, rather
than the previous always-available value for the implicit EC
- remove currentExecutionContext, since it could create bugs by being
out of sync with the implicit ExecutionContext
- remove Future task batching (_taskStack) and Future.releaseStack
This optimization should instead be implemented either in
a specific thread pool or in a specific ExecutionContext.
Some pools or ExecutionContexts may not want or need it.
In this patch, the defaultExecutionContext does not
keep the batching optimization. Whether it should
have it should perhaps be determined through benchmarking.
- move internalBlockingCall to BlockContext and remove currentExecutionContext
In this patch, BlockContext must be implemented by Thread.currentThread,
so the thread pool is the only place you can add custom hooks
to be run when blocking.
We implement BlockContext for the default ForkJoinWorkerThread in terms of
ForkJoinPool.ManagedBlocker.
- add public BlockContext.current and BlockContext.withBlockContext
These allow an ExecutionContext or other code to override
the BlockContext for the current thread. With this
API, the BlockContext is customizable without
creating a new pool of threads.
BlockContext.current is needed to obtain the previous
BlockContext before you push, so you can "chain up" to
it if desired.
BlockContext.withBlockContext is used to override the context
for a given piece of code.
- move isFutureThrowable into impl.Future
- add implicitNotFound to ExecutionContext
- remove default global EC from future {} and promise {}
- add ExecutionContext.global for explicit use of the global default EC,
replaces defaultExecutionContext
- add a timeout to scala-concurrent-tck tests that block on SyncVar
(so tests time out rather than hang)
- insert blocking{} calls into concurrent tck to fix deadlocking
- add NonFatal.apply and tests for NonFatal
- add OnCompleteRunnable marker trait
This would allow an ExecutionContext to distinguish a Runnable originating
from Future.onComplete (all callbacks on Future end up going through
onComplete).
- rename ListenerRunnable to CallbackRunnable and use for KeptPromise too
Just adds some clarity and consistency.
Diffstat (limited to 'test/files/jvm/scala-concurrent-tck.scala')
-rw-r--r-- | test/files/jvm/scala-concurrent-tck.scala | 159 |
1 files changed, 112 insertions, 47 deletions
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 407027f904..5c9c71f3f8 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -3,22 +3,19 @@ import scala.concurrent.{ Promise, TimeoutException, SyncVar, - ExecutionException + ExecutionException, + ExecutionContext } -import scala.concurrent.future -import scala.concurrent.promise -import scala.concurrent.blocking +import scala.concurrent.{ future, promise, blocking } import scala.util.{ Try, Success, Failure } - import scala.concurrent.util.Duration - trait TestBase { def once(body: (() => Unit) => Unit) { val sv = new SyncVar[Boolean] body(() => sv put true) - sv.take() + sv.take(2000) } // def assert(cond: => Boolean) { @@ -33,7 +30,8 @@ trait TestBase { trait FutureCallbacks extends TestBase { - + import ExecutionContext.Implicits._ + def testOnSuccess(): Unit = once { done => var x = 0 @@ -147,6 +145,7 @@ trait FutureCallbacks extends TestBase { trait FutureCombinators extends TestBase { + import ExecutionContext.Implicits._ def testMapSuccess(): Unit = once { done => @@ -591,7 +590,8 @@ trait FutureCombinators extends TestBase { trait FutureProjections extends TestBase { - + import ExecutionContext.Implicits._ + def testFailedFailureOnComplete(): Unit = once { done => val cause = new RuntimeException @@ -673,7 +673,8 @@ trait FutureProjections extends TestBase { trait Blocking extends TestBase { - + import ExecutionContext.Implicits._ + def testAwaitSuccess(): Unit = once { done => val f = future { 0 } @@ -702,8 +703,67 @@ trait Blocking extends TestBase { } +trait BlockContexts extends TestBase { + import ExecutionContext.Implicits._ + import scala.concurrent.{ Await, Awaitable, BlockContext } + + private def getBlockContext(body: => BlockContext): BlockContext = { + blocking(Future { body }, Duration(500, "ms")) + } + + // test outside of an ExecutionContext + def testDefaultOutsideFuture(): Unit = { + val bc = BlockContext.current + assert(bc.getClass.getName.contains("DefaultBlockContext")) + } + + // test BlockContext in our default ExecutionContext + def testDefaultFJP(): Unit = { + val bc = getBlockContext(BlockContext.current) + assert(bc.isInstanceOf[scala.concurrent.forkjoin.ForkJoinWorkerThread]) + } + + // test BlockContext inside BlockContext.withBlockContext + def testPushCustom(): Unit = { + val orig = BlockContext.current + val customBC = new BlockContext() { + override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = + orig.internalBlockingCall(awaitable, atMost) + } + + val bc = getBlockContext({ + BlockContext.withBlockContext(customBC) { + BlockContext.current + } + }) + + assert(bc eq customBC) + } + + // test BlockContext after a BlockContext.push + def testPopCustom(): Unit = { + val orig = BlockContext.current + val customBC = new BlockContext() { + override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = + orig.internalBlockingCall(awaitable, atMost) + } + + val bc = getBlockContext({ + BlockContext.withBlockContext(customBC) {} + BlockContext.current + }) + + assert(bc ne customBC) + } + + testDefaultOutsideFuture() + testDefaultFJP() + testPushCustom() + testPopCustom() +} trait Promises extends TestBase { + import ExecutionContext.Implicits._ def testSuccess(): Unit = once { done => @@ -730,7 +790,8 @@ trait Promises extends TestBase { trait Exceptions extends TestBase { - + import ExecutionContext.Implicits._ + } // trait TryEitherExtractor extends TestBase { @@ -811,7 +872,7 @@ trait Exceptions extends TestBase { trait CustomExecutionContext extends TestBase { import scala.concurrent.{ ExecutionContext, Awaitable } - def defaultEC = ExecutionContext.defaultExecutionContext + def defaultEC = ExecutionContext.global val inEC = new java.lang.ThreadLocal[Int]() { override def initialValue = 0 @@ -826,7 +887,7 @@ trait CustomExecutionContext extends TestBase { val _count = new java.util.concurrent.atomic.AtomicInteger(0) def count = _count.get - def delegate = ExecutionContext.defaultExecutionContext + def delegate = ExecutionContext.global override def execute(runnable: Runnable) = { _count.incrementAndGet() @@ -843,9 +904,6 @@ trait CustomExecutionContext extends TestBase { delegate.execute(wrapper) } - override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = - delegate.internalBlockingCall(awaitable, atMost) - override def reportFailure(t: Throwable): Unit = { System.err.println("Failure: " + t.getClass.getSimpleName + ": " + t.getMessage) delegate.reportFailure(t) @@ -860,14 +918,16 @@ trait CustomExecutionContext extends TestBase { def testOnSuccessCustomEC(): Unit = { val count = countExecs { implicit ec => - once { done => - val f = future({ assertNoEC() })(defaultEC) - f onSuccess { - case _ => - assertEC() + blocking { + once { done => + val f = future({ assertNoEC() })(defaultEC) + f onSuccess { + case _ => + assertEC() done() + } + assertNoEC() } - assertNoEC() } } @@ -877,12 +937,14 @@ trait CustomExecutionContext extends TestBase { def testKeptPromiseCustomEC(): Unit = { val count = countExecs { implicit ec => - once { done => - val f = Promise.successful(10).future - f onSuccess { - case _ => - assertEC() + blocking { + once { done => + val f = Promise.successful(10).future + f onSuccess { + case _ => + assertEC() done() + } } } } @@ -893,28 +955,30 @@ trait CustomExecutionContext extends TestBase { def testCallbackChainCustomEC(): Unit = { val count = countExecs { implicit ec => - once { done => - assertNoEC() - val addOne = { x: Int => assertEC(); x + 1 } - val f = Promise.successful(10).future - f.map(addOne).filter { x => - assertEC() - x == 11 - } flatMap { x => - Promise.successful(x + 1).future.map(addOne).map(addOne) - } onComplete { - case Left(t) => - try { - throw new AssertionError("error in test: " + t.getMessage, t) - } finally { + blocking { + once { done => + assertNoEC() + val addOne = { x: Int => assertEC(); x + 1 } + val f = Promise.successful(10).future + f.map(addOne).filter { x => + assertEC() + x == 11 + } flatMap { x => + Promise.successful(x + 1).future.map(addOne).map(addOne) + } onComplete { + case Left(t) => + try { + throw new AssertionError("error in test: " + t.getMessage, t) + } finally { + done() + } + case Right(x) => + assertEC() + assert(x == 14) done() - } - case Right(x) => - assertEC() - assert(x == 14) - done() + } + assertNoEC() } - assertNoEC() } } @@ -934,6 +998,7 @@ with FutureCallbacks with FutureCombinators with FutureProjections with Promises +with BlockContexts with Exceptions // with TryEitherExtractor with CustomExecutionContext |