summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHavoc Pennington <hp@pobox.com>2012-07-09 12:59:29 -0400
committerHavoc Pennington <hp@pobox.com>2012-07-09 16:03:00 -0400
commit4496c5daa9c573df6d4e2c85a34c37eb9933d1bd (patch)
tree6a1b460de6c86d3722c40c5a71861a180ba7137d /src
parente9afc228fd25a9dbdcf4c39e6187fcac7f26f969 (diff)
downloadscala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.gz
scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.tar.bz2
scala-4496c5daa9c573df6d4e2c85a34c37eb9933d1bd.zip
Collection of updates to SIP-14 (scala.concurrent)
Developed by Viktor Klang and Havoc Pennington - add Promise.isCompleted - add Future.successful and Future.failed - add ExecutionContextExecutor and ExecutionContextExecutorService for Java interop - remove defaultExecutionContext as default parameter value from promise and future - add ExecutionContext.Implicits.global which must be explicitly imported, rather than the previous always-available value for the implicit EC - remove currentExecutionContext, since it could create bugs by being out of sync with the implicit ExecutionContext - remove Future task batching (_taskStack) and Future.releaseStack This optimization should instead be implemented either in a specific thread pool or in a specific ExecutionContext. Some pools or ExecutionContexts may not want or need it. In this patch, the defaultExecutionContext does not keep the batching optimization. Whether it should have it should perhaps be determined through benchmarking. - move internalBlockingCall to BlockContext and remove currentExecutionContext In this patch, BlockContext must be implemented by Thread.currentThread, so the thread pool is the only place you can add custom hooks to be run when blocking. We implement BlockContext for the default ForkJoinWorkerThread in terms of ForkJoinPool.ManagedBlocker. - add public BlockContext.current and BlockContext.withBlockContext These allow an ExecutionContext or other code to override the BlockContext for the current thread. With this API, the BlockContext is customizable without creating a new pool of threads. BlockContext.current is needed to obtain the previous BlockContext before you push, so you can "chain up" to it if desired. BlockContext.withBlockContext is used to override the context for a given piece of code. - move isFutureThrowable into impl.Future - add implicitNotFound to ExecutionContext - remove default global EC from future {} and promise {} - add ExecutionContext.global for explicit use of the global default EC, replaces defaultExecutionContext - add a timeout to scala-concurrent-tck tests that block on SyncVar (so tests time out rather than hang) - insert blocking{} calls into concurrent tck to fix deadlocking - add NonFatal.apply and tests for NonFatal - add OnCompleteRunnable marker trait This would allow an ExecutionContext to distinguish a Runnable originating from Future.onComplete (all callbacks on Future end up going through onComplete). - rename ListenerRunnable to CallbackRunnable and use for KeptPromise too Just adds some clarity and consistency.
Diffstat (limited to 'src')
-rw-r--r--src/library/scala/collection/parallel/TaskSupport.scala2
-rw-r--r--src/library/scala/concurrent/BlockContext.scala81
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala34
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala5
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala74
-rw-r--r--src/library/scala/concurrent/Future.scala29
-rw-r--r--src/library/scala/concurrent/Promise.scala9
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala61
-rw-r--r--src/library/scala/concurrent/impl/Future.scala102
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala34
-rw-r--r--src/library/scala/util/control/NonFatal.scala23
11 files changed, 245 insertions, 209 deletions
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala
index 2eaa861429..3d27f619bb 100644
--- a/src/library/scala/collection/parallel/TaskSupport.scala
+++ b/src/library/scala/collection/parallel/TaskSupport.scala
@@ -48,7 +48,7 @@ extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
* By default, parallel collections are parametrized with this task support object, so parallel collections
* share the same execution context backend as the rest of the `scala.concurrent` package.
*/
-class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.defaultExecutionContext)
+class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
extends TaskSupport with ExecutionContextTasks
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala
new file mode 100644
index 0000000000..a5b878c546
--- /dev/null
+++ b/src/library/scala/concurrent/BlockContext.scala
@@ -0,0 +1,81 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import java.lang.Thread
+import scala.concurrent.util.Duration
+
+/**
+ * A context to be notified by `scala.concurrent.blocking()` when
+ * a thread is about to block. In effect this trait provides
+ * the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()`
+ * locates an instance of `BlockContext` by first looking for one
+ * provided through `BlockContext.withBlockContext()` and failing that,
+ * checking whether `Thread.currentThread` is an instance of `BlockContext`.
+ * So a thread pool can have its `java.lang.Thread` instances implement
+ * `BlockContext`. There's a default `BlockContext` used if the thread
+ * doesn't implement `BlockContext`.
+ *
+ * Typically, you'll want to chain to the previous `BlockContext`,
+ * like this:
+ * {{{
+ * val oldContext = BlockContext.current
+ * val myContext = new BlockContext {
+ * override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ * // you'd have code here doing whatever you need to do
+ * // when the thread is about to block.
+ * // Then you'd chain to the previous context:
+ * oldContext.internalBlockingCall(awaitable, atMost)
+ * }
+ * }
+ * BlockContext.withBlockContext(myContext) {
+ * // then this block runs with myContext as the handler
+ * // for scala.concurrent.blocking
+ * }
+ * }}}
+ */
+trait BlockContext {
+
+ /** Used internally by the framework; blocks execution for at most
+ * `atMost` time while waiting for an `awaitable` object to become ready.
+ *
+ * Clients should use `scala.concurrent.blocking` instead; this is
+ * the implementation of `scala.concurrent.blocking`, generally
+ * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`.
+ */
+ def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
+}
+
+object BlockContext {
+ private object DefaultBlockContext extends BlockContext {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ awaitable.result(atMost)(Await.canAwaitEvidence)
+ }
+
+ private val contextLocal = new ThreadLocal[BlockContext]() {
+ override def initialValue = Thread.currentThread match {
+ case ctx: BlockContext => ctx
+ case _ => DefaultBlockContext
+ }
+ }
+
+ /** Obtain the current thread's current `BlockContext`. */
+ def current: BlockContext = contextLocal.get
+
+ /** Pushes a current `BlockContext` while executing `body`. */
+ def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
+ val old = contextLocal.get
+ try {
+ contextLocal.set(blockContext)
+ body
+ } finally {
+ contextLocal.set(old)
+ }
+ }
+}
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
index 330a2f0e25..86a86966ef 100644
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -17,23 +17,6 @@ import language.implicitConversions
/** This package object contains primitives for concurrent and parallel programming.
*/
abstract class ConcurrentPackageObject {
- /** A global execution environment for executing lightweight tasks.
- */
- lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
-
- val currentExecutionContext = new ThreadLocal[ExecutionContext]
-
- val handledFutureException: PartialFunction[Throwable, Throwable] = {
- case t: Throwable if isFutureThrowable(t) => t
- }
-
- // TODO rename appropriately and make public
- private[concurrent] def isFutureThrowable(t: Throwable) = t match {
- case e: Error => false
- case t: scala.util.control.ControlThrowable => false
- case i: InterruptedException => false
- case _ => true
- }
/* concurrency constructs */
@@ -46,8 +29,7 @@ abstract class ConcurrentPackageObject {
* @param execctx the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
- def future[T](body: =>T)(implicit execctx: ExecutionContext = defaultExecutionContext): Future[T] =
- Future[T](body)
+ def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)
/** Creates a promise object which can be completed with a value.
*
@@ -55,8 +37,7 @@ abstract class ConcurrentPackageObject {
* @param execctx the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def promise[T]()(implicit execctx: ExecutionContext = defaultExecutionContext): Promise[T] =
- Promise[T]()
+ def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
/** Used to block on a piece of code which potentially blocks.
*
@@ -67,8 +48,7 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](body: =>T): T =
- blocking(impl.Future.body2awaitable(body), Duration.Inf)
+ def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
/** Blocks on an awaitable object.
*
@@ -79,12 +59,8 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.result(atMost)(Await.canAwaitEvidence)
- case ec => ec.internalBlockingCall(awaitable, atMost)
- }
- }
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
+ BlockContext.current.internalBlockingCall(awaitable, atMost)
@inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x)
}
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index 96a66d83b6..91e41748f5 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -23,7 +23,7 @@ package scala.concurrent
* @author Paul Phillips
* @version 2.8
*/
-class DelayedLazyVal[T](f: () => T, body: => Unit) {
+class DelayedLazyVal[T](f: () => T, body: => Unit){
@volatile private[this] var _isDone = false
private[this] lazy val complete = f()
@@ -39,7 +39,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
*/
def apply(): T = if (isDone) complete else f()
- // TODO replace with scala.concurrent.future { ... }
+ // FIXME need to take ExecutionContext in constructor
+ import ExecutionContext.Implicits.global
future {
body
_isDone = true
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 436a17a33b..b486e5269e 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -9,58 +9,80 @@
package scala.concurrent
-
-import java.util.concurrent.atomic.{ AtomicInteger }
-import java.util.concurrent.{ Executors, Future => JFuture, Callable, ExecutorService, Executor }
+import java.util.concurrent.{ ExecutorService, Executor }
import scala.concurrent.util.Duration
-import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
-import scala.collection.generic.CanBuildFrom
-import collection._
-
-
+import scala.annotation.implicitNotFound
+/**
+ * An `ExecutionContext` is an abstraction over an entity that can execute program logic.
+ */
+@implicitNotFound("Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global")
trait ExecutionContext {
/** Runs a block of code on this execution context.
*/
def execute(runnable: Runnable): Unit
- /** Used internally by the framework - blocks execution for at most `atMost` time while waiting
- * for an `awaitable` object to become ready.
- *
- * Clients should use `scala.concurrent.blocking` instead.
- */
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
-
/** Reports that an asynchronous computation failed.
*/
def reportFailure(t: Throwable): Unit
}
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutor extends ExecutionContext with Executor
+
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService
+
/** Contains factory methods for creating execution contexts.
*/
object ExecutionContext {
-
- implicit def defaultExecutionContext: ExecutionContext = scala.concurrent.defaultExecutionContext
-
+ /**
+ * The `ExecutionContext` associated with the current `Thread`
+ */
+ val currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal //FIXME might want to set the initial value to an executionContext that throws an exception on execute and warns that it's not set
+
+ /**
+ * This is the explicit global ExecutionContext,
+ * call this when you want to provide the global ExecutionContext explicitly
+ */
+ def global: ExecutionContextExecutor = Implicits.global
+
+ object Implicits {
+ /**
+ * This is the implicit global ExecutionContext,
+ * import this when you want to provide the global ExecutionContext implicitly
+ */
+ implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
+ }
+
/** Creates an `ExecutionContext` from the given `ExecutorService`.
*/
- def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService =
+ def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
impl.ExecutionContextImpl.fromExecutorService(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `ExecutorService` with the default Reporter.
+ */
+ def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)
/** Creates an `ExecutionContext` from the given `Executor`.
*/
- def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor =
+ def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
impl.ExecutionContextImpl.fromExecutor(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `Executor` with the default Reporter.
+ */
+ def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)
- def defaultReporter: Throwable => Unit = {
- // re-throwing `Error`s here causes an exception handling test to fail.
- //case e: Error => throw e
- case t => t.printStackTrace()
- }
-
+ /** The default reporter simply prints the stack trace of the `Throwable` to System.err.
+ */
+ def defaultReporter: Throwable => Unit = { case t => t.printStackTrace() }
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 2de0c57253..75a83d6ef8 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -8,7 +8,7 @@
package scala.concurrent
-
+import language.higherKinds
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
@@ -23,11 +23,9 @@ import scala.Option
import scala.util.{Try, Success, Failure}
import scala.annotation.tailrec
-import scala.collection.mutable.Stack
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
-import language.higherKinds
@@ -138,7 +136,7 @@ trait Future[+T] extends Awaitable[T] {
* $callbackInContext
*/
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
- case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
+ case Left(t) if (impl.Future.isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
case _ =>
}(executor)
@@ -580,6 +578,20 @@ object Future {
classOf[Double] -> classOf[jl.Double],
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
)
+
+ /** Creates an already completed Future with the specified exception.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
+
+ /** Creates an already completed Future with the specified result.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def successful[T](result: T): Future[T] = Promise.successful(result).future
/** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
*
@@ -710,5 +722,12 @@ object Future {
}
}
-
+/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext`
+ * wraps a callback provided to `Future.onComplete`.
+ * All callbacks provided to a `Future` end up going through `onComplete`, so this allows an
+ * `ExecutionContext` to special-case callbacks that were executed by `Future` if desired.
+ */
+trait OnCompleteRunnable {
+ self: Runnable =>
+}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 578642966f..5d1b2c00b6 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -34,6 +34,15 @@ trait Promise[T] {
*/
def future: Future[T]
+ /** Returns whether the promise has already been completed with
+ * a value or an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return `true` if the promise is already completed, `false` otherwise
+ */
+ def isCompleted: Boolean
+
/** Completes the promise with either an exception or a value.
*
* @param result Either the value or the exception to complete the promise with.
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 4c6347dce0..551a444425 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -13,34 +13,34 @@ package scala.concurrent.impl
import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit }
import java.util.Collection
import scala.concurrent.forkjoin._
-import scala.concurrent.{ ExecutionContext, Awaitable }
+import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService }
import scala.concurrent.util.Duration
import scala.util.control.NonFatal
-private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor {
+private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor {
val executor: Executor = es match {
case null => createExecutorService
case some => some
}
-
- // to ensure that the current execution context thread local is properly set
- def executorsThreadFactory = new ThreadFactory {
- def newThread(r: Runnable) = new Thread(new Runnable {
- override def run() {
- scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this)
- r.run()
- }
- })
- }
-
- // to ensure that the current execution context thread local is properly set
+
+ // Implement BlockContext on FJP threads
def forkJoinPoolThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
- def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) {
- override def onStart() {
- scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this)
+ def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) with BlockContext {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ var result: T = null.asInstanceOf[T]
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
+ @volatile var isdone = false
+ def block(): Boolean = {
+ result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
+ isdone = true
+ true
+ }
+ def isReleasable = isdone
+ })
+ result
}
}
}
@@ -68,7 +68,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
case NonFatal(t) =>
System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool")
t.printStackTrace(System.err)
- Executors.newCachedThreadPool(executorsThreadFactory) //FIXME use the same desired parallelism here too?
+ Executors.newCachedThreadPool() //FIXME use the same desired parallelism here too?
}
def execute(runnable: Runnable): Unit = executor match {
@@ -84,27 +84,6 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
case generic => generic execute runnable
}
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
- Future.releaseStack(this)
-
- executor match {
- case fj: ForkJoinPool =>
- var result: T = null.asInstanceOf[T]
- ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
- @volatile var isdone = false
- def block(): Boolean = {
- result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
- isdone = true
- true
- }
- def isReleasable = isdone
- })
- result
- case _ =>
- awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
- }
- }
-
def reportFailure(t: Throwable) = reporter(t)
}
@@ -112,8 +91,8 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
private[concurrent] object ExecutionContextImpl {
def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
- def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService =
- new ExecutionContextImpl(es, reporter) with ExecutorService {
+ def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
+ new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
override def execute(command: Runnable) = executor.execute(command)
override def shutdown() { asExecutorService.shutdown() }
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 8012ea6a93..073e6c4c9f 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -46,11 +46,19 @@ private[concurrent] object Future {
def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
- private[impl] class PromiseCompletingTask[T](override val executor: ExecutionContext, body: => T)
- extends Task {
+ // TODO rename appropriately and make public
+ private[concurrent] def isFutureThrowable(t: Throwable) = t match {
+ case e: Error => false
+ case t: scala.util.control.ControlThrowable => false
+ case i: InterruptedException => false
+ case _ => true
+ }
+
+ private[impl] class PromiseCompletingRunnable[T](body: => T)
+ extends Runnable {
val promise = new Promise.DefaultPromise[T]()
- protected override def task() = {
+ override def run() = {
promise complete {
try Right(body) catch {
case NonFatal(e) =>
@@ -63,90 +71,8 @@ private[concurrent] object Future {
}
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
- val task = new PromiseCompletingTask(executor, body)
- task.dispatch()
-
- task.promise.future
- }
-
- private[impl] val throwableId: Throwable => Throwable = identity _
-
- // an optimization for batching futures
- // TODO we should replace this with a public queue,
- // so that it can be stolen from
- // OR: a push to the local task queue should be so cheap that this is
- // not even needed, but stealing is still possible
-
- private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext)
-
- private val _taskStack = new ThreadLocal[TaskStack]()
-
- private[impl] trait Task extends Runnable {
- def executor: ExecutionContext
-
- // run the original callback (no dispatch)
- protected def task(): Unit
-
- // we implement Runnable to avoid creating
- // an extra object. run() runs ourselves with
- // a TaskStack pushed, and then runs any
- // other tasks that show up in the stack.
- final override def run() = {
- try {
- val taskStack = TaskStack(Stack[Task](this), executor)
- _taskStack set taskStack
- while (taskStack.stack.nonEmpty) {
- val next = taskStack.stack.pop()
- require(next.executor eq executor)
- try next.task() catch { case NonFatal(e) => executor reportFailure e }
- }
- } finally {
- _taskStack.remove()
- }
- }
-
- // send the task to the running executor.execute() via
- // _taskStack, or start a new executor.execute()
- def dispatch(force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this
- case _ => executor.execute(this)
- }
- }
-
- private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task])
- extends Task {
- protected override def task() = {
- _taskStack.get.stack.elems = elems
- }
- }
-
- private[impl] def releaseStack(executor: ExecutionContext): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && stack.stack.nonEmpty =>
- val tasks = stack.stack.elems
- stack.stack.clear()
- _taskStack.remove()
- val release = new ReleaseTask(executor, tasks)
- release.dispatch(force=true)
- case null =>
- // do nothing - there is no local batching stack anymore
- case _ =>
- _taskStack.remove()
- }
-
- private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any)
- extends Task {
- private var value: Either[Throwable, T] = null
-
- protected override def task() = {
- require(value ne null) // dispatch(value) must be called before dispatch()
- onComplete(value)
- }
-
- def dispatch(value: Either[Throwable, T]): Unit = {
- this.value = value
- dispatch()
- }
+ val runnable = new PromiseCompletingRunnable(body)
+ executor.execute(runnable)
+ runnable.promise.future
}
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index c5060a2368..3ac34bef8a 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -11,11 +11,12 @@ package scala.concurrent.impl
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
-import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, TimeoutException, ExecutionException }
+import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException }
//import scala.util.continuations._
import scala.concurrent.util.Duration
import scala.util
import scala.annotation.tailrec
+import scala.util.control.NonFatal
//import scala.concurrent.NonDeterministic
@@ -24,6 +25,21 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
def future: this.type = this
}
+private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) extends Runnable with OnCompleteRunnable {
+ // must be filled in before running it
+ var value: Either[Throwable, T] = null
+
+ override def run() = {
+ require(value ne null) // must set value to non-null before running!
+ try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
+ }
+
+ def executeWithValue(v: Either[Throwable, T]): Unit = {
+ require(value eq null) // can't complete it twice
+ value = v
+ executor.execute(this)
+ }
+}
object Promise {
@@ -94,10 +110,10 @@ object Promise {
val resolved = resolveEither(value)
(try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Future.OnCompleteTask[T]] = {
+ def tryComplete(v: Either[Throwable, T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
- val cur = raw.asInstanceOf[List[Future.OnCompleteTask[T]]]
+ val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
if (updateState(cur, v)) cur else tryComplete(v)
case _ => null
}
@@ -107,19 +123,19 @@ object Promise {
synchronized { notifyAll() } //Notify any evil blockers
}) match {
case null => false
- case cs if cs.isEmpty => true
- case cs => cs.foreach(c => c.dispatch(resolved)); true
+ case rs if rs.isEmpty => true
+ case rs => rs.foreach(r => r.executeWithValue(resolved)); true
}
}
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
- val bound = new Future.OnCompleteTask[T](executor, func)
+ val runnable = new CallbackRunnable[T](executor, func)
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => bound.dispatch(r.asInstanceOf[Either[Throwable, T]])
- case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback()
+ case r: Either[_, _] => runnable.executeWithValue(r.asInstanceOf[Either[Throwable, T]])
+ case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
}
@@ -139,7 +155,7 @@ object Promise {
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
val completedAs = value.get
- (new Future.OnCompleteTask(executor, func)).dispatch(completedAs)
+ (new CallbackRunnable(executor, func)).executeWithValue(completedAs)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
diff --git a/src/library/scala/util/control/NonFatal.scala b/src/library/scala/util/control/NonFatal.scala
index 9da2f63307..5137f0f2f5 100644
--- a/src/library/scala/util/control/NonFatal.scala
+++ b/src/library/scala/util/control/NonFatal.scala
@@ -23,16 +23,23 @@ package scala.util.control
* // dangerous stuff
* } catch {
* case NonFatal(e) => log.error(e, "Something not that bad.")
+ * // or
+ * case e if NonFatal(e) => log.error(e, "Something not that bad.")
* }
* }}}
*/
object NonFatal {
-
- def unapply(t: Throwable): Option[Throwable] = t match {
- case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError
- // VirtualMachineError includes OutOfMemoryError and other fatal errors
- case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => None
- case e ⇒ Some(e)
- }
-
+ /**
+ * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal
+ */
+ def apply(t: Throwable): Boolean = t match {
+ case _: StackOverflowError => true // StackOverflowError ok even though it is a VirtualMachineError
+ // VirtualMachineError includes OutOfMemoryError and other fatal errors
+ case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => false
+ case _ => true
+ }
+ /**
+ * Returns Some(t) if NonFatal(t) == true, otherwise None
+ */
+ def unapply(t: Throwable): Option[Throwable] = if (apply(t)) Some(t) else None
}