summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdriaan Moors <adriaan.moors@epfl.ch>2012-07-10 02:53:06 -0700
committerAdriaan Moors <adriaan.moors@epfl.ch>2012-07-10 02:53:06 -0700
commitb08de29331a6f26d06609c640b2fc9d2d38ff525 (patch)
tree3dd9c64b3511f50878df39f8b32a264d508ac040
parent9a7546db9f2b5e656678960bd8ee2d3e4eac8981 (diff)
parent4496c5daa9c573df6d4e2c85a34c37eb9933d1bd (diff)
downloadscala-b08de29331a6f26d06609c640b2fc9d2d38ff525.tar.gz
scala-b08de29331a6f26d06609c640b2fc9d2d38ff525.tar.bz2
scala-b08de29331a6f26d06609c640b2fc9d2d38ff525.zip
Merge pull request #856 from havocp/sip14-execution-changes
Collection of updates to SIP-14 (scala.concurrent)
-rw-r--r--src/library/scala/collection/parallel/TaskSupport.scala2
-rw-r--r--src/library/scala/concurrent/BlockContext.scala81
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala34
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala5
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala74
-rw-r--r--src/library/scala/concurrent/Future.scala29
-rw-r--r--src/library/scala/concurrent/Promise.scala9
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala61
-rw-r--r--src/library/scala/concurrent/impl/Future.scala102
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala34
-rw-r--r--src/library/scala/util/control/NonFatal.scala23
-rw-r--r--test/files/jvm/future-spec/FutureTests.scala124
-rw-r--r--test/files/jvm/future-spec/PromiseTests.scala15
-rw-r--r--test/files/jvm/non-fatal-tests.scala47
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala159
15 files changed, 478 insertions, 321 deletions
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala
index 2eaa861429..3d27f619bb 100644
--- a/src/library/scala/collection/parallel/TaskSupport.scala
+++ b/src/library/scala/collection/parallel/TaskSupport.scala
@@ -48,7 +48,7 @@ extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
* By default, parallel collections are parametrized with this task support object, so parallel collections
* share the same execution context backend as the rest of the `scala.concurrent` package.
*/
-class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.defaultExecutionContext)
+class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
extends TaskSupport with ExecutionContextTasks
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala
new file mode 100644
index 0000000000..a5b878c546
--- /dev/null
+++ b/src/library/scala/concurrent/BlockContext.scala
@@ -0,0 +1,81 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import java.lang.Thread
+import scala.concurrent.util.Duration
+
+/**
+ * A context to be notified by `scala.concurrent.blocking()` when
+ * a thread is about to block. In effect this trait provides
+ * the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()`
+ * locates an instance of `BlockContext` by first looking for one
+ * provided through `BlockContext.withBlockContext()` and failing that,
+ * checking whether `Thread.currentThread` is an instance of `BlockContext`.
+ * So a thread pool can have its `java.lang.Thread` instances implement
+ * `BlockContext`. There's a default `BlockContext` used if the thread
+ * doesn't implement `BlockContext`.
+ *
+ * Typically, you'll want to chain to the previous `BlockContext`,
+ * like this:
+ * {{{
+ * val oldContext = BlockContext.current
+ * val myContext = new BlockContext {
+ * override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ * // you'd have code here doing whatever you need to do
+ * // when the thread is about to block.
+ * // Then you'd chain to the previous context:
+ * oldContext.internalBlockingCall(awaitable, atMost)
+ * }
+ * }
+ * BlockContext.withBlockContext(myContext) {
+ * // then this block runs with myContext as the handler
+ * // for scala.concurrent.blocking
+ * }
+ * }}}
+ */
+trait BlockContext {
+
+ /** Used internally by the framework; blocks execution for at most
+ * `atMost` time while waiting for an `awaitable` object to become ready.
+ *
+ * Clients should use `scala.concurrent.blocking` instead; this is
+ * the implementation of `scala.concurrent.blocking`, generally
+ * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`.
+ */
+ def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
+}
+
+object BlockContext {
+ private object DefaultBlockContext extends BlockContext {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ awaitable.result(atMost)(Await.canAwaitEvidence)
+ }
+
+ private val contextLocal = new ThreadLocal[BlockContext]() {
+ override def initialValue = Thread.currentThread match {
+ case ctx: BlockContext => ctx
+ case _ => DefaultBlockContext
+ }
+ }
+
+ /** Obtain the current thread's current `BlockContext`. */
+ def current: BlockContext = contextLocal.get
+
+ /** Pushes a current `BlockContext` while executing `body`. */
+ def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
+ val old = contextLocal.get
+ try {
+ contextLocal.set(blockContext)
+ body
+ } finally {
+ contextLocal.set(old)
+ }
+ }
+}
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
index 330a2f0e25..86a86966ef 100644
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -17,23 +17,6 @@ import language.implicitConversions
/** This package object contains primitives for concurrent and parallel programming.
*/
abstract class ConcurrentPackageObject {
- /** A global execution environment for executing lightweight tasks.
- */
- lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
-
- val currentExecutionContext = new ThreadLocal[ExecutionContext]
-
- val handledFutureException: PartialFunction[Throwable, Throwable] = {
- case t: Throwable if isFutureThrowable(t) => t
- }
-
- // TODO rename appropriately and make public
- private[concurrent] def isFutureThrowable(t: Throwable) = t match {
- case e: Error => false
- case t: scala.util.control.ControlThrowable => false
- case i: InterruptedException => false
- case _ => true
- }
/* concurrency constructs */
@@ -46,8 +29,7 @@ abstract class ConcurrentPackageObject {
* @param execctx the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
- def future[T](body: =>T)(implicit execctx: ExecutionContext = defaultExecutionContext): Future[T] =
- Future[T](body)
+ def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)
/** Creates a promise object which can be completed with a value.
*
@@ -55,8 +37,7 @@ abstract class ConcurrentPackageObject {
* @param execctx the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def promise[T]()(implicit execctx: ExecutionContext = defaultExecutionContext): Promise[T] =
- Promise[T]()
+ def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
/** Used to block on a piece of code which potentially blocks.
*
@@ -67,8 +48,7 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](body: =>T): T =
- blocking(impl.Future.body2awaitable(body), Duration.Inf)
+ def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
/** Blocks on an awaitable object.
*
@@ -79,12 +59,8 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.result(atMost)(Await.canAwaitEvidence)
- case ec => ec.internalBlockingCall(awaitable, atMost)
- }
- }
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
+ BlockContext.current.internalBlockingCall(awaitable, atMost)
@inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x)
}
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index 96a66d83b6..91e41748f5 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -23,7 +23,7 @@ package scala.concurrent
* @author Paul Phillips
* @version 2.8
*/
-class DelayedLazyVal[T](f: () => T, body: => Unit) {
+class DelayedLazyVal[T](f: () => T, body: => Unit){
@volatile private[this] var _isDone = false
private[this] lazy val complete = f()
@@ -39,7 +39,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
*/
def apply(): T = if (isDone) complete else f()
- // TODO replace with scala.concurrent.future { ... }
+ // FIXME need to take ExecutionContext in constructor
+ import ExecutionContext.Implicits.global
future {
body
_isDone = true
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 436a17a33b..b486e5269e 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -9,58 +9,80 @@
package scala.concurrent
-
-import java.util.concurrent.atomic.{ AtomicInteger }
-import java.util.concurrent.{ Executors, Future => JFuture, Callable, ExecutorService, Executor }
+import java.util.concurrent.{ ExecutorService, Executor }
import scala.concurrent.util.Duration
-import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
-import scala.collection.generic.CanBuildFrom
-import collection._
-
-
+import scala.annotation.implicitNotFound
+/**
+ * An `ExecutionContext` is an abstraction over an entity that can execute program logic.
+ */
+@implicitNotFound("Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global")
trait ExecutionContext {
/** Runs a block of code on this execution context.
*/
def execute(runnable: Runnable): Unit
- /** Used internally by the framework - blocks execution for at most `atMost` time while waiting
- * for an `awaitable` object to become ready.
- *
- * Clients should use `scala.concurrent.blocking` instead.
- */
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
-
/** Reports that an asynchronous computation failed.
*/
def reportFailure(t: Throwable): Unit
}
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutor extends ExecutionContext with Executor
+
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService
+
/** Contains factory methods for creating execution contexts.
*/
object ExecutionContext {
-
- implicit def defaultExecutionContext: ExecutionContext = scala.concurrent.defaultExecutionContext
-
+ /**
+ * The `ExecutionContext` associated with the current `Thread`
+ */
+ val currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal //FIXME might want to set the initial value to an executionContext that throws an exception on execute and warns that it's not set
+
+ /**
+ * This is the explicit global ExecutionContext,
+ * call this when you want to provide the global ExecutionContext explicitly
+ */
+ def global: ExecutionContextExecutor = Implicits.global
+
+ object Implicits {
+ /**
+ * This is the implicit global ExecutionContext,
+ * import this when you want to provide the global ExecutionContext implicitly
+ */
+ implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
+ }
+
/** Creates an `ExecutionContext` from the given `ExecutorService`.
*/
- def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService =
+ def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
impl.ExecutionContextImpl.fromExecutorService(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `ExecutorService` with the default Reporter.
+ */
+ def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)
/** Creates an `ExecutionContext` from the given `Executor`.
*/
- def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor =
+ def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
impl.ExecutionContextImpl.fromExecutor(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `Executor` with the default Reporter.
+ */
+ def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)
- def defaultReporter: Throwable => Unit = {
- // re-throwing `Error`s here causes an exception handling test to fail.
- //case e: Error => throw e
- case t => t.printStackTrace()
- }
-
+ /** The default reporter simply prints the stack trace of the `Throwable` to System.err.
+ */
+ def defaultReporter: Throwable => Unit = { case t => t.printStackTrace() }
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 2de0c57253..75a83d6ef8 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -8,7 +8,7 @@
package scala.concurrent
-
+import language.higherKinds
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
@@ -23,11 +23,9 @@ import scala.Option
import scala.util.{Try, Success, Failure}
import scala.annotation.tailrec
-import scala.collection.mutable.Stack
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
-import language.higherKinds
@@ -138,7 +136,7 @@ trait Future[+T] extends Awaitable[T] {
* $callbackInContext
*/
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
- case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
+ case Left(t) if (impl.Future.isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
case _ =>
}(executor)
@@ -580,6 +578,20 @@ object Future {
classOf[Double] -> classOf[jl.Double],
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
)
+
+ /** Creates an already completed Future with the specified exception.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
+
+ /** Creates an already completed Future with the specified result.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def successful[T](result: T): Future[T] = Promise.successful(result).future
/** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
*
@@ -710,5 +722,12 @@ object Future {
}
}
-
+/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext`
+ * wraps a callback provided to `Future.onComplete`.
+ * All callbacks provided to a `Future` end up going through `onComplete`, so this allows an
+ * `ExecutionContext` to special-case callbacks that were executed by `Future` if desired.
+ */
+trait OnCompleteRunnable {
+ self: Runnable =>
+}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 578642966f..5d1b2c00b6 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -34,6 +34,15 @@ trait Promise[T] {
*/
def future: Future[T]
+ /** Returns whether the promise has already been completed with
+ * a value or an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return `true` if the promise is already completed, `false` otherwise
+ */
+ def isCompleted: Boolean
+
/** Completes the promise with either an exception or a value.
*
* @param result Either the value or the exception to complete the promise with.
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 4c6347dce0..551a444425 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -13,34 +13,34 @@ package scala.concurrent.impl
import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit }
import java.util.Collection
import scala.concurrent.forkjoin._
-import scala.concurrent.{ ExecutionContext, Awaitable }
+import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService }
import scala.concurrent.util.Duration
import scala.util.control.NonFatal
-private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor {
+private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor {
val executor: Executor = es match {
case null => createExecutorService
case some => some
}
-
- // to ensure that the current execution context thread local is properly set
- def executorsThreadFactory = new ThreadFactory {
- def newThread(r: Runnable) = new Thread(new Runnable {
- override def run() {
- scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this)
- r.run()
- }
- })
- }
-
- // to ensure that the current execution context thread local is properly set
+
+ // Implement BlockContext on FJP threads
def forkJoinPoolThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
- def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) {
- override def onStart() {
- scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this)
+ def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) with BlockContext {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ var result: T = null.asInstanceOf[T]
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
+ @volatile var isdone = false
+ def block(): Boolean = {
+ result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
+ isdone = true
+ true
+ }
+ def isReleasable = isdone
+ })
+ result
}
}
}
@@ -68,7 +68,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
case NonFatal(t) =>
System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool")
t.printStackTrace(System.err)
- Executors.newCachedThreadPool(executorsThreadFactory) //FIXME use the same desired parallelism here too?
+ Executors.newCachedThreadPool() //FIXME use the same desired parallelism here too?
}
def execute(runnable: Runnable): Unit = executor match {
@@ -84,27 +84,6 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
case generic => generic execute runnable
}
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
- Future.releaseStack(this)
-
- executor match {
- case fj: ForkJoinPool =>
- var result: T = null.asInstanceOf[T]
- ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
- @volatile var isdone = false
- def block(): Boolean = {
- result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
- isdone = true
- true
- }
- def isReleasable = isdone
- })
- result
- case _ =>
- awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
- }
- }
-
def reportFailure(t: Throwable) = reporter(t)
}
@@ -112,8 +91,8 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
private[concurrent] object ExecutionContextImpl {
def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
- def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService =
- new ExecutionContextImpl(es, reporter) with ExecutorService {
+ def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
+ new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
override def execute(command: Runnable) = executor.execute(command)
override def shutdown() { asExecutorService.shutdown() }
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 8012ea6a93..073e6c4c9f 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -46,11 +46,19 @@ private[concurrent] object Future {
def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
- private[impl] class PromiseCompletingTask[T](override val executor: ExecutionContext, body: => T)
- extends Task {
+ // TODO rename appropriately and make public
+ private[concurrent] def isFutureThrowable(t: Throwable) = t match {
+ case e: Error => false
+ case t: scala.util.control.ControlThrowable => false
+ case i: InterruptedException => false
+ case _ => true
+ }
+
+ private[impl] class PromiseCompletingRunnable[T](body: => T)
+ extends Runnable {
val promise = new Promise.DefaultPromise[T]()
- protected override def task() = {
+ override def run() = {
promise complete {
try Right(body) catch {
case NonFatal(e) =>
@@ -63,90 +71,8 @@ private[concurrent] object Future {
}
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
- val task = new PromiseCompletingTask(executor, body)
- task.dispatch()
-
- task.promise.future
- }
-
- private[impl] val throwableId: Throwable => Throwable = identity _
-
- // an optimization for batching futures
- // TODO we should replace this with a public queue,
- // so that it can be stolen from
- // OR: a push to the local task queue should be so cheap that this is
- // not even needed, but stealing is still possible
-
- private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext)
-
- private val _taskStack = new ThreadLocal[TaskStack]()
-
- private[impl] trait Task extends Runnable {
- def executor: ExecutionContext
-
- // run the original callback (no dispatch)
- protected def task(): Unit
-
- // we implement Runnable to avoid creating
- // an extra object. run() runs ourselves with
- // a TaskStack pushed, and then runs any
- // other tasks that show up in the stack.
- final override def run() = {
- try {
- val taskStack = TaskStack(Stack[Task](this), executor)
- _taskStack set taskStack
- while (taskStack.stack.nonEmpty) {
- val next = taskStack.stack.pop()
- require(next.executor eq executor)
- try next.task() catch { case NonFatal(e) => executor reportFailure e }
- }
- } finally {
- _taskStack.remove()
- }
- }
-
- // send the task to the running executor.execute() via
- // _taskStack, or start a new executor.execute()
- def dispatch(force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this
- case _ => executor.execute(this)
- }
- }
-
- private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task])
- extends Task {
- protected override def task() = {
- _taskStack.get.stack.elems = elems
- }
- }
-
- private[impl] def releaseStack(executor: ExecutionContext): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && stack.stack.nonEmpty =>
- val tasks = stack.stack.elems
- stack.stack.clear()
- _taskStack.remove()
- val release = new ReleaseTask(executor, tasks)
- release.dispatch(force=true)
- case null =>
- // do nothing - there is no local batching stack anymore
- case _ =>
- _taskStack.remove()
- }
-
- private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any)
- extends Task {
- private var value: Either[Throwable, T] = null
-
- protected override def task() = {
- require(value ne null) // dispatch(value) must be called before dispatch()
- onComplete(value)
- }
-
- def dispatch(value: Either[Throwable, T]): Unit = {
- this.value = value
- dispatch()
- }
+ val runnable = new PromiseCompletingRunnable(body)
+ executor.execute(runnable)
+ runnable.promise.future
}
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index c5060a2368..3ac34bef8a 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -11,11 +11,12 @@ package scala.concurrent.impl
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
-import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, TimeoutException, ExecutionException }
+import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException }
//import scala.util.continuations._
import scala.concurrent.util.Duration
import scala.util
import scala.annotation.tailrec
+import scala.util.control.NonFatal
//import scala.concurrent.NonDeterministic
@@ -24,6 +25,21 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
def future: this.type = this
}
+private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) extends Runnable with OnCompleteRunnable {
+ // must be filled in before running it
+ var value: Either[Throwable, T] = null
+
+ override def run() = {
+ require(value ne null) // must set value to non-null before running!
+ try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
+ }
+
+ def executeWithValue(v: Either[Throwable, T]): Unit = {
+ require(value eq null) // can't complete it twice
+ value = v
+ executor.execute(this)
+ }
+}
object Promise {
@@ -94,10 +110,10 @@ object Promise {
val resolved = resolveEither(value)
(try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Future.OnCompleteTask[T]] = {
+ def tryComplete(v: Either[Throwable, T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
- val cur = raw.asInstanceOf[List[Future.OnCompleteTask[T]]]
+ val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
if (updateState(cur, v)) cur else tryComplete(v)
case _ => null
}
@@ -107,19 +123,19 @@ object Promise {
synchronized { notifyAll() } //Notify any evil blockers
}) match {
case null => false
- case cs if cs.isEmpty => true
- case cs => cs.foreach(c => c.dispatch(resolved)); true
+ case rs if rs.isEmpty => true
+ case rs => rs.foreach(r => r.executeWithValue(resolved)); true
}
}
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
- val bound = new Future.OnCompleteTask[T](executor, func)
+ val runnable = new CallbackRunnable[T](executor, func)
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => bound.dispatch(r.asInstanceOf[Either[Throwable, T]])
- case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback()
+ case r: Either[_, _] => runnable.executeWithValue(r.asInstanceOf[Either[Throwable, T]])
+ case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
}
@@ -139,7 +155,7 @@ object Promise {
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
val completedAs = value.get
- (new Future.OnCompleteTask(executor, func)).dispatch(completedAs)
+ (new CallbackRunnable(executor, func)).executeWithValue(completedAs)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
diff --git a/src/library/scala/util/control/NonFatal.scala b/src/library/scala/util/control/NonFatal.scala
index 9da2f63307..5137f0f2f5 100644
--- a/src/library/scala/util/control/NonFatal.scala
+++ b/src/library/scala/util/control/NonFatal.scala
@@ -23,16 +23,23 @@ package scala.util.control
* // dangerous stuff
* } catch {
* case NonFatal(e) => log.error(e, "Something not that bad.")
+ * // or
+ * case e if NonFatal(e) => log.error(e, "Something not that bad.")
* }
* }}}
*/
object NonFatal {
-
- def unapply(t: Throwable): Option[Throwable] = t match {
- case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError
- // VirtualMachineError includes OutOfMemoryError and other fatal errors
- case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => None
- case e ⇒ Some(e)
- }
-
+ /**
+ * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal
+ */
+ def apply(t: Throwable): Boolean = t match {
+ case _: StackOverflowError => true // StackOverflowError ok even though it is a VirtualMachineError
+ // VirtualMachineError includes OutOfMemoryError and other fatal errors
+ case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => false
+ case _ => true
+ }
+ /**
+ * Returns Some(t) if NonFatal(t) == true, otherwise None
+ */
+ def unapply(t: Throwable): Option[Throwable] = if (apply(t)) Some(t) else None
}
diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala
index e5e01a5954..ca9ff5090f 100644
--- a/test/files/jvm/future-spec/FutureTests.scala
+++ b/test/files/jvm/future-spec/FutureTests.scala
@@ -10,21 +10,69 @@ import scala.runtime.NonLocalReturnControl
object FutureTests extends MinimalScalaTest {
-
+
/* some utils */
- def testAsync(s: String): Future[String] = s match {
+ def testAsync(s: String)(implicit ec: ExecutionContext): Future[String] = s match {
case "Hello" => future { "World" }
- case "Failure" => Promise.failed(new RuntimeException("Expected exception; to test fault-tolerance")).future
+ case "Failure" => Future.failed(new RuntimeException("Expected exception; to test fault-tolerance"))
case "NoReply" => Promise[String]().future
}
val defaultTimeout = 5 seconds
/* future specification */
+
+ "A future with custom ExecutionContext" should {
+ "shouldHandleThrowables" in {
+ val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable]
+ implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new scala.concurrent.forkjoin.ForkJoinPool(), {
+ t =>
+ ms += t
+ })
+
+ class ThrowableTest(m: String) extends Throwable(m)
+
+ val f1 = future[Any] {
+ throw new ThrowableTest("test")
+ }
+
+ intercept[ThrowableTest] {
+ Await.result(f1, defaultTimeout)
+ }
+
+ val latch = new TestLatch
+ val f2 = future {
+ Await.ready(latch, 5 seconds)
+ "success"
+ }
+ val f3 = f2 map { s => s.toUpperCase }
+
+ f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
+ f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") }
+
+ latch.open()
+
+ Await.result(f2, defaultTimeout) mustBe ("success")
+
+ f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
+ f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") }
+
+ Await.result(f3, defaultTimeout) mustBe ("SUCCESS")
+
+ val waiting = future {
+ Thread.sleep(1000)
+ }
+ Await.ready(waiting, 2000 millis)
+
+ ms.size mustBe (4)
+ //FIXME should check
+ }
+ }
- "A future" should {
-
+ "A future with global ExecutionContext" should {
+ import ExecutionContext.Implicits._
+
"compose with for-comprehensions" in {
def async(x: Int) = future { (x * 2).toString }
val future0 = future[Any] {
@@ -122,20 +170,20 @@ object FutureTests extends MinimalScalaTest {
val r = new IllegalStateException("recovered")
intercept[IllegalStateException] {
- val failed = Promise.failed[String](o).future recoverWith {
- case _ if false == true => Promise.successful("yay!").future
+ val failed = Future.failed[String](o) recoverWith {
+ case _ if false == true => Future.successful("yay!")
}
Await.result(failed, defaultTimeout)
} mustBe (o)
- val recovered = Promise.failed[String](o).future recoverWith {
- case _ => Promise.successful("yay!").future
+ val recovered = Future.failed[String](o) recoverWith {
+ case _ => Future.successful("yay!")
}
Await.result(recovered, defaultTimeout) mustBe ("yay!")
intercept[IllegalStateException] {
- val refailed = Promise.failed[String](o).future recoverWith {
- case _ => Promise.failed[String](r).future
+ val refailed = Future.failed[String](o) recoverWith {
+ case _ => Future.failed[String](r)
}
Await.result(refailed, defaultTimeout)
} mustBe (r)
@@ -164,7 +212,7 @@ object FutureTests extends MinimalScalaTest {
"firstCompletedOf" in {
def futures = Vector.fill[Future[Int]](10) {
Promise[Int]().future
- } :+ Promise.successful[Int](5).future
+ } :+ Future.successful[Int](5)
Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5)
Await.result(Future.firstCompletedOf(futures.iterator), defaultTimeout) mustBe (5)
@@ -186,21 +234,21 @@ object FutureTests extends MinimalScalaTest {
val timeout = 10000 millis
val f = new IllegalStateException("test")
intercept[IllegalStateException] {
- val failed = Promise.failed[String](f).future zip Promise.successful("foo").future
+ val failed = Future.failed[String](f) zip Future.successful("foo")
Await.result(failed, timeout)
} mustBe (f)
intercept[IllegalStateException] {
- val failed = Promise.successful("foo").future zip Promise.failed[String](f).future
+ val failed = Future.successful("foo") zip Future.failed[String](f)
Await.result(failed, timeout)
} mustBe (f)
intercept[IllegalStateException] {
- val failed = Promise.failed[String](f).future zip Promise.failed[String](f).future
+ val failed = Future.failed[String](f) zip Future.failed[String](f)
Await.result(failed, timeout)
} mustBe (f)
- val successful = Promise.successful("foo").future zip Promise.successful("foo").future
+ val successful = Future.successful("foo") zip Future.successful("foo")
Await.result(successful, timeout) mustBe (("foo", "foo"))
}
@@ -337,50 +385,6 @@ object FutureTests extends MinimalScalaTest {
Await.result(traversedIterator, defaultTimeout).sum mustBe (10000)
}
- "shouldHandleThrowables" in {
- val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable]
- implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new scala.concurrent.forkjoin.ForkJoinPool(), {
- t =>
- ms += t
- })
-
- class ThrowableTest(m: String) extends Throwable(m)
-
- val f1 = future[Any] {
- throw new ThrowableTest("test")
- }
-
- intercept[ThrowableTest] {
- Await.result(f1, defaultTimeout)
- }
-
- val latch = new TestLatch
- val f2 = future {
- Await.ready(latch, 5 seconds)
- "success"
- }
- val f3 = f2 map { s => s.toUpperCase }
-
- f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
- f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") }
-
- latch.open()
-
- Await.result(f2, defaultTimeout) mustBe ("success")
-
- f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
- f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") }
-
- Await.result(f3, defaultTimeout) mustBe ("SUCCESS")
-
- val waiting = future {
- Thread.sleep(1000)
- }
- Await.ready(waiting, 2000 millis)
-
- ms.size mustBe (4)
- }
-
"shouldBlockUntilResult" in {
val latch = new TestLatch
diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala
index bf9d9b39d7..49bc642b57 100644
--- a/test/files/jvm/future-spec/PromiseTests.scala
+++ b/test/files/jvm/future-spec/PromiseTests.scala
@@ -10,7 +10,8 @@ import scala.runtime.NonLocalReturnControl
object PromiseTests extends MinimalScalaTest {
-
+ import ExecutionContext.Implicits._
+
val defaultTimeout = Inf
/* promise specification */
@@ -20,11 +21,13 @@ object PromiseTests extends MinimalScalaTest {
"not be completed" in {
val p = Promise()
p.future.isCompleted mustBe (false)
+ p.isCompleted mustBe (false)
}
"have no value" in {
val p = Promise()
p.future.value mustBe (None)
+ p.isCompleted mustBe (false)
}
"return supplied value on timeout" in {
@@ -45,14 +48,16 @@ object PromiseTests extends MinimalScalaTest {
"A successful Promise" should {
val result = "test value"
- val future = Promise[String]().complete(Right(result)).future
- futureWithResult(_(future, result))
+ val promise = Promise[String]().complete(Right(result))
+ promise.isCompleted mustBe (true)
+ futureWithResult(_(promise.future, result))
}
"A failed Promise" should {
val message = "Expected Exception"
- val future = Promise[String]().complete(Left(new RuntimeException(message))).future
- futureWithException[RuntimeException](_(future, message))
+ val promise = Promise[String]().complete(Left(new RuntimeException(message)))
+ promise.isCompleted mustBe (true)
+ futureWithException[RuntimeException](_(promise.future, message))
}
"An interrupted Promise" should {
diff --git a/test/files/jvm/non-fatal-tests.scala b/test/files/jvm/non-fatal-tests.scala
new file mode 100644
index 0000000000..471a9d227a
--- /dev/null
+++ b/test/files/jvm/non-fatal-tests.scala
@@ -0,0 +1,47 @@
+import scala.util.control.NonFatal
+
+trait NonFatalTests {
+
+ //NonFatals
+ val nonFatals: Seq[Throwable] =
+ Seq(new StackOverflowError,
+ new RuntimeException,
+ new Exception,
+ new Throwable)
+
+ //Fatals
+ val fatals: Seq[Throwable] =
+ Seq(new InterruptedException,
+ new OutOfMemoryError,
+ new LinkageError,
+ new VirtualMachineError {},
+ new Throwable with scala.util.control.ControlThrowable,
+ new NotImplementedError)
+
+ def testFatalsUsingApply(): Unit = {
+ fatals foreach { t => assert(NonFatal(t) == false) }
+ }
+
+ def testNonFatalsUsingApply(): Unit = {
+ nonFatals foreach { t => assert(NonFatal(t) == true) }
+ }
+
+ def testFatalsUsingUnapply(): Unit = {
+ fatals foreach { t => assert(NonFatal.unapply(t).isEmpty) }
+ }
+
+ def testNonFatalsUsingUnapply(): Unit = {
+ nonFatals foreach { t => assert(NonFatal.unapply(t).isDefined) }
+ }
+
+ testFatalsUsingApply()
+ testNonFatalsUsingApply()
+ testFatalsUsingUnapply()
+ testNonFatalsUsingUnapply()
+}
+
+object Test
+extends App
+with NonFatalTests {
+ System.exit(0)
+} \ No newline at end of file
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index 407027f904..5c9c71f3f8 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -3,22 +3,19 @@ import scala.concurrent.{
Promise,
TimeoutException,
SyncVar,
- ExecutionException
+ ExecutionException,
+ ExecutionContext
}
-import scala.concurrent.future
-import scala.concurrent.promise
-import scala.concurrent.blocking
+import scala.concurrent.{ future, promise, blocking }
import scala.util.{ Try, Success, Failure }
-
import scala.concurrent.util.Duration
-
trait TestBase {
def once(body: (() => Unit) => Unit) {
val sv = new SyncVar[Boolean]
body(() => sv put true)
- sv.take()
+ sv.take(2000)
}
// def assert(cond: => Boolean) {
@@ -33,7 +30,8 @@ trait TestBase {
trait FutureCallbacks extends TestBase {
-
+ import ExecutionContext.Implicits._
+
def testOnSuccess(): Unit = once {
done =>
var x = 0
@@ -147,6 +145,7 @@ trait FutureCallbacks extends TestBase {
trait FutureCombinators extends TestBase {
+ import ExecutionContext.Implicits._
def testMapSuccess(): Unit = once {
done =>
@@ -591,7 +590,8 @@ trait FutureCombinators extends TestBase {
trait FutureProjections extends TestBase {
-
+ import ExecutionContext.Implicits._
+
def testFailedFailureOnComplete(): Unit = once {
done =>
val cause = new RuntimeException
@@ -673,7 +673,8 @@ trait FutureProjections extends TestBase {
trait Blocking extends TestBase {
-
+ import ExecutionContext.Implicits._
+
def testAwaitSuccess(): Unit = once {
done =>
val f = future { 0 }
@@ -702,8 +703,67 @@ trait Blocking extends TestBase {
}
+trait BlockContexts extends TestBase {
+ import ExecutionContext.Implicits._
+ import scala.concurrent.{ Await, Awaitable, BlockContext }
+
+ private def getBlockContext(body: => BlockContext): BlockContext = {
+ blocking(Future { body }, Duration(500, "ms"))
+ }
+
+ // test outside of an ExecutionContext
+ def testDefaultOutsideFuture(): Unit = {
+ val bc = BlockContext.current
+ assert(bc.getClass.getName.contains("DefaultBlockContext"))
+ }
+
+ // test BlockContext in our default ExecutionContext
+ def testDefaultFJP(): Unit = {
+ val bc = getBlockContext(BlockContext.current)
+ assert(bc.isInstanceOf[scala.concurrent.forkjoin.ForkJoinWorkerThread])
+ }
+
+ // test BlockContext inside BlockContext.withBlockContext
+ def testPushCustom(): Unit = {
+ val orig = BlockContext.current
+ val customBC = new BlockContext() {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ orig.internalBlockingCall(awaitable, atMost)
+ }
+
+ val bc = getBlockContext({
+ BlockContext.withBlockContext(customBC) {
+ BlockContext.current
+ }
+ })
+
+ assert(bc eq customBC)
+ }
+
+ // test BlockContext after a BlockContext.push
+ def testPopCustom(): Unit = {
+ val orig = BlockContext.current
+ val customBC = new BlockContext() {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ orig.internalBlockingCall(awaitable, atMost)
+ }
+
+ val bc = getBlockContext({
+ BlockContext.withBlockContext(customBC) {}
+ BlockContext.current
+ })
+
+ assert(bc ne customBC)
+ }
+
+ testDefaultOutsideFuture()
+ testDefaultFJP()
+ testPushCustom()
+ testPopCustom()
+}
trait Promises extends TestBase {
+ import ExecutionContext.Implicits._
def testSuccess(): Unit = once {
done =>
@@ -730,7 +790,8 @@ trait Promises extends TestBase {
trait Exceptions extends TestBase {
-
+ import ExecutionContext.Implicits._
+
}
// trait TryEitherExtractor extends TestBase {
@@ -811,7 +872,7 @@ trait Exceptions extends TestBase {
trait CustomExecutionContext extends TestBase {
import scala.concurrent.{ ExecutionContext, Awaitable }
- def defaultEC = ExecutionContext.defaultExecutionContext
+ def defaultEC = ExecutionContext.global
val inEC = new java.lang.ThreadLocal[Int]() {
override def initialValue = 0
@@ -826,7 +887,7 @@ trait CustomExecutionContext extends TestBase {
val _count = new java.util.concurrent.atomic.AtomicInteger(0)
def count = _count.get
- def delegate = ExecutionContext.defaultExecutionContext
+ def delegate = ExecutionContext.global
override def execute(runnable: Runnable) = {
_count.incrementAndGet()
@@ -843,9 +904,6 @@ trait CustomExecutionContext extends TestBase {
delegate.execute(wrapper)
}
- override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
- delegate.internalBlockingCall(awaitable, atMost)
-
override def reportFailure(t: Throwable): Unit = {
System.err.println("Failure: " + t.getClass.getSimpleName + ": " + t.getMessage)
delegate.reportFailure(t)
@@ -860,14 +918,16 @@ trait CustomExecutionContext extends TestBase {
def testOnSuccessCustomEC(): Unit = {
val count = countExecs { implicit ec =>
- once { done =>
- val f = future({ assertNoEC() })(defaultEC)
- f onSuccess {
- case _ =>
- assertEC()
+ blocking {
+ once { done =>
+ val f = future({ assertNoEC() })(defaultEC)
+ f onSuccess {
+ case _ =>
+ assertEC()
done()
+ }
+ assertNoEC()
}
- assertNoEC()
}
}
@@ -877,12 +937,14 @@ trait CustomExecutionContext extends TestBase {
def testKeptPromiseCustomEC(): Unit = {
val count = countExecs { implicit ec =>
- once { done =>
- val f = Promise.successful(10).future
- f onSuccess {
- case _ =>
- assertEC()
+ blocking {
+ once { done =>
+ val f = Promise.successful(10).future
+ f onSuccess {
+ case _ =>
+ assertEC()
done()
+ }
}
}
}
@@ -893,28 +955,30 @@ trait CustomExecutionContext extends TestBase {
def testCallbackChainCustomEC(): Unit = {
val count = countExecs { implicit ec =>
- once { done =>
- assertNoEC()
- val addOne = { x: Int => assertEC(); x + 1 }
- val f = Promise.successful(10).future
- f.map(addOne).filter { x =>
- assertEC()
- x == 11
- } flatMap { x =>
- Promise.successful(x + 1).future.map(addOne).map(addOne)
- } onComplete {
- case Left(t) =>
- try {
- throw new AssertionError("error in test: " + t.getMessage, t)
- } finally {
+ blocking {
+ once { done =>
+ assertNoEC()
+ val addOne = { x: Int => assertEC(); x + 1 }
+ val f = Promise.successful(10).future
+ f.map(addOne).filter { x =>
+ assertEC()
+ x == 11
+ } flatMap { x =>
+ Promise.successful(x + 1).future.map(addOne).map(addOne)
+ } onComplete {
+ case Left(t) =>
+ try {
+ throw new AssertionError("error in test: " + t.getMessage, t)
+ } finally {
+ done()
+ }
+ case Right(x) =>
+ assertEC()
+ assert(x == 14)
done()
- }
- case Right(x) =>
- assertEC()
- assert(x == 14)
- done()
+ }
+ assertNoEC()
}
- assertNoEC()
}
}
@@ -934,6 +998,7 @@ with FutureCallbacks
with FutureCombinators
with FutureProjections
with Promises
+with BlockContexts
with Exceptions
// with TryEitherExtractor
with CustomExecutionContext