summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHavoc Pennington <hp@pobox.com>2012-07-09 12:59:29 -0400
committerHavoc Pennington <hp@pobox.com>2012-07-09 16:03:00 -0400
commit4496c5daa9c573df6d4e2c85a34c37eb9933d1bd (patch)
tree6a1b460de6c86d3722c40c5a71861a180ba7137d
parente9afc228fd25a9dbdcf4c39e6187fcac7f26f969 (diff)
downloadscala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.gz
scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.bz2
scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.zip
Collection of updates to SIP-14 (scala.concurrent)
Developed by Viktor Klang and Havoc Pennington - add Promise.isCompleted - add Future.successful and Future.failed - add ExecutionContextExecutor and ExecutionContextExecutorService for Java interop - remove defaultExecutionContext as default parameter value from promise and future - add ExecutionContext.Implicits.global which must be explicitly imported, rather than the previous always-available value for the implicit EC - remove currentExecutionContext, since it could create bugs by being out of sync with the implicit ExecutionContext - remove Future task batching (_taskStack) and Future.releaseStack This optimization should instead be implemented either in a specific thread pool or in a specific ExecutionContext. Some pools or ExecutionContexts may not want or need it. In this patch, the defaultExecutionContext does not keep the batching optimization. Whether it should have it should perhaps be determined through benchmarking. - move internalBlockingCall to BlockContext and remove currentExecutionContext In this patch, BlockContext must be implemented by Thread.currentThread, so the thread pool is the only place you can add custom hooks to be run when blocking. We implement BlockContext for the default ForkJoinWorkerThread in terms of ForkJoinPool.ManagedBlocker. - add public BlockContext.current and BlockContext.withBlockContext These allow an ExecutionContext or other code to override the BlockContext for the current thread. With this API, the BlockContext is customizable without creating a new pool of threads. BlockContext.current is needed to obtain the previous BlockContext before you push, so you can "chain up" to it if desired. BlockContext.withBlockContext is used to override the context for a given piece of code. - move isFutureThrowable into impl.Future - add implicitNotFound to ExecutionContext - remove default global EC from future {} and promise {} - add ExecutionContext.global for explicit use of the global default EC, replaces defaultExecutionContext - add a timeout to scala-concurrent-tck tests that block on SyncVar (so tests time out rather than hang) - insert blocking{} calls into concurrent tck to fix deadlocking - add NonFatal.apply and tests for NonFatal - add OnCompleteRunnable marker trait This would allow an ExecutionContext to distinguish a Runnable originating from Future.onComplete (all callbacks on Future end up going through onComplete). - rename ListenerRunnable to CallbackRunnable and use for KeptPromise too Just adds some clarity and consistency.
-rw-r--r--src/library/scala/collection/parallel/TaskSupport.scala2
-rw-r--r--src/library/scala/concurrent/BlockContext.scala81
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala34
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala5
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala74
-rw-r--r--src/library/scala/concurrent/Future.scala29
-rw-r--r--src/library/scala/concurrent/Promise.scala9
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala61
-rw-r--r--src/library/scala/concurrent/impl/Future.scala102
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala34
-rw-r--r--src/library/scala/util/control/NonFatal.scala23
-rw-r--r--test/files/jvm/future-spec/FutureTests.scala124
-rw-r--r--test/files/jvm/future-spec/PromiseTests.scala15
-rw-r--r--test/files/jvm/non-fatal-tests.scala47
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala159
15 files changed, 478 insertions, 321 deletions
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala
index 2eaa861429..3d27f619bb 100644
--- a/src/library/scala/collection/parallel/TaskSupport.scala
+++ b/src/library/scala/collection/parallel/TaskSupport.scala
@@ -48,7 +48,7 @@ extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
* By default, parallel collections are parametrized with this task support object, so parallel collections
* share the same execution context backend as the rest of the `scala.concurrent` package.
*/
-class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.defaultExecutionContext)
+class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
extends TaskSupport with ExecutionContextTasks
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala
new file mode 100644
index 0000000000..a5b878c546
--- /dev/null
+++ b/src/library/scala/concurrent/BlockContext.scala
@@ -0,0 +1,81 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import java.lang.Thread
+import scala.concurrent.util.Duration
+
+/**
+ * A context to be notified by `scala.concurrent.blocking()` when
+ * a thread is about to block. In effect this trait provides
+ * the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()`
+ * locates an instance of `BlockContext` by first looking for one
+ * provided through `BlockContext.withBlockContext()` and failing that,
+ * checking whether `Thread.currentThread` is an instance of `BlockContext`.
+ * So a thread pool can have its `java.lang.Thread` instances implement
+ * `BlockContext`. There's a default `BlockContext` used if the thread
+ * doesn't implement `BlockContext`.
+ *
+ * Typically, you'll want to chain to the previous `BlockContext`,
+ * like this:
+ * {{{
+ * val oldContext = BlockContext.current
+ * val myContext = new BlockContext {
+ * override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ * // you'd have code here doing whatever you need to do
+ * // when the thread is about to block.
+ * // Then you'd chain to the previous context:
+ * oldContext.internalBlockingCall(awaitable, atMost)
+ * }
+ * }
+ * BlockContext.withBlockContext(myContext) {
+ * // then this block runs with myContext as the handler
+ * // for scala.concurrent.blocking
+ * }
+ * }}}
+ */
+trait BlockContext {
+
+ /** Used internally by the framework; blocks execution for at most
+ * `atMost` time while waiting for an `awaitable` object to become ready.
+ *
+ * Clients should use `scala.concurrent.blocking` instead; this is
+ * the implementation of `scala.concurrent.blocking`, generally
+ * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`.
+ */
+ def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
+}
+
+object BlockContext {
+ private object DefaultBlockContext extends BlockContext {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ awaitable.result(atMost)(Await.canAwaitEvidence)
+ }
+
+ private val contextLocal = new ThreadLocal[BlockContext]() {
+ override def initialValue = Thread.currentThread match {
+ case ctx: BlockContext => ctx
+ case _ => DefaultBlockContext
+ }
+ }
+
+ /** Obtain the current thread's current `BlockContext`. */
+ def current: BlockContext = contextLocal.get
+
+ /** Pushes a current `BlockContext` while executing `body`. */
+ def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
+ val old = contextLocal.get
+ try {
+ contextLocal.set(blockContext)
+ body
+ } finally {
+ contextLocal.set(old)
+ }
+ }
+}
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
index 330a2f0e25..86a86966ef 100644
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -17,23 +17,6 @@ import language.implicitConversions
/** This package object contains primitives for concurrent and parallel programming.
*/
abstract class ConcurrentPackageObject {
- /** A global execution environment for executing lightweight tasks.
- */
- lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
-
- val currentExecutionContext = new ThreadLocal[ExecutionContext]
-
- val handledFutureException: PartialFunction[Throwable, Throwable] = {
- case t: Throwable if isFutureThrowable(t) => t
- }
-
- // TODO rename appropriately and make public
- private[concurrent] def isFutureThrowable(t: Throwable) = t match {
- case e: Error => false
- case t: scala.util.control.ControlThrowable => false
- case i: InterruptedException => false
- case _ => true
- }
/* concurrency constructs */
@@ -46,8 +29,7 @@ abstract class ConcurrentPackageObject {
* @param execctx the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
- def future[T](body: =>T)(implicit execctx: ExecutionContext = defaultExecutionContext): Future[T] =
- Future[T](body)
+ def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)
/** Creates a promise object which can be completed with a value.
*
@@ -55,8 +37,7 @@ abstract class ConcurrentPackageObject {
* @param execctx the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def promise[T]()(implicit execctx: ExecutionContext = defaultExecutionContext): Promise[T] =
- Promise[T]()
+ def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
/** Used to block on a piece of code which potentially blocks.
*
@@ -67,8 +48,7 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](body: =>T): T =
- blocking(impl.Future.body2awaitable(body), Duration.Inf)
+ def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
/** Blocks on an awaitable object.
*
@@ -79,12 +59,8 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.result(atMost)(Await.canAwaitEvidence)
- case ec => ec.internalBlockingCall(awaitable, atMost)
- }
- }
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
+ BlockContext.current.internalBlockingCall(awaitable, atMost)
@inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x)
}
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index 96a66d83b6..91e41748f5 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -23,7 +23,7 @@ package scala.concurrent
* @author Paul Phillips
* @version 2.8
*/
-class DelayedLazyVal[T](f: () => T, body: => Unit) {
+class DelayedLazyVal[T](f: () => T, body: => Unit){
@volatile private[this] var _isDone = false
private[this] lazy val complete = f()
@@ -39,7 +39,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
*/
def apply(): T = if (isDone) complete else f()
- // TODO replace with scala.concurrent.future { ... }
+ // FIXME need to take ExecutionContext in constructor
+ import ExecutionContext.Implicits.global
future {
body
_isDone = true
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 436a17a33b..b486e5269e 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -9,58 +9,80 @@
package scala.concurrent
-
-import java.util.concurrent.atomic.{ AtomicInteger }
-import java.util.concurrent.{ Executors, Future => JFuture, Callable, ExecutorService, Executor }
+import java.util.concurrent.{ ExecutorService, Executor }
import scala.concurrent.util.Duration
-import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
-import scala.collection.generic.CanBuildFrom
-import collection._
-
-
+import scala.annotation.implicitNotFound
+/**
+ * An `ExecutionContext` is an abstraction over an entity that can execute program logic.
+ */
+@implicitNotFound("Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global")
trait ExecutionContext {
/** Runs a block of code on this execution context.
*/
def execute(runnable: Runnable): Unit
- /** Used internally by the framework - blocks execution for at most `atMost` time while waiting
- * for an `awaitable` object to become ready.
- *
- * Clients should use `scala.concurrent.blocking` instead.
- */
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
-
/** Reports that an asynchronous computation failed.
*/
def reportFailure(t: Throwable): Unit
}
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutor extends ExecutionContext with Executor
+
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService
+
/** Contains factory methods for creating execution contexts.
*/
object ExecutionContext {
-
- implicit def defaultExecutionContext: ExecutionContext = scala.concurrent.defaultExecutionContext
-
+ /**
+ * The `ExecutionContext` associated with the current `Thread`
+ */
+ val currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal //FIXME might want to set the initial value to an executionContext that throws an exception on execute and warns that it's not set
+
+ /**
+ * This is the explicit global ExecutionContext,
+ * call this when you want to provide the global ExecutionContext explicitly
+ */
+ def global: ExecutionContextExecutor = Implicits.global
+
+ object Implicits {
+ /**
+ * This is the implicit global ExecutionContext,
+ * import this when you want to provide the global ExecutionContext implicitly
+ */
+ implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
+ }
+
/** Creates an `ExecutionContext` from the given `ExecutorService`.
*/
- def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService =
+ def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
impl.ExecutionContextImpl.fromExecutorService(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `ExecutorService` with the default Reporter.
+ */
+ def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)
/** Creates an `ExecutionContext` from the given `Executor`.
*/
- def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor =
+ def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
impl.ExecutionContextImpl.fromExecutor(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `Executor` with the default Reporter.
+ */
+ def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)
- def defaultReporter: Throwable => Unit = {
- // re-throwing `Error`s here causes an exception handling test to fail.
- //case e: Error => throw e
- case t => t.printStackTrace()
- }
-
+ /** The default reporter simply prints the stack trace of the `Throwable` to System.err.
+ */
+ def defaultReporter: Throwable => Unit = { case t => t.printStackTrace() }
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 2de0c57253..75a83d6ef8 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -8,7 +8,7 @@
package scala.concurrent
-
+import language.higherKinds
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
@@ -23,11 +23,9 @@ import scala.Option
import scala.util.{Try, Success, Failure}
import scala.annotation.tailrec
-import scala.collection.mutable.Stack
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
-import language.higherKinds
@@ -138,7 +136,7 @@ trait Future[+T] extends Awaitable[T] {
* $callbackInContext
*/
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
- case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
+ case Left(t) if (impl.Future.isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
case _ =>
}(executor)
@@ -580,6 +578,20 @@ object Future {
classOf[Double] -> classOf[jl.Double],
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
)
+
+ /** Creates an already completed Future with the specified exception.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
+
+ /** Creates an already completed Future with the specified result.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def successful[T](result: T): Future[T] = Promise.successful(result).future
/** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
*
@@ -710,5 +722,12 @@ object Future {
}
}
-
+/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext`
+ * wraps a callback provided to `Future.onComplete`.
+ * All callbacks provided to a `Future` end up going through `onComplete`, so this allows an
+ * `ExecutionContext` to special-case callbacks that were executed by `Future` if desired.
+ */
+trait OnCompleteRunnable {
+ self: Runnable =>
+}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 578642966f..5d1b2c00b6 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -34,6 +34,15 @@ trait Promise[T] {
*/
def future: Future[T]
+ /** Returns whether the promise has already been completed with
+ * a value or an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return `true` if the promise is already completed, `false` otherwise
+ */
+ def isCompleted: Boolean
+
/** Completes the promise with either an exception or a value.
*
* @param result Either the value or the exception to complete the promise with.
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 4c6347dce0..551a444425 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -13,34 +13,34 @@ package scala.concurrent.impl
import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit }
import java.util.Collection
import scala.concurrent.forkjoin._
-import scala.concurrent.{ ExecutionContext, Awaitable }
+import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService }
import scala.concurrent.util.Duration
import scala.util.control.NonFatal
-private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor {
+private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor {
val executor: Executor = es match {
case null => createExecutorService
case some => some
}
-
- // to ensure that the current execution context thread local is properly set
- def executorsThreadFactory = new ThreadFactory {
- def newThread(r: Runnable) = new Thread(new Runnable {
- override def run() {
- scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this)
- r.run()
- }
- })
- }
-
- // to ensure that the current execution context thread local is properly set
+
+ // Implement BlockContext on FJP threads
def forkJoinPoolThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
- def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) {
- override def onStart() {
- scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this)
+ def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) with BlockContext {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ var result: T = null.asInstanceOf[T]
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
+ @volatile var isdone = false
+ def block(): Boolean = {
+ result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
+ isdone = true
+ true
+ }
+ def isReleasable = isdone
+ })
+ result
}
}
}
@@ -68,7 +68,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
case NonFatal(t) =>
System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool")
t.printStackTrace(System.err)
- Executors.newCachedThreadPool(executorsThreadFactory) //FIXME use the same desired parallelism here too?
+ Executors.newCachedThreadPool() //FIXME use the same desired parallelism here too?
}
def execute(runnable: Runnable): Unit = executor match {
@@ -84,27 +84,6 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
case generic => generic execute runnable
}
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
- Future.releaseStack(this)
-
- executor match {
- case fj: ForkJoinPool =>
- var result: T = null.asInstanceOf[T]
- ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
- @volatile var isdone = false
- def block(): Boolean = {
- result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
- isdone = true
- true
- }
- def isReleasable = isdone
- })
- result
- case _ =>
- awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
- }
- }
-
def reportFailure(t: Throwable) = reporter(t)
}
@@ -112,8 +91,8 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
private[concurrent] object ExecutionContextImpl {
def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
- def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService =
- new ExecutionContextImpl(es, reporter) with ExecutorService {
+ def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
+ new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
override def execute(command: Runnable) = executor.execute(command)
override def shutdown() { asExecutorService.shutdown() }
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 8012ea6a93..073e6c4c9f 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -46,11 +46,19 @@ private[concurrent] object Future {
def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
- private[impl] class PromiseCompletingTask[T](override val executor: ExecutionContext, body: => T)
- extends Task {
+ // TODO rename appropriately and make public
+ private[concurrent] def isFutureThrowable(t: Throwable) = t match {
+ case e: Error => false
+ case t: scala.util.control.ControlThrowable => false
+ case i: InterruptedException => false
+ case _ => true
+ }
+
+ private[impl] class PromiseCompletingRunnable[T](body: => T)
+ extends Runnable {
val promise = new Promise.DefaultPromise[T]()
- protected override def task() = {
+ override def run() = {
promise complete {
try Right(body) catch {
case NonFatal(e) =>
@@ -63,90 +71,8 @@ private[concurrent] object Future {
}
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
- val task = new PromiseCompletingTask(executor, body)
- task.dispatch()
-
- task.promise.future
- }
-
- private[impl] val throwableId: Throwable => Throwable = identity _
-
- // an optimization for batching futures
- // TODO we should replace this with a public queue,
- // so that it can be stolen from
- // OR: a push to the local task queue should be so cheap that this is
- // not even needed, but stealing is still possible
-
- private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext)
-
- private val _taskStack = new ThreadLocal[TaskStack]()
-
- private[impl] trait Task extends Runnable {
- def executor: ExecutionContext
-
- // run the original callback (no dispatch)
- protected def task(): Unit
-
- // we implement Runnable to avoid creating
- // an extra object. run() runs ourselves with
- // a TaskStack pushed, and then runs any
- // other tasks that show up in the stack.
- final override def run() = {
- try {
- val taskStack = TaskStack(Stack[Task](this), executor)
- _taskStack set taskStack
- while (taskStack.stack.nonEmpty) {
- val next = taskStack.stack.pop()
- require(next.executor eq executor)
- try next.task() catch { case NonFatal(e) => executor reportFailure e }
- }
- } finally {
- _taskStack.remove()
- }
- }
-
- // send the task to the running executor.execute() via
- // _taskStack, or start a new executor.execute()
- def dispatch(force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this
- case _ => executor.execute(this)
- }
- }
-
- private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task])
- extends Task {
- protected override def task() = {
- _taskStack.get.stack.elems = elems
- }
- }
-
- private[impl] def releaseStack(executor: ExecutionContext): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && stack.stack.nonEmpty =>
- val tasks = stack.stack.elems
- stack.stack.clear()
- _taskStack.remove()
- val release = new ReleaseTask(executor, tasks)
- release.dispatch(force=true)
- case null =>
- // do nothing - there is no local batching stack anymore
- case _ =>
- _taskStack.remove()
- }
-
- private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any)
- extends Task {
- private var value: Either[Throwable, T] = null
-
- protected override def task() = {
- require(value ne null) // dispatch(value) must be called before dispatch()
- onComplete(value)
- }
-
- def dispatch(value: Either[Throwable, T]): Unit = {
- this.value = value
- dispatch()
- }
+ val runnable = new PromiseCompletingRunnable(body)
+ executor.execute(runnable)
+ runnable.promise.future
}
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index c5060a2368..3ac34bef8a 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -11,11 +11,12 @@ package scala.concurrent.impl
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
-import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, TimeoutException, ExecutionException }
+import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException }
//import scala.util.continuations._
import scala.concurrent.util.Duration
import scala.util
import scala.annotation.tailrec
+import scala.util.control.NonFatal
//import scala.concurrent.NonDeterministic
@@ -24,6 +25,21 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
def future: this.type = this
}
+private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) extends Runnable with OnCompleteRunnable {
+ // must be filled in before running it
+ var value: Either[Throwable, T] = null
+
+ override def run() = {
+ require(value ne null) // must set value to non-null before running!
+ try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
+ }
+
+ def executeWithValue(v: Either[Throwable, T]): Unit = {
+ require(value eq null) // can't complete it twice
+ value = v
+ executor.execute(this)
+ }
+}
object Promise {
@@ -94,10 +110,10 @@ object Promise {
val resolved = resolveEither(value)
(try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Future.OnCompleteTask[T]] = {
+ def tryComplete(v: Either[Throwable, T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
- val cur = raw.asInstanceOf[List[Future.OnCompleteTask[T]]]
+ val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
if (updateState(cur, v)) cur else tryComplete(v)
case _ => null
}
@@ -107,19 +123,19 @@ object Promise {
synchronized { notifyAll() } //Notify any evil blockers
}) match {
case null => false
- case cs if cs.isEmpty => true
- case cs => cs.foreach(c => c.dispatch(resolved)); true
+ case rs if rs.isEmpty => true
+ case rs => rs.foreach(r => r.executeWithValue(resolved)); true
}
}
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
- val bound = new Future.OnCompleteTask[T](executor, func)
+ val runnable = new CallbackRunnable[T](executor, func)
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => bound.dispatch(r.asInstanceOf[Either[Throwable, T]])
- case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback()
+ case r: Either[_, _] => runnable.executeWithValue(r.asInstanceOf[Either[Throwable, T]])
+ case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
}
@@ -139,7 +155,7 @@ object Promise {
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
val completedAs = value.get
- (new Future.OnCompleteTask(executor, func)).dispatch(completedAs)
+ (new CallbackRunnable(executor, func)).executeWithValue(completedAs)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
diff --git a/src/library/scala/util/control/NonFatal.scala b/src/library/scala/util/control/NonFatal.scala
index 9da2f63307..5137f0f2f5 100644
--- a/src/library/scala/util/control/NonFatal.scala
+++ b/src/library/scala/util/control/NonFatal.scala
@@ -23,16 +23,23 @@ package scala.util.control
* // dangerous stuff
* } catch {
* case NonFatal(e) => log.error(e, "Something not that bad.")
+ * // or
+ * case e if NonFatal(e) => log.error(e, "Something not that bad.")
* }
* }}}
*/
object NonFatal {
-
- def unapply(t: Throwable): Option[Throwable] = t match {
- case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError
- // VirtualMachineError includes OutOfMemoryError and other fatal errors
- case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => None
- case e ⇒ Some(e)
- }
-
+ /**
+ * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal
+ */
+ def apply(t: Throwable): Boolean = t match {
+ case _: StackOverflowError => true // StackOverflowError ok even though it is a VirtualMachineError
+ // VirtualMachineError includes OutOfMemoryError and other fatal errors
+ case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => false
+ case _ => true
+ }
+ /**
+ * Returns Some(t) if NonFatal(t) == true, otherwise None
+ */
+ def unapply(t: Throwable): Option[Throwable] = if (apply(t)) Some(t) else None
}
diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala
index e5e01a5954..ca9ff5090f 100644
--- a/test/files/jvm/future-spec/FutureTests.scala
+++ b/test/files/jvm/future-spec/FutureTests.scala
@@ -10,21 +10,69 @@ import scala.runtime.NonLocalReturnControl
object FutureTests extends MinimalScalaTest {
-
+
/* some utils */
- def testAsync(s: String): Future[String] = s match {
+ def testAsync(s: String)(implicit ec: ExecutionContext): Future[String] = s match {
case "Hello" => future { "World" }
- case "Failure" => Promise.failed(new RuntimeException("Expected exception; to test fault-tolerance")).future
+ case "Failure" => Future.failed(new RuntimeException("Expected exception; to test fault-tolerance"))
case "NoReply" => Promise[String]().future
}
val defaultTimeout = 5 seconds
/* future specification */
+
+ "A future with custom ExecutionContext" should {
+ "shouldHandleThrowables" in {
+ val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable]
+ implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new scala.concurrent.forkjoin.ForkJoinPool(), {
+ t =>
+ ms += t
+ })
+
+ class ThrowableTest(m: String) extends Throwable(m)
+
+ val f1 = future[Any] {
+ throw new ThrowableTest("test")
+ }
+
+ intercept[ThrowableTest] {
+ Await.result(f1, defaultTimeout)
+ }
+
+ val latch = new TestLatch
+ val f2 = future {
+ Await.ready(latch, 5 seconds)
+ "success"
+ }
+ val f3 = f2 map { s => s.toUpperCase }
+
+ f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
+ f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") }
+
+ latch.open()
+
+ Await.result(f2, defaultTimeout) mustBe ("success")
+
+ f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
+ f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") }
+
+ Await.result(f3, defaultTimeout) mustBe ("SUCCESS")
+
+ val waiting = future {
+ Thread.sleep(1000)
+ }
+ Await.ready(waiting, 2000 millis)
+
+ ms.size mustBe (4)
+ //FIXME should check
+ }
+ }
- "A future" should {
-
+ "A future with global ExecutionContext" should {
+ import ExecutionContext.Implicits._
+
"compose with for-comprehensions" in {
def async(x: Int) = future { (x * 2).toString }
val future0 = future[Any] {
@@ -122,20 +170,20 @@ object FutureTests extends MinimalScalaTest {
val r = new IllegalStateException("recovered")
intercept[IllegalStateException] {
- val failed = Promise.failed[String](o).future recoverWith {
- case _ if false == true => Promise.successful("yay!").future
+ val failed = Future.failed[String](o) recoverWith {
+ case _ if false == true => Future.successful("yay!")
}
Await.result(failed, defaultTimeout)
} mustBe (o)
- val recovered = Promise.failed[String](o).future recoverWith {
- case _ => Promise.successful("yay!").future
+ val recovered = Future.failed[String](o) recoverWith {
+ case _ => Future.successful("yay!")
}
Await.result(recovered, defaultTimeout) mustBe ("yay!")
intercept[IllegalStateException] {
- val refailed = Promise.failed[String](o).future recoverWith {
- case _ => Promise.failed[String](r).future
+ val refailed = Future.failed[String](o) recoverWith {
+ case _ => Future.failed[String](r)
}
Await.result(refailed, defaultTimeout)
} mustBe (r)
@@ -164,7 +212,7 @@ object FutureTests extends MinimalScalaTest {
"firstCompletedOf" in {
def futures = Vector.fill[Future[Int]](10) {
Promise[Int]().future
- } :+ Promise.successful[Int](5).future
+ } :+ Future.successful[Int](5)
Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5)
Await.result(Future.firstCompletedOf(futures.iterator), defaultTimeout) mustBe (5)
@@ -186,21 +234,21 @@ object FutureTests extends MinimalScalaTest {
val timeout = 10000 millis
val f = new IllegalStateException("test")
intercept[IllegalStateException] {
- val failed = Promise.failed[String](f).future zip Promise.successful("foo").future
+ val failed = Future.failed[String](f) zip Future.successful("foo")
Await.result(failed, timeout)
} mustBe (f)
intercept[IllegalStateException] {
- val failed = Promise.successful("foo").future zip Promise.failed[String](f).future
+ val failed = Future.successful("foo") zip Future.failed[String](f)
Await.result(failed, timeout)
} mustBe (f)
intercept[IllegalStateException] {
- val failed = Promise.failed[String](f).future zip Promise.failed[String](f).future
+ val failed = Future.failed[String](f) zip Future.failed[String](f)
Await.result(failed, timeout)
} mustBe (f)
- val successful = Promise.successful("foo").future zip Promise.successful("foo").future
+ val successful = Future.successful("foo") zip Future.successful("foo")
Await.result(successful, timeout) mustBe (("foo", "foo"))
}
@@ -337,50 +385,6 @@ object FutureTests extends MinimalScalaTest {
Await.result(traversedIterator, defaultTimeout).sum mustBe (10000)
}
- "shouldHandleThrowables" in {
- val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable]
- implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new scala.concurrent.forkjoin.ForkJoinPool(), {
- t =>
- ms += t
- })
-
- class ThrowableTest(m: String) extends Throwable(m)
-
- val f1 = future[Any] {
- throw new ThrowableTest("test")
- }
-
- intercept[ThrowableTest] {
- Await.result(f1, defaultTimeout)
- }
-
- val latch = new TestLatch
- val f2 = future {
- Await.ready(latch, 5 seconds)
- "success"
- }
- val f3 = f2 map { s => s.toUpperCase }
-
- f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
- f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") }
-
- latch.open()
-
- Await.result(f2, defaultTimeout) mustBe ("success")
-
- f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
- f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") }
-
- Await.result(f3, defaultTimeout) mustBe ("SUCCESS")
-
- val waiting = future {
- Thread.sleep(1000)
- }
- Await.ready(waiting, 2000 millis)
-
- ms.size mustBe (4)
- }
-
"shouldBlockUntilResult" in {
val latch = new TestLatch
diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala
index bf9d9b39d7..49bc642b57 100644
--- a/test/files/jvm/future-spec/PromiseTests.scala
+++ b/test/files/jvm/future-spec/PromiseTests.scala
@@ -10,7 +10,8 @@ import scala.runtime.NonLocalReturnControl
object PromiseTests extends MinimalScalaTest {
-
+ import ExecutionContext.Implicits._
+
val defaultTimeout = Inf
/* promise specification */
@@ -20,11 +21,13 @@ object PromiseTests extends MinimalScalaTest {
"not be completed" in {
val p = Promise()
p.future.isCompleted mustBe (false)
+ p.isCompleted mustBe (false)
}
"have no value" in {
val p = Promise()
p.future.value mustBe (None)
+ p.isCompleted mustBe (false)
}
"return supplied value on timeout" in {
@@ -45,14 +48,16 @@ object PromiseTests extends MinimalScalaTest {
"A successful Promise" should {
val result = "test value"
- val future = Promise[String]().complete(Right(result)).future
- futureWithResult(_(future, result))
+ val promise = Promise[String]().complete(Right(result))
+ promise.isCompleted mustBe (true)
+ futureWithResult(_(promise.future, result))
}
"A failed Promise" should {
val message = "Expected Exception"
- val future = Promise[String]().complete(Left(new RuntimeException(message))).future
- futureWithException[RuntimeException](_(future, message))
+ val promise = Promise[String]().complete(Left(new RuntimeException(message)))
+ promise.isCompleted mustBe (true)
+ futureWithException[RuntimeException](_(promise.future, message))
}
"An interrupted Promise" should {
diff --git a/test/files/jvm/non-fatal-tests.scala b/test/files/jvm/non-fatal-tests.scala
new file mode 100644
index 0000000000..471a9d227a
--- /dev/null
+++ b/test/files/jvm/non-fatal-tests.scala
@@ -0,0 +1,47 @@
+import scala.util.control.NonFatal
+
+trait NonFatalTests {
+
+ //NonFatals
+ val nonFatals: Seq[Throwable] =
+ Seq(new StackOverflowError,
+ new RuntimeException,
+ new Exception,
+ new Throwable)
+
+ //Fatals
+ val fatals: Seq[Throwable] =
+ Seq(new InterruptedException,
+ new OutOfMemoryError,
+ new LinkageError,
+ new VirtualMachineError {},
+ new Throwable with scala.util.control.ControlThrowable,
+ new NotImplementedError)
+
+ def testFatalsUsingApply(): Unit = {
+ fatals foreach { t => assert(NonFatal(t) == false) }
+ }
+
+ def testNonFatalsUsingApply(): Unit = {
+ nonFatals foreach { t => assert(NonFatal(t) == true) }
+ }
+
+ def testFatalsUsingUnapply(): Unit = {
+ fatals foreach { t => assert(NonFatal.unapply(t).isEmpty) }
+ }
+
+ def testNonFatalsUsingUnapply(): Unit = {
+ nonFatals foreach { t => assert(NonFatal.unapply(t).isDefined) }
+ }
+
+ testFatalsUsingApply()
+ testNonFatalsUsingApply()
+ testFatalsUsingUnapply()
+ testNonFatalsUsingUnapply()
+}
+
+object Test
+extends App
+with NonFatalTests {
+ System.exit(0)
+} \ No newline at end of file
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index 407027f904..5c9c71f3f8 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -3,22 +3,19 @@ import scala.concurrent.{
Promise,
TimeoutException,
SyncVar,
- ExecutionException
+ ExecutionException,
+ ExecutionContext
}
-import scala.concurrent.future
-import scala.concurrent.promise
-import scala.concurrent.blocking
+import scala.concurrent.{ future, promise, blocking }
import scala.util.{ Try, Success, Failure }
-
import scala.concurrent.util.Duration
-
trait TestBase {
def once(body: (() => Unit) => Unit) {
val sv = new SyncVar[Boolean]
body(() => sv put true)
- sv.take()
+ sv.take(2000)
}
// def assert(cond: => Boolean) {
@@ -33,7 +30,8 @@ trait TestBase {
trait FutureCallbacks extends TestBase {
-
+ import ExecutionContext.Implicits._
+
def testOnSuccess(): Unit = once {
done =>
var x = 0
@@ -147,6 +145,7 @@ trait FutureCallbacks extends TestBase {
trait FutureCombinators extends TestBase {
+ import ExecutionContext.Implicits._
def testMapSuccess(): Unit = once {
done =>
@@ -591,7 +590,8 @@ trait FutureCombinators extends TestBase {
trait FutureProjections extends TestBase {
-
+ import ExecutionContext.Implicits._
+
def testFailedFailureOnComplete(): Unit = once {
done =>
val cause = new RuntimeException
@@ -673,7 +673,8 @@ trait FutureProjections extends TestBase {
trait Blocking extends TestBase {
-
+ import ExecutionContext.Implicits._
+
def testAwaitSuccess(): Unit = once {
done =>
val f = future { 0 }
@@ -702,8 +703,67 @@ trait Blocking extends TestBase {
}
+trait BlockContexts extends TestBase {
+ import ExecutionContext.Implicits._
+ import scala.concurrent.{ Await, Awaitable, BlockContext }
+
+ private def getBlockContext(body: => BlockContext): BlockContext = {
+ blocking(Future { body }, Duration(500, "ms"))
+ }
+
+ // test outside of an ExecutionContext
+ def testDefaultOutsideFuture(): Unit = {
+ val bc = BlockContext.current
+ assert(bc.getClass.getName.contains("DefaultBlockContext"))
+ }
+
+ // test BlockContext in our default ExecutionContext
+ def testDefaultFJP(): Unit = {
+ val bc = getBlockContext(BlockContext.current)
+ assert(bc.isInstanceOf[scala.concurrent.forkjoin.ForkJoinWorkerThread])
+ }
+
+ // test BlockContext inside BlockContext.withBlockContext
+ def testPushCustom(): Unit = {
+ val orig = BlockContext.current
+ val customBC = new BlockContext() {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ orig.internalBlockingCall(awaitable, atMost)
+ }
+
+ val bc = getBlockContext({
+ BlockContext.withBlockContext(customBC) {
+ BlockContext.current
+ }
+ })
+
+ assert(bc eq customBC)
+ }
+
+ // test BlockContext after a BlockContext.push
+ def testPopCustom(): Unit = {
+ val orig = BlockContext.current
+ val customBC = new BlockContext() {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ orig.internalBlockingCall(awaitable, atMost)
+ }
+
+ val bc = getBlockContext({
+ BlockContext.withBlockContext(customBC) {}
+ BlockContext.current
+ })
+
+ assert(bc ne customBC)
+ }
+
+ testDefaultOutsideFuture()
+ testDefaultFJP()
+ testPushCustom()
+ testPopCustom()
+}
trait Promises extends TestBase {
+ import ExecutionContext.Implicits._
def testSuccess(): Unit = once {
done =>
@@ -730,7 +790,8 @@ trait Promises extends TestBase {
trait Exceptions extends TestBase {
-
+ import ExecutionContext.Implicits._
+
}
// trait TryEitherExtractor extends TestBase {
@@ -811,7 +872,7 @@ trait Exceptions extends TestBase {
trait CustomExecutionContext extends TestBase {
import scala.concurrent.{ ExecutionContext, Awaitable }
- def defaultEC = ExecutionContext.defaultExecutionContext
+ def defaultEC = ExecutionContext.global
val inEC = new java.lang.ThreadLocal[Int]() {
override def initialValue = 0
@@ -826,7 +887,7 @@ trait CustomExecutionContext extends TestBase {
val _count = new java.util.concurrent.atomic.AtomicInteger(0)
def count = _count.get
- def delegate = ExecutionContext.defaultExecutionContext
+ def delegate = ExecutionContext.global
override def execute(runnable: Runnable) = {
_count.incrementAndGet()
@@ -843,9 +904,6 @@ trait CustomExecutionContext extends TestBase {
delegate.execute(wrapper)
}
- override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
- delegate.internalBlockingCall(awaitable, atMost)
-
override def reportFailure(t: Throwable): Unit = {
System.err.println("Failure: " + t.getClass.getSimpleName + ": " + t.getMessage)
delegate.reportFailure(t)
@@ -860,14 +918,16 @@ trait CustomExecutionContext extends TestBase {
def testOnSuccessCustomEC(): Unit = {
val count = countExecs { implicit ec =>
- once { done =>
- val f = future({ assertNoEC() })(defaultEC)
- f onSuccess {
- case _ =>
- assertEC()
+ blocking {
+ once { done =>
+ val f = future({ assertNoEC() })(defaultEC)
+ f onSuccess {
+ case _ =>
+ assertEC()
done()
+ }
+ assertNoEC()
}
- assertNoEC()
}
}
@@ -877,12 +937,14 @@ trait CustomExecutionContext extends TestBase {
def testKeptPromiseCustomEC(): Unit = {
val count = countExecs { implicit ec =>
- once { done =>
- val f = Promise.successful(10).future
- f onSuccess {
- case _ =>
- assertEC()
+ blocking {
+ once { done =>
+ val f = Promise.successful(10).future
+ f onSuccess {
+ case _ =>
+ assertEC()
done()
+ }
}
}
}
@@ -893,28 +955,30 @@ trait CustomExecutionContext extends TestBase {
def testCallbackChainCustomEC(): Unit = {
val count = countExecs { implicit ec =>
- once { done =>
- assertNoEC()
- val addOne = { x: Int => assertEC(); x + 1 }
- val f = Promise.successful(10).future
- f.map(addOne).filter { x =>
- assertEC()
- x == 11
- } flatMap { x =>
- Promise.successful(x + 1).future.map(addOne).map(addOne)
- } onComplete {
- case Left(t) =>
- try {
- throw new AssertionError("error in test: " + t.getMessage, t)
- } finally {
+ blocking {
+ once { done =>
+ assertNoEC()
+ val addOne = { x: Int => assertEC(); x + 1 }
+ val f = Promise.successful(10).future
+ f.map(addOne).filter { x =>
+ assertEC()
+ x == 11
+ } flatMap { x =>
+ Promise.successful(x + 1).future.map(addOne).map(addOne)
+ } onComplete {
+ case Left(t) =>
+ try {
+ throw new AssertionError("error in test: " + t.getMessage, t)
+ } finally {
+ done()
+ }
+ case Right(x) =>
+ assertEC()
+ assert(x == 14)
done()
- }
- case Right(x) =>
- assertEC()
- assert(x == 14)
- done()
+ }
+ assertNoEC()
}
- assertNoEC()
}
}
@@ -934,6 +998,7 @@ with FutureCallbacks
with FutureCombinators
with FutureProjections
with Promises
+with BlockContexts
with Exceptions
// with TryEitherExtractor
with CustomExecutionContext