diff options
-rw-r--r-- | src/library/scala/collection/parallel/TaskSupport.scala | 2 | ||||
-rw-r--r-- | src/library/scala/concurrent/BlockContext.scala | 81 | ||||
-rw-r--r-- | src/library/scala/concurrent/ConcurrentPackageObject.scala | 34 | ||||
-rw-r--r-- | src/library/scala/concurrent/DelayedLazyVal.scala | 5 | ||||
-rw-r--r-- | src/library/scala/concurrent/ExecutionContext.scala | 74 | ||||
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 29 | ||||
-rw-r--r-- | src/library/scala/concurrent/Promise.scala | 9 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 61 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Future.scala | 102 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 34 | ||||
-rw-r--r-- | src/library/scala/util/control/NonFatal.scala | 23 | ||||
-rw-r--r-- | test/files/jvm/future-spec/FutureTests.scala | 124 | ||||
-rw-r--r-- | test/files/jvm/future-spec/PromiseTests.scala | 15 | ||||
-rw-r--r-- | test/files/jvm/non-fatal-tests.scala | 47 | ||||
-rw-r--r-- | test/files/jvm/scala-concurrent-tck.scala | 159 |
15 files changed, 478 insertions, 321 deletions
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala index 2eaa861429..3d27f619bb 100644 --- a/src/library/scala/collection/parallel/TaskSupport.scala +++ b/src/library/scala/collection/parallel/TaskSupport.scala @@ -48,7 +48,7 @@ extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks * By default, parallel collections are parametrized with this task support object, so parallel collections * share the same execution context backend as the rest of the `scala.concurrent` package. */ -class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.defaultExecutionContext) +class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global) extends TaskSupport with ExecutionContextTasks diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala new file mode 100644 index 0000000000..a5b878c546 --- /dev/null +++ b/src/library/scala/concurrent/BlockContext.scala @@ -0,0 +1,81 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + +import java.lang.Thread +import scala.concurrent.util.Duration + +/** + * A context to be notified by `scala.concurrent.blocking()` when + * a thread is about to block. In effect this trait provides + * the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()` + * locates an instance of `BlockContext` by first looking for one + * provided through `BlockContext.withBlockContext()` and failing that, + * checking whether `Thread.currentThread` is an instance of `BlockContext`. + * So a thread pool can have its `java.lang.Thread` instances implement + * `BlockContext`. There's a default `BlockContext` used if the thread + * doesn't implement `BlockContext`. + * + * Typically, you'll want to chain to the previous `BlockContext`, + * like this: + * {{{ + * val oldContext = BlockContext.current + * val myContext = new BlockContext { + * override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { + * // you'd have code here doing whatever you need to do + * // when the thread is about to block. + * // Then you'd chain to the previous context: + * oldContext.internalBlockingCall(awaitable, atMost) + * } + * } + * BlockContext.withBlockContext(myContext) { + * // then this block runs with myContext as the handler + * // for scala.concurrent.blocking + * } + * }}} + */ +trait BlockContext { + + /** Used internally by the framework; blocks execution for at most + * `atMost` time while waiting for an `awaitable` object to become ready. + * + * Clients should use `scala.concurrent.blocking` instead; this is + * the implementation of `scala.concurrent.blocking`, generally + * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`. + */ + def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T +} + +object BlockContext { + private object DefaultBlockContext extends BlockContext { + override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = + awaitable.result(atMost)(Await.canAwaitEvidence) + } + + private val contextLocal = new ThreadLocal[BlockContext]() { + override def initialValue = Thread.currentThread match { + case ctx: BlockContext => ctx + case _ => DefaultBlockContext + } + } + + /** Obtain the current thread's current `BlockContext`. */ + def current: BlockContext = contextLocal.get + + /** Pushes a current `BlockContext` while executing `body`. */ + def withBlockContext[T](blockContext: BlockContext)(body: => T): T = { + val old = contextLocal.get + try { + contextLocal.set(blockContext) + body + } finally { + contextLocal.set(old) + } + } +} diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala index 330a2f0e25..86a86966ef 100644 --- a/src/library/scala/concurrent/ConcurrentPackageObject.scala +++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala @@ -17,23 +17,6 @@ import language.implicitConversions /** This package object contains primitives for concurrent and parallel programming. */ abstract class ConcurrentPackageObject { - /** A global execution environment for executing lightweight tasks. - */ - lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor) - - val currentExecutionContext = new ThreadLocal[ExecutionContext] - - val handledFutureException: PartialFunction[Throwable, Throwable] = { - case t: Throwable if isFutureThrowable(t) => t - } - - // TODO rename appropriately and make public - private[concurrent] def isFutureThrowable(t: Throwable) = t match { - case e: Error => false - case t: scala.util.control.ControlThrowable => false - case i: InterruptedException => false - case _ => true - } /* concurrency constructs */ @@ -46,8 +29,7 @@ abstract class ConcurrentPackageObject { * @param execctx the execution context on which the future is run * @return the `Future` holding the result of the computation */ - def future[T](body: =>T)(implicit execctx: ExecutionContext = defaultExecutionContext): Future[T] = - Future[T](body) + def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body) /** Creates a promise object which can be completed with a value. * @@ -55,8 +37,7 @@ abstract class ConcurrentPackageObject { * @param execctx the execution context on which the promise is created on * @return the newly created `Promise` object */ - def promise[T]()(implicit execctx: ExecutionContext = defaultExecutionContext): Promise[T] = - Promise[T]() + def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]() /** Used to block on a piece of code which potentially blocks. * @@ -67,8 +48,7 @@ abstract class ConcurrentPackageObject { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def blocking[T](body: =>T): T = - blocking(impl.Future.body2awaitable(body), Duration.Inf) + def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf) /** Blocks on an awaitable object. * @@ -79,12 +59,8 @@ abstract class ConcurrentPackageObject { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { - currentExecutionContext.get match { - case null => awaitable.result(atMost)(Await.canAwaitEvidence) - case ec => ec.internalBlockingCall(awaitable, atMost) - } - } + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = + BlockContext.current.internalBlockingCall(awaitable, atMost) @inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x) } diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala index 96a66d83b6..91e41748f5 100644 --- a/src/library/scala/concurrent/DelayedLazyVal.scala +++ b/src/library/scala/concurrent/DelayedLazyVal.scala @@ -23,7 +23,7 @@ package scala.concurrent * @author Paul Phillips * @version 2.8 */ -class DelayedLazyVal[T](f: () => T, body: => Unit) { +class DelayedLazyVal[T](f: () => T, body: => Unit){ @volatile private[this] var _isDone = false private[this] lazy val complete = f() @@ -39,7 +39,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) { */ def apply(): T = if (isDone) complete else f() - // TODO replace with scala.concurrent.future { ... } + // FIXME need to take ExecutionContext in constructor + import ExecutionContext.Implicits.global future { body _isDone = true diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 436a17a33b..b486e5269e 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -9,58 +9,80 @@ package scala.concurrent - -import java.util.concurrent.atomic.{ AtomicInteger } -import java.util.concurrent.{ Executors, Future => JFuture, Callable, ExecutorService, Executor } +import java.util.concurrent.{ ExecutorService, Executor } import scala.concurrent.util.Duration -import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread } -import scala.collection.generic.CanBuildFrom -import collection._ - - +import scala.annotation.implicitNotFound +/** + * An `ExecutionContext` is an abstraction over an entity that can execute program logic. + */ +@implicitNotFound("Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global") trait ExecutionContext { /** Runs a block of code on this execution context. */ def execute(runnable: Runnable): Unit - /** Used internally by the framework - blocks execution for at most `atMost` time while waiting - * for an `awaitable` object to become ready. - * - * Clients should use `scala.concurrent.blocking` instead. - */ - def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T - /** Reports that an asynchronous computation failed. */ def reportFailure(t: Throwable): Unit } +/** + * Union interface since Java does not support union types + */ +trait ExecutionContextExecutor extends ExecutionContext with Executor + +/** + * Union interface since Java does not support union types + */ +trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService + /** Contains factory methods for creating execution contexts. */ object ExecutionContext { - - implicit def defaultExecutionContext: ExecutionContext = scala.concurrent.defaultExecutionContext - + /** + * The `ExecutionContext` associated with the current `Thread` + */ + val currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal //FIXME might want to set the initial value to an executionContext that throws an exception on execute and warns that it's not set + + /** + * This is the explicit global ExecutionContext, + * call this when you want to provide the global ExecutionContext explicitly + */ + def global: ExecutionContextExecutor = Implicits.global + + object Implicits { + /** + * This is the implicit global ExecutionContext, + * import this when you want to provide the global ExecutionContext implicitly + */ + implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor) + } + /** Creates an `ExecutionContext` from the given `ExecutorService`. */ - def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService = + def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService = impl.ExecutionContextImpl.fromExecutorService(e, reporter) + + /** Creates an `ExecutionContext` from the given `ExecutorService` with the default Reporter. + */ + def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter) /** Creates an `ExecutionContext` from the given `Executor`. */ - def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor = + def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(e, reporter) + + /** Creates an `ExecutionContext` from the given `Executor` with the default Reporter. + */ + def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter) - def defaultReporter: Throwable => Unit = { - // re-throwing `Error`s here causes an exception handling test to fail. - //case e: Error => throw e - case t => t.printStackTrace() - } - + /** The default reporter simply prints the stack trace of the `Throwable` to System.err. + */ + def defaultReporter: Throwable => Unit = { case t => t.printStackTrace() } } diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 2de0c57253..75a83d6ef8 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -8,7 +8,7 @@ package scala.concurrent - +import language.higherKinds import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } @@ -23,11 +23,9 @@ import scala.Option import scala.util.{Try, Success, Failure} import scala.annotation.tailrec -import scala.collection.mutable.Stack import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom import scala.reflect.ClassTag -import language.higherKinds @@ -138,7 +136,7 @@ trait Future[+T] extends Awaitable[T] { * $callbackInContext */ def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete { - case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t) + case Left(t) if (impl.Future.isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t) case _ => }(executor) @@ -580,6 +578,20 @@ object Future { classOf[Double] -> classOf[jl.Double], classOf[Unit] -> classOf[scala.runtime.BoxedUnit] ) + + /** Creates an already completed Future with the specified exception. + * + * @tparam T the type of the value in the future + * @return the newly created `Future` object + */ + def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future + + /** Creates an already completed Future with the specified result. + * + * @tparam T the type of the value in the future + * @return the newly created `Future` object + */ + def successful[T](result: T): Future[T] = Promise.successful(result).future /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. * @@ -710,5 +722,12 @@ object Future { } } - +/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext` + * wraps a callback provided to `Future.onComplete`. + * All callbacks provided to a `Future` end up going through `onComplete`, so this allows an + * `ExecutionContext` to special-case callbacks that were executed by `Future` if desired. + */ +trait OnCompleteRunnable { + self: Runnable => +} diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index 578642966f..5d1b2c00b6 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -34,6 +34,15 @@ trait Promise[T] { */ def future: Future[T] + /** Returns whether the promise has already been completed with + * a value or an exception. + * + * $nonDeterministic + * + * @return `true` if the promise is already completed, `false` otherwise + */ + def isCompleted: Boolean + /** Completes the promise with either an exception or a value. * * @param result Either the value or the exception to complete the promise with. diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 4c6347dce0..551a444425 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -13,34 +13,34 @@ package scala.concurrent.impl import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit } import java.util.Collection import scala.concurrent.forkjoin._ -import scala.concurrent.{ ExecutionContext, Awaitable } +import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService } import scala.concurrent.util.Duration import scala.util.control.NonFatal -private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor { +private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor { val executor: Executor = es match { case null => createExecutorService case some => some } - - // to ensure that the current execution context thread local is properly set - def executorsThreadFactory = new ThreadFactory { - def newThread(r: Runnable) = new Thread(new Runnable { - override def run() { - scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this) - r.run() - } - }) - } - - // to ensure that the current execution context thread local is properly set + + // Implement BlockContext on FJP threads def forkJoinPoolThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory { - def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) { - override def onStart() { - scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this) + def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) with BlockContext { + override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { + var result: T = null.asInstanceOf[T] + ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { + @volatile var isdone = false + def block(): Boolean = { + result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here? + isdone = true + true + } + def isReleasable = isdone + }) + result } } } @@ -68,7 +68,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: case NonFatal(t) => System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool") t.printStackTrace(System.err) - Executors.newCachedThreadPool(executorsThreadFactory) //FIXME use the same desired parallelism here too? + Executors.newCachedThreadPool() //FIXME use the same desired parallelism here too? } def execute(runnable: Runnable): Unit = executor match { @@ -84,27 +84,6 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: case generic => generic execute runnable } - def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { - Future.releaseStack(this) - - executor match { - case fj: ForkJoinPool => - var result: T = null.asInstanceOf[T] - ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { - @volatile var isdone = false - def block(): Boolean = { - result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here? - isdone = true - true - } - def isReleasable = isdone - }) - result - case _ => - awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) - } - } - def reportFailure(t: Throwable) = reporter(t) } @@ -112,8 +91,8 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: private[concurrent] object ExecutionContextImpl { def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter) - def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService = - new ExecutionContextImpl(es, reporter) with ExecutorService { + def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService = + new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService { final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService] override def execute(command: Runnable) = executor.execute(command) override def shutdown() { asExecutorService.shutdown() } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index 8012ea6a93..073e6c4c9f 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -46,11 +46,19 @@ private[concurrent] object Future { def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c - private[impl] class PromiseCompletingTask[T](override val executor: ExecutionContext, body: => T) - extends Task { + // TODO rename appropriately and make public + private[concurrent] def isFutureThrowable(t: Throwable) = t match { + case e: Error => false + case t: scala.util.control.ControlThrowable => false + case i: InterruptedException => false + case _ => true + } + + private[impl] class PromiseCompletingRunnable[T](body: => T) + extends Runnable { val promise = new Promise.DefaultPromise[T]() - protected override def task() = { + override def run() = { promise complete { try Right(body) catch { case NonFatal(e) => @@ -63,90 +71,8 @@ private[concurrent] object Future { } def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = { - val task = new PromiseCompletingTask(executor, body) - task.dispatch() - - task.promise.future - } - - private[impl] val throwableId: Throwable => Throwable = identity _ - - // an optimization for batching futures - // TODO we should replace this with a public queue, - // so that it can be stolen from - // OR: a push to the local task queue should be so cheap that this is - // not even needed, but stealing is still possible - - private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext) - - private val _taskStack = new ThreadLocal[TaskStack]() - - private[impl] trait Task extends Runnable { - def executor: ExecutionContext - - // run the original callback (no dispatch) - protected def task(): Unit - - // we implement Runnable to avoid creating - // an extra object. run() runs ourselves with - // a TaskStack pushed, and then runs any - // other tasks that show up in the stack. - final override def run() = { - try { - val taskStack = TaskStack(Stack[Task](this), executor) - _taskStack set taskStack - while (taskStack.stack.nonEmpty) { - val next = taskStack.stack.pop() - require(next.executor eq executor) - try next.task() catch { case NonFatal(e) => executor reportFailure e } - } - } finally { - _taskStack.remove() - } - } - - // send the task to the running executor.execute() via - // _taskStack, or start a new executor.execute() - def dispatch(force: Boolean = false): Unit = - _taskStack.get match { - case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this - case _ => executor.execute(this) - } - } - - private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task]) - extends Task { - protected override def task() = { - _taskStack.get.stack.elems = elems - } - } - - private[impl] def releaseStack(executor: ExecutionContext): Unit = - _taskStack.get match { - case stack if (stack ne null) && stack.stack.nonEmpty => - val tasks = stack.stack.elems - stack.stack.clear() - _taskStack.remove() - val release = new ReleaseTask(executor, tasks) - release.dispatch(force=true) - case null => - // do nothing - there is no local batching stack anymore - case _ => - _taskStack.remove() - } - - private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) - extends Task { - private var value: Either[Throwable, T] = null - - protected override def task() = { - require(value ne null) // dispatch(value) must be called before dispatch() - onComplete(value) - } - - def dispatch(value: Either[Throwable, T]): Unit = { - this.value = value - dispatch() - } + val runnable = new PromiseCompletingRunnable(body) + executor.execute(runnable) + runnable.promise.future } } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index c5060a2368..3ac34bef8a 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -11,11 +11,12 @@ package scala.concurrent.impl import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } -import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, TimeoutException, ExecutionException } +import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException } //import scala.util.continuations._ import scala.concurrent.util.Duration import scala.util import scala.annotation.tailrec +import scala.util.control.NonFatal //import scala.concurrent.NonDeterministic @@ -24,6 +25,21 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu def future: this.type = this } +private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) extends Runnable with OnCompleteRunnable { + // must be filled in before running it + var value: Either[Throwable, T] = null + + override def run() = { + require(value ne null) // must set value to non-null before running! + try onComplete(value) catch { case NonFatal(e) => executor reportFailure e } + } + + def executeWithValue(v: Either[Throwable, T]): Unit = { + require(value eq null) // can't complete it twice + value = v + executor.execute(this) + } +} object Promise { @@ -94,10 +110,10 @@ object Promise { val resolved = resolveEither(value) (try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[Future.OnCompleteTask[T]] = { + def tryComplete(v: Either[Throwable, T]): List[CallbackRunnable[T]] = { getState match { case raw: List[_] => - val cur = raw.asInstanceOf[List[Future.OnCompleteTask[T]]] + val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] if (updateState(cur, v)) cur else tryComplete(v) case _ => null } @@ -107,19 +123,19 @@ object Promise { synchronized { notifyAll() } //Notify any evil blockers }) match { case null => false - case cs if cs.isEmpty => true - case cs => cs.foreach(c => c.dispatch(resolved)); true + case rs if rs.isEmpty => true + case rs => rs.foreach(r => r.executeWithValue(resolved)); true } } def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { - val bound = new Future.OnCompleteTask[T](executor, func) + val runnable = new CallbackRunnable[T](executor, func) @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = getState match { - case r: Either[_, _] => bound.dispatch(r.asInstanceOf[Either[Throwable, T]]) - case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback() + case r: Either[_, _] => runnable.executeWithValue(r.asInstanceOf[Either[Throwable, T]]) + case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback() } dispatchOrAddCallback() } @@ -139,7 +155,7 @@ object Promise { def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { val completedAs = value.get - (new Future.OnCompleteTask(executor, func)).dispatch(completedAs) + (new CallbackRunnable(executor, func)).executeWithValue(completedAs) } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this diff --git a/src/library/scala/util/control/NonFatal.scala b/src/library/scala/util/control/NonFatal.scala index 9da2f63307..5137f0f2f5 100644 --- a/src/library/scala/util/control/NonFatal.scala +++ b/src/library/scala/util/control/NonFatal.scala @@ -23,16 +23,23 @@ package scala.util.control * // dangerous stuff * } catch { * case NonFatal(e) => log.error(e, "Something not that bad.") + * // or + * case e if NonFatal(e) => log.error(e, "Something not that bad.") * } * }}} */ object NonFatal { - - def unapply(t: Throwable): Option[Throwable] = t match { - case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError - // VirtualMachineError includes OutOfMemoryError and other fatal errors - case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => None - case e ⇒ Some(e) - } - + /** + * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal + */ + def apply(t: Throwable): Boolean = t match { + case _: StackOverflowError => true // StackOverflowError ok even though it is a VirtualMachineError + // VirtualMachineError includes OutOfMemoryError and other fatal errors + case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => false + case _ => true + } + /** + * Returns Some(t) if NonFatal(t) == true, otherwise None + */ + def unapply(t: Throwable): Option[Throwable] = if (apply(t)) Some(t) else None } diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala index e5e01a5954..ca9ff5090f 100644 --- a/test/files/jvm/future-spec/FutureTests.scala +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -10,21 +10,69 @@ import scala.runtime.NonLocalReturnControl object FutureTests extends MinimalScalaTest { - + /* some utils */ - def testAsync(s: String): Future[String] = s match { + def testAsync(s: String)(implicit ec: ExecutionContext): Future[String] = s match { case "Hello" => future { "World" } - case "Failure" => Promise.failed(new RuntimeException("Expected exception; to test fault-tolerance")).future + case "Failure" => Future.failed(new RuntimeException("Expected exception; to test fault-tolerance")) case "NoReply" => Promise[String]().future } val defaultTimeout = 5 seconds /* future specification */ + + "A future with custom ExecutionContext" should { + "shouldHandleThrowables" in { + val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable] + implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new scala.concurrent.forkjoin.ForkJoinPool(), { + t => + ms += t + }) + + class ThrowableTest(m: String) extends Throwable(m) + + val f1 = future[Any] { + throw new ThrowableTest("test") + } + + intercept[ThrowableTest] { + Await.result(f1, defaultTimeout) + } + + val latch = new TestLatch + val f2 = future { + Await.ready(latch, 5 seconds) + "success" + } + val f3 = f2 map { s => s.toUpperCase } + + f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") } + f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") } + + latch.open() + + Await.result(f2, defaultTimeout) mustBe ("success") + + f2 foreach { _ => throw new ThrowableTest("current thread foreach") } + f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") } + + Await.result(f3, defaultTimeout) mustBe ("SUCCESS") + + val waiting = future { + Thread.sleep(1000) + } + Await.ready(waiting, 2000 millis) + + ms.size mustBe (4) + //FIXME should check + } + } - "A future" should { - + "A future with global ExecutionContext" should { + import ExecutionContext.Implicits._ + "compose with for-comprehensions" in { def async(x: Int) = future { (x * 2).toString } val future0 = future[Any] { @@ -122,20 +170,20 @@ object FutureTests extends MinimalScalaTest { val r = new IllegalStateException("recovered") intercept[IllegalStateException] { - val failed = Promise.failed[String](o).future recoverWith { - case _ if false == true => Promise.successful("yay!").future + val failed = Future.failed[String](o) recoverWith { + case _ if false == true => Future.successful("yay!") } Await.result(failed, defaultTimeout) } mustBe (o) - val recovered = Promise.failed[String](o).future recoverWith { - case _ => Promise.successful("yay!").future + val recovered = Future.failed[String](o) recoverWith { + case _ => Future.successful("yay!") } Await.result(recovered, defaultTimeout) mustBe ("yay!") intercept[IllegalStateException] { - val refailed = Promise.failed[String](o).future recoverWith { - case _ => Promise.failed[String](r).future + val refailed = Future.failed[String](o) recoverWith { + case _ => Future.failed[String](r) } Await.result(refailed, defaultTimeout) } mustBe (r) @@ -164,7 +212,7 @@ object FutureTests extends MinimalScalaTest { "firstCompletedOf" in { def futures = Vector.fill[Future[Int]](10) { Promise[Int]().future - } :+ Promise.successful[Int](5).future + } :+ Future.successful[Int](5) Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5) Await.result(Future.firstCompletedOf(futures.iterator), defaultTimeout) mustBe (5) @@ -186,21 +234,21 @@ object FutureTests extends MinimalScalaTest { val timeout = 10000 millis val f = new IllegalStateException("test") intercept[IllegalStateException] { - val failed = Promise.failed[String](f).future zip Promise.successful("foo").future + val failed = Future.failed[String](f) zip Future.successful("foo") Await.result(failed, timeout) } mustBe (f) intercept[IllegalStateException] { - val failed = Promise.successful("foo").future zip Promise.failed[String](f).future + val failed = Future.successful("foo") zip Future.failed[String](f) Await.result(failed, timeout) } mustBe (f) intercept[IllegalStateException] { - val failed = Promise.failed[String](f).future zip Promise.failed[String](f).future + val failed = Future.failed[String](f) zip Future.failed[String](f) Await.result(failed, timeout) } mustBe (f) - val successful = Promise.successful("foo").future zip Promise.successful("foo").future + val successful = Future.successful("foo") zip Future.successful("foo") Await.result(successful, timeout) mustBe (("foo", "foo")) } @@ -337,50 +385,6 @@ object FutureTests extends MinimalScalaTest { Await.result(traversedIterator, defaultTimeout).sum mustBe (10000) } - "shouldHandleThrowables" in { - val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable] - implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new scala.concurrent.forkjoin.ForkJoinPool(), { - t => - ms += t - }) - - class ThrowableTest(m: String) extends Throwable(m) - - val f1 = future[Any] { - throw new ThrowableTest("test") - } - - intercept[ThrowableTest] { - Await.result(f1, defaultTimeout) - } - - val latch = new TestLatch - val f2 = future { - Await.ready(latch, 5 seconds) - "success" - } - val f3 = f2 map { s => s.toUpperCase } - - f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") } - f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") } - - latch.open() - - Await.result(f2, defaultTimeout) mustBe ("success") - - f2 foreach { _ => throw new ThrowableTest("current thread foreach") } - f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") } - - Await.result(f3, defaultTimeout) mustBe ("SUCCESS") - - val waiting = future { - Thread.sleep(1000) - } - Await.ready(waiting, 2000 millis) - - ms.size mustBe (4) - } - "shouldBlockUntilResult" in { val latch = new TestLatch diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala index bf9d9b39d7..49bc642b57 100644 --- a/test/files/jvm/future-spec/PromiseTests.scala +++ b/test/files/jvm/future-spec/PromiseTests.scala @@ -10,7 +10,8 @@ import scala.runtime.NonLocalReturnControl object PromiseTests extends MinimalScalaTest { - + import ExecutionContext.Implicits._ + val defaultTimeout = Inf /* promise specification */ @@ -20,11 +21,13 @@ object PromiseTests extends MinimalScalaTest { "not be completed" in { val p = Promise() p.future.isCompleted mustBe (false) + p.isCompleted mustBe (false) } "have no value" in { val p = Promise() p.future.value mustBe (None) + p.isCompleted mustBe (false) } "return supplied value on timeout" in { @@ -45,14 +48,16 @@ object PromiseTests extends MinimalScalaTest { "A successful Promise" should { val result = "test value" - val future = Promise[String]().complete(Right(result)).future - futureWithResult(_(future, result)) + val promise = Promise[String]().complete(Right(result)) + promise.isCompleted mustBe (true) + futureWithResult(_(promise.future, result)) } "A failed Promise" should { val message = "Expected Exception" - val future = Promise[String]().complete(Left(new RuntimeException(message))).future - futureWithException[RuntimeException](_(future, message)) + val promise = Promise[String]().complete(Left(new RuntimeException(message))) + promise.isCompleted mustBe (true) + futureWithException[RuntimeException](_(promise.future, message)) } "An interrupted Promise" should { diff --git a/test/files/jvm/non-fatal-tests.scala b/test/files/jvm/non-fatal-tests.scala new file mode 100644 index 0000000000..471a9d227a --- /dev/null +++ b/test/files/jvm/non-fatal-tests.scala @@ -0,0 +1,47 @@ +import scala.util.control.NonFatal + +trait NonFatalTests { + + //NonFatals + val nonFatals: Seq[Throwable] = + Seq(new StackOverflowError, + new RuntimeException, + new Exception, + new Throwable) + + //Fatals + val fatals: Seq[Throwable] = + Seq(new InterruptedException, + new OutOfMemoryError, + new LinkageError, + new VirtualMachineError {}, + new Throwable with scala.util.control.ControlThrowable, + new NotImplementedError) + + def testFatalsUsingApply(): Unit = { + fatals foreach { t => assert(NonFatal(t) == false) } + } + + def testNonFatalsUsingApply(): Unit = { + nonFatals foreach { t => assert(NonFatal(t) == true) } + } + + def testFatalsUsingUnapply(): Unit = { + fatals foreach { t => assert(NonFatal.unapply(t).isEmpty) } + } + + def testNonFatalsUsingUnapply(): Unit = { + nonFatals foreach { t => assert(NonFatal.unapply(t).isDefined) } + } + + testFatalsUsingApply() + testNonFatalsUsingApply() + testFatalsUsingUnapply() + testNonFatalsUsingUnapply() +} + +object Test +extends App +with NonFatalTests { + System.exit(0) +}
\ No newline at end of file diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 407027f904..5c9c71f3f8 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -3,22 +3,19 @@ import scala.concurrent.{ Promise, TimeoutException, SyncVar, - ExecutionException + ExecutionException, + ExecutionContext } -import scala.concurrent.future -import scala.concurrent.promise -import scala.concurrent.blocking +import scala.concurrent.{ future, promise, blocking } import scala.util.{ Try, Success, Failure } - import scala.concurrent.util.Duration - trait TestBase { def once(body: (() => Unit) => Unit) { val sv = new SyncVar[Boolean] body(() => sv put true) - sv.take() + sv.take(2000) } // def assert(cond: => Boolean) { @@ -33,7 +30,8 @@ trait TestBase { trait FutureCallbacks extends TestBase { - + import ExecutionContext.Implicits._ + def testOnSuccess(): Unit = once { done => var x = 0 @@ -147,6 +145,7 @@ trait FutureCallbacks extends TestBase { trait FutureCombinators extends TestBase { + import ExecutionContext.Implicits._ def testMapSuccess(): Unit = once { done => @@ -591,7 +590,8 @@ trait FutureCombinators extends TestBase { trait FutureProjections extends TestBase { - + import ExecutionContext.Implicits._ + def testFailedFailureOnComplete(): Unit = once { done => val cause = new RuntimeException @@ -673,7 +673,8 @@ trait FutureProjections extends TestBase { trait Blocking extends TestBase { - + import ExecutionContext.Implicits._ + def testAwaitSuccess(): Unit = once { done => val f = future { 0 } @@ -702,8 +703,67 @@ trait Blocking extends TestBase { } +trait BlockContexts extends TestBase { + import ExecutionContext.Implicits._ + import scala.concurrent.{ Await, Awaitable, BlockContext } + + private def getBlockContext(body: => BlockContext): BlockContext = { + blocking(Future { body }, Duration(500, "ms")) + } + + // test outside of an ExecutionContext + def testDefaultOutsideFuture(): Unit = { + val bc = BlockContext.current + assert(bc.getClass.getName.contains("DefaultBlockContext")) + } + + // test BlockContext in our default ExecutionContext + def testDefaultFJP(): Unit = { + val bc = getBlockContext(BlockContext.current) + assert(bc.isInstanceOf[scala.concurrent.forkjoin.ForkJoinWorkerThread]) + } + + // test BlockContext inside BlockContext.withBlockContext + def testPushCustom(): Unit = { + val orig = BlockContext.current + val customBC = new BlockContext() { + override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = + orig.internalBlockingCall(awaitable, atMost) + } + + val bc = getBlockContext({ + BlockContext.withBlockContext(customBC) { + BlockContext.current + } + }) + + assert(bc eq customBC) + } + + // test BlockContext after a BlockContext.push + def testPopCustom(): Unit = { + val orig = BlockContext.current + val customBC = new BlockContext() { + override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = + orig.internalBlockingCall(awaitable, atMost) + } + + val bc = getBlockContext({ + BlockContext.withBlockContext(customBC) {} + BlockContext.current + }) + + assert(bc ne customBC) + } + + testDefaultOutsideFuture() + testDefaultFJP() + testPushCustom() + testPopCustom() +} trait Promises extends TestBase { + import ExecutionContext.Implicits._ def testSuccess(): Unit = once { done => @@ -730,7 +790,8 @@ trait Promises extends TestBase { trait Exceptions extends TestBase { - + import ExecutionContext.Implicits._ + } // trait TryEitherExtractor extends TestBase { @@ -811,7 +872,7 @@ trait Exceptions extends TestBase { trait CustomExecutionContext extends TestBase { import scala.concurrent.{ ExecutionContext, Awaitable } - def defaultEC = ExecutionContext.defaultExecutionContext + def defaultEC = ExecutionContext.global val inEC = new java.lang.ThreadLocal[Int]() { override def initialValue = 0 @@ -826,7 +887,7 @@ trait CustomExecutionContext extends TestBase { val _count = new java.util.concurrent.atomic.AtomicInteger(0) def count = _count.get - def delegate = ExecutionContext.defaultExecutionContext + def delegate = ExecutionContext.global override def execute(runnable: Runnable) = { _count.incrementAndGet() @@ -843,9 +904,6 @@ trait CustomExecutionContext extends TestBase { delegate.execute(wrapper) } - override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = - delegate.internalBlockingCall(awaitable, atMost) - override def reportFailure(t: Throwable): Unit = { System.err.println("Failure: " + t.getClass.getSimpleName + ": " + t.getMessage) delegate.reportFailure(t) @@ -860,14 +918,16 @@ trait CustomExecutionContext extends TestBase { def testOnSuccessCustomEC(): Unit = { val count = countExecs { implicit ec => - once { done => - val f = future({ assertNoEC() })(defaultEC) - f onSuccess { - case _ => - assertEC() + blocking { + once { done => + val f = future({ assertNoEC() })(defaultEC) + f onSuccess { + case _ => + assertEC() done() + } + assertNoEC() } - assertNoEC() } } @@ -877,12 +937,14 @@ trait CustomExecutionContext extends TestBase { def testKeptPromiseCustomEC(): Unit = { val count = countExecs { implicit ec => - once { done => - val f = Promise.successful(10).future - f onSuccess { - case _ => - assertEC() + blocking { + once { done => + val f = Promise.successful(10).future + f onSuccess { + case _ => + assertEC() done() + } } } } @@ -893,28 +955,30 @@ trait CustomExecutionContext extends TestBase { def testCallbackChainCustomEC(): Unit = { val count = countExecs { implicit ec => - once { done => - assertNoEC() - val addOne = { x: Int => assertEC(); x + 1 } - val f = Promise.successful(10).future - f.map(addOne).filter { x => - assertEC() - x == 11 - } flatMap { x => - Promise.successful(x + 1).future.map(addOne).map(addOne) - } onComplete { - case Left(t) => - try { - throw new AssertionError("error in test: " + t.getMessage, t) - } finally { + blocking { + once { done => + assertNoEC() + val addOne = { x: Int => assertEC(); x + 1 } + val f = Promise.successful(10).future + f.map(addOne).filter { x => + assertEC() + x == 11 + } flatMap { x => + Promise.successful(x + 1).future.map(addOne).map(addOne) + } onComplete { + case Left(t) => + try { + throw new AssertionError("error in test: " + t.getMessage, t) + } finally { + done() + } + case Right(x) => + assertEC() + assert(x == 14) done() - } - case Right(x) => - assertEC() - assert(x == 14) - done() + } + assertNoEC() } - assertNoEC() } } @@ -934,6 +998,7 @@ with FutureCallbacks with FutureCombinators with FutureProjections with Promises +with BlockContexts with Exceptions // with TryEitherExtractor with CustomExecutionContext |