diff options
author | Havoc Pennington <hp@pobox.com> | 2012-07-09 12:59:29 -0400 |
---|---|---|
committer | Havoc Pennington <hp@pobox.com> | 2012-07-09 16:03:00 -0400 |
commit | 4496c5daa9c573df6d4e2c85a34c37eb9933d1bd (patch) | |
tree | 6a1b460de6c86d3722c40c5a71861a180ba7137d /test/files/jvm/future-spec | |
parent | e9afc228fd25a9dbdcf4c39e6187fcac7f26f969 (diff) | |
download | scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.gz scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.bz2 scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.zip |
Collection of updates to SIP-14 (scala.concurrent)
Developed by Viktor Klang and Havoc Pennington
- add Promise.isCompleted
- add Future.successful and Future.failed
- add ExecutionContextExecutor and ExecutionContextExecutorService for Java interop
- remove defaultExecutionContext as default parameter value from promise and future
- add ExecutionContext.Implicits.global which must be explicitly imported, rather
than the previous always-available value for the implicit EC
- remove currentExecutionContext, since it could create bugs by being
out of sync with the implicit ExecutionContext
- remove Future task batching (_taskStack) and Future.releaseStack
This optimization should instead be implemented either in
a specific thread pool or in a specific ExecutionContext.
Some pools or ExecutionContexts may not want or need it.
In this patch, the defaultExecutionContext does not
keep the batching optimization. Whether it should
have it should perhaps be determined through benchmarking.
- move internalBlockingCall to BlockContext and remove currentExecutionContext
In this patch, BlockContext must be implemented by Thread.currentThread,
so the thread pool is the only place you can add custom hooks
to be run when blocking.
We implement BlockContext for the default ForkJoinWorkerThread in terms of
ForkJoinPool.ManagedBlocker.
- add public BlockContext.current and BlockContext.withBlockContext
These allow an ExecutionContext or other code to override
the BlockContext for the current thread. With this
API, the BlockContext is customizable without
creating a new pool of threads.
BlockContext.current is needed to obtain the previous
BlockContext before you push, so you can "chain up" to
it if desired.
BlockContext.withBlockContext is used to override the context
for a given piece of code.
- move isFutureThrowable into impl.Future
- add implicitNotFound to ExecutionContext
- remove default global EC from future {} and promise {}
- add ExecutionContext.global for explicit use of the global default EC,
replaces defaultExecutionContext
- add a timeout to scala-concurrent-tck tests that block on SyncVar
(so tests time out rather than hang)
- insert blocking{} calls into concurrent tck to fix deadlocking
- add NonFatal.apply and tests for NonFatal
- add OnCompleteRunnable marker trait
This would allow an ExecutionContext to distinguish a Runnable originating
from Future.onComplete (all callbacks on Future end up going through
onComplete).
- rename ListenerRunnable to CallbackRunnable and use for KeptPromise too
Just adds some clarity and consistency.
Diffstat (limited to 'test/files/jvm/future-spec')
-rw-r--r-- | test/files/jvm/future-spec/FutureTests.scala | 124 | ||||
-rw-r--r-- | test/files/jvm/future-spec/PromiseTests.scala | 15 |
2 files changed, 74 insertions, 65 deletions
diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala index e5e01a5954..ca9ff5090f 100644 --- a/test/files/jvm/future-spec/FutureTests.scala +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -10,21 +10,69 @@ import scala.runtime.NonLocalReturnControl object FutureTests extends MinimalScalaTest { - + /* some utils */ - def testAsync(s: String): Future[String] = s match { + def testAsync(s: String)(implicit ec: ExecutionContext): Future[String] = s match { case "Hello" => future { "World" } - case "Failure" => Promise.failed(new RuntimeException("Expected exception; to test fault-tolerance")).future + case "Failure" => Future.failed(new RuntimeException("Expected exception; to test fault-tolerance")) case "NoReply" => Promise[String]().future } val defaultTimeout = 5 seconds /* future specification */ + + "A future with custom ExecutionContext" should { + "shouldHandleThrowables" in { + val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable] + implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new scala.concurrent.forkjoin.ForkJoinPool(), { + t => + ms += t + }) + + class ThrowableTest(m: String) extends Throwable(m) + + val f1 = future[Any] { + throw new ThrowableTest("test") + } + + intercept[ThrowableTest] { + Await.result(f1, defaultTimeout) + } + + val latch = new TestLatch + val f2 = future { + Await.ready(latch, 5 seconds) + "success" + } + val f3 = f2 map { s => s.toUpperCase } + + f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") } + f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") } + + latch.open() + + Await.result(f2, defaultTimeout) mustBe ("success") + + f2 foreach { _ => throw new ThrowableTest("current thread foreach") } + f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") } + + Await.result(f3, defaultTimeout) mustBe ("SUCCESS") + + val waiting = future { + Thread.sleep(1000) + } + Await.ready(waiting, 2000 millis) + + ms.size mustBe (4) + //FIXME should check + } + } - "A future" should { - + "A future with global ExecutionContext" should { + import ExecutionContext.Implicits._ + "compose with for-comprehensions" in { def async(x: Int) = future { (x * 2).toString } val future0 = future[Any] { @@ -122,20 +170,20 @@ object FutureTests extends MinimalScalaTest { val r = new IllegalStateException("recovered") intercept[IllegalStateException] { - val failed = Promise.failed[String](o).future recoverWith { - case _ if false == true => Promise.successful("yay!").future + val failed = Future.failed[String](o) recoverWith { + case _ if false == true => Future.successful("yay!") } Await.result(failed, defaultTimeout) } mustBe (o) - val recovered = Promise.failed[String](o).future recoverWith { - case _ => Promise.successful("yay!").future + val recovered = Future.failed[String](o) recoverWith { + case _ => Future.successful("yay!") } Await.result(recovered, defaultTimeout) mustBe ("yay!") intercept[IllegalStateException] { - val refailed = Promise.failed[String](o).future recoverWith { - case _ => Promise.failed[String](r).future + val refailed = Future.failed[String](o) recoverWith { + case _ => Future.failed[String](r) } Await.result(refailed, defaultTimeout) } mustBe (r) @@ -164,7 +212,7 @@ object FutureTests extends MinimalScalaTest { "firstCompletedOf" in { def futures = Vector.fill[Future[Int]](10) { Promise[Int]().future - } :+ Promise.successful[Int](5).future + } :+ Future.successful[Int](5) Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5) Await.result(Future.firstCompletedOf(futures.iterator), defaultTimeout) mustBe (5) @@ -186,21 +234,21 @@ object FutureTests extends MinimalScalaTest { val timeout = 10000 millis val f = new IllegalStateException("test") intercept[IllegalStateException] { - val failed = Promise.failed[String](f).future zip Promise.successful("foo").future + val failed = Future.failed[String](f) zip Future.successful("foo") Await.result(failed, timeout) } mustBe (f) intercept[IllegalStateException] { - val failed = Promise.successful("foo").future zip Promise.failed[String](f).future + val failed = Future.successful("foo") zip Future.failed[String](f) Await.result(failed, timeout) } mustBe (f) intercept[IllegalStateException] { - val failed = Promise.failed[String](f).future zip Promise.failed[String](f).future + val failed = Future.failed[String](f) zip Future.failed[String](f) Await.result(failed, timeout) } mustBe (f) - val successful = Promise.successful("foo").future zip Promise.successful("foo").future + val successful = Future.successful("foo") zip Future.successful("foo") Await.result(successful, timeout) mustBe (("foo", "foo")) } @@ -337,50 +385,6 @@ object FutureTests extends MinimalScalaTest { Await.result(traversedIterator, defaultTimeout).sum mustBe (10000) } - "shouldHandleThrowables" in { - val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable] - implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new scala.concurrent.forkjoin.ForkJoinPool(), { - t => - ms += t - }) - - class ThrowableTest(m: String) extends Throwable(m) - - val f1 = future[Any] { - throw new ThrowableTest("test") - } - - intercept[ThrowableTest] { - Await.result(f1, defaultTimeout) - } - - val latch = new TestLatch - val f2 = future { - Await.ready(latch, 5 seconds) - "success" - } - val f3 = f2 map { s => s.toUpperCase } - - f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") } - f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") } - - latch.open() - - Await.result(f2, defaultTimeout) mustBe ("success") - - f2 foreach { _ => throw new ThrowableTest("current thread foreach") } - f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") } - - Await.result(f3, defaultTimeout) mustBe ("SUCCESS") - - val waiting = future { - Thread.sleep(1000) - } - Await.ready(waiting, 2000 millis) - - ms.size mustBe (4) - } - "shouldBlockUntilResult" in { val latch = new TestLatch diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala index bf9d9b39d7..49bc642b57 100644 --- a/test/files/jvm/future-spec/PromiseTests.scala +++ b/test/files/jvm/future-spec/PromiseTests.scala @@ -10,7 +10,8 @@ import scala.runtime.NonLocalReturnControl object PromiseTests extends MinimalScalaTest { - + import ExecutionContext.Implicits._ + val defaultTimeout = Inf /* promise specification */ @@ -20,11 +21,13 @@ object PromiseTests extends MinimalScalaTest { "not be completed" in { val p = Promise() p.future.isCompleted mustBe (false) + p.isCompleted mustBe (false) } "have no value" in { val p = Promise() p.future.value mustBe (None) + p.isCompleted mustBe (false) } "return supplied value on timeout" in { @@ -45,14 +48,16 @@ object PromiseTests extends MinimalScalaTest { "A successful Promise" should { val result = "test value" - val future = Promise[String]().complete(Right(result)).future - futureWithResult(_(future, result)) + val promise = Promise[String]().complete(Right(result)) + promise.isCompleted mustBe (true) + futureWithResult(_(promise.future, result)) } "A failed Promise" should { val message = "Expected Exception" - val future = Promise[String]().complete(Left(new RuntimeException(message))).future - futureWithException[RuntimeException](_(future, message)) + val promise = Promise[String]().complete(Left(new RuntimeException(message))) + promise.isCompleted mustBe (true) + futureWithException[RuntimeException](_(promise.future, message)) } "An interrupted Promise" should { |