summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorPaul Phillips <paulp@improving.org>2012-07-13 12:08:20 -0700
committerPaul Phillips <paulp@improving.org>2012-07-13 12:08:20 -0700
commit453b7068ed4294eef18bf10a321a5b63497c7466 (patch)
tree8cfc64ec934fdf15d7665208a366611ce9474855 /src/library
parent724b0dc71f1f8f91b995d01e9e027789f54ecdfe (diff)
parent3d0099dbc990ab914de5b9deb5087d9d3fb6220c (diff)
downloadscala-453b7068ed4294eef18bf10a321a5b63497c7466.tar.gz
scala-453b7068ed4294eef18bf10a321a5b63497c7466.tar.bz2
scala-453b7068ed4294eef18bf10a321a5b63497c7466.zip
Merge branch '2.10.x' into topic/name-implicits
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/StringContext.scala38
-rw-r--r--src/library/scala/collection/MapLike.scala6
-rw-r--r--src/library/scala/collection/convert/Wrappers.scala3
-rw-r--r--src/library/scala/collection/parallel/TaskSupport.scala2
-rw-r--r--src/library/scala/concurrent/BlockContext.scala81
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala34
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala5
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala74
-rw-r--r--src/library/scala/concurrent/Future.scala29
-rw-r--r--src/library/scala/concurrent/Promise.scala9
-rw-r--r--src/library/scala/concurrent/SyncVar.scala20
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala61
-rw-r--r--src/library/scala/concurrent/impl/Future.scala102
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala34
-rw-r--r--src/library/scala/package.scala1
-rw-r--r--src/library/scala/util/control/NonFatal.scala23
16 files changed, 268 insertions, 254 deletions
diff --git a/src/library/scala/StringContext.scala b/src/library/scala/StringContext.scala
index f400f18dab..f11dfb72ae 100644
--- a/src/library/scala/StringContext.scala
+++ b/src/library/scala/StringContext.scala
@@ -8,7 +8,7 @@
package scala
-import collection.mutable.ArrayBuffer
+import language.experimental.macros
/** A class to support string interpolation.
* This class supports string interpolation as outlined in Scala SIP-11.
@@ -42,7 +42,7 @@ case class StringContext(parts: String*) {
* @throws A `StringContext.InvalidEscapeException` if if a `parts` string contains a backslash (`\`) character
* that does not start a valid escape sequence.
*/
- def s(args: Any*) = {
+ def s(args: Any*): String = {
checkLengths(args: _*)
val pi = parts.iterator
val ai = args.iterator
@@ -82,38 +82,8 @@ case class StringContext(parts: String*) {
* string literally. This is achieved by replacing each such occurrence by the
* format specifier `%%`.
*/
- def f(args: Any*) = {
- checkLengths(args: _*)
- val pi = parts.iterator
- val bldr = new java.lang.StringBuilder
- def copyString(first: Boolean): Unit = {
- val str = treatEscapes(pi.next())
- val strIsEmpty = str.length == 0
- var start = 0
- var idx = 0
- if (!first) {
- if (strIsEmpty || (str charAt 0) != '%')
- bldr append "%s"
- idx = 1
- }
- if (!strIsEmpty) {
- val len = str.length
- while (idx < len) {
- if (str(idx) == '%') {
- bldr append (str substring (start, idx)) append "%%"
- start = idx + 1
- }
- idx += 1
- }
- bldr append (str substring (start, idx))
- }
- }
- copyString(first = true)
- while (pi.hasNext) {
- copyString(first = false)
- }
- bldr.toString format (args: _*)
- }
+ // The implementation is magically hardwired into `scala.tools.reflect.MacroImplementations.macro_StringInterpolation_f`
+ def f(args: Any*): String = macro ???
}
object StringContext {
diff --git a/src/library/scala/collection/MapLike.scala b/src/library/scala/collection/MapLike.scala
index 55d482f6c8..ed2a877631 100644
--- a/src/library/scala/collection/MapLike.scala
+++ b/src/library/scala/collection/MapLike.scala
@@ -301,11 +301,11 @@ self =>
def ++[B1 >: B](xs: GenTraversableOnce[(A, B1)]): Map[A, B1] =
((repr: Map[A, B1]) /: xs.seq) (_ + _)
- /** Returns a new map with all key/value pairs for which the predicate
+ /** Returns a new map obtained by removing all key/value pairs for which the predicate
* `p` returns `true`.
*
- * '''Note:''' This method works by successively removing elements fro which the
- * predicate is false from this set.
+ * '''Note:''' This method works by successively removing elements for which the
+ * predicate is true from this set.
* If removal is slow, or you expect that most elements of the set
* will be removed, you might consider using `filter`
* with a negated predicate instead.
diff --git a/src/library/scala/collection/convert/Wrappers.scala b/src/library/scala/collection/convert/Wrappers.scala
index 8c603dc91b..75707b69b0 100644
--- a/src/library/scala/collection/convert/Wrappers.scala
+++ b/src/library/scala/collection/convert/Wrappers.scala
@@ -467,4 +467,5 @@ private[collection] trait Wrappers {
}
}
-object Wrappers extends Wrappers
+@SerialVersionUID(0 - 5857859809262781311L)
+object Wrappers extends Wrappers with Serializable
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala
index 2eaa861429..3d27f619bb 100644
--- a/src/library/scala/collection/parallel/TaskSupport.scala
+++ b/src/library/scala/collection/parallel/TaskSupport.scala
@@ -48,7 +48,7 @@ extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
* By default, parallel collections are parametrized with this task support object, so parallel collections
* share the same execution context backend as the rest of the `scala.concurrent` package.
*/
-class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.defaultExecutionContext)
+class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
extends TaskSupport with ExecutionContextTasks
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala
new file mode 100644
index 0000000000..a5b878c546
--- /dev/null
+++ b/src/library/scala/concurrent/BlockContext.scala
@@ -0,0 +1,81 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import java.lang.Thread
+import scala.concurrent.util.Duration
+
+/**
+ * A context to be notified by `scala.concurrent.blocking()` when
+ * a thread is about to block. In effect this trait provides
+ * the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()`
+ * locates an instance of `BlockContext` by first looking for one
+ * provided through `BlockContext.withBlockContext()` and failing that,
+ * checking whether `Thread.currentThread` is an instance of `BlockContext`.
+ * So a thread pool can have its `java.lang.Thread` instances implement
+ * `BlockContext`. There's a default `BlockContext` used if the thread
+ * doesn't implement `BlockContext`.
+ *
+ * Typically, you'll want to chain to the previous `BlockContext`,
+ * like this:
+ * {{{
+ * val oldContext = BlockContext.current
+ * val myContext = new BlockContext {
+ * override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ * // you'd have code here doing whatever you need to do
+ * // when the thread is about to block.
+ * // Then you'd chain to the previous context:
+ * oldContext.internalBlockingCall(awaitable, atMost)
+ * }
+ * }
+ * BlockContext.withBlockContext(myContext) {
+ * // then this block runs with myContext as the handler
+ * // for scala.concurrent.blocking
+ * }
+ * }}}
+ */
+trait BlockContext {
+
+ /** Used internally by the framework; blocks execution for at most
+ * `atMost` time while waiting for an `awaitable` object to become ready.
+ *
+ * Clients should use `scala.concurrent.blocking` instead; this is
+ * the implementation of `scala.concurrent.blocking`, generally
+ * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`.
+ */
+ def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
+}
+
+object BlockContext {
+ private object DefaultBlockContext extends BlockContext {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
+ awaitable.result(atMost)(Await.canAwaitEvidence)
+ }
+
+ private val contextLocal = new ThreadLocal[BlockContext]() {
+ override def initialValue = Thread.currentThread match {
+ case ctx: BlockContext => ctx
+ case _ => DefaultBlockContext
+ }
+ }
+
+ /** Obtain the current thread's current `BlockContext`. */
+ def current: BlockContext = contextLocal.get
+
+ /** Pushes a current `BlockContext` while executing `body`. */
+ def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
+ val old = contextLocal.get
+ try {
+ contextLocal.set(blockContext)
+ body
+ } finally {
+ contextLocal.set(old)
+ }
+ }
+}
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
index 330a2f0e25..86a86966ef 100644
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -17,23 +17,6 @@ import language.implicitConversions
/** This package object contains primitives for concurrent and parallel programming.
*/
abstract class ConcurrentPackageObject {
- /** A global execution environment for executing lightweight tasks.
- */
- lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
-
- val currentExecutionContext = new ThreadLocal[ExecutionContext]
-
- val handledFutureException: PartialFunction[Throwable, Throwable] = {
- case t: Throwable if isFutureThrowable(t) => t
- }
-
- // TODO rename appropriately and make public
- private[concurrent] def isFutureThrowable(t: Throwable) = t match {
- case e: Error => false
- case t: scala.util.control.ControlThrowable => false
- case i: InterruptedException => false
- case _ => true
- }
/* concurrency constructs */
@@ -46,8 +29,7 @@ abstract class ConcurrentPackageObject {
* @param execctx the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
- def future[T](body: =>T)(implicit execctx: ExecutionContext = defaultExecutionContext): Future[T] =
- Future[T](body)
+ def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)
/** Creates a promise object which can be completed with a value.
*
@@ -55,8 +37,7 @@ abstract class ConcurrentPackageObject {
* @param execctx the execution context on which the promise is created on
* @return the newly created `Promise` object
*/
- def promise[T]()(implicit execctx: ExecutionContext = defaultExecutionContext): Promise[T] =
- Promise[T]()
+ def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
/** Used to block on a piece of code which potentially blocks.
*
@@ -67,8 +48,7 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](body: =>T): T =
- blocking(impl.Future.body2awaitable(body), Duration.Inf)
+ def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
/** Blocks on an awaitable object.
*
@@ -79,12 +59,8 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.result(atMost)(Await.canAwaitEvidence)
- case ec => ec.internalBlockingCall(awaitable, atMost)
- }
- }
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
+ BlockContext.current.internalBlockingCall(awaitable, atMost)
@inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x)
}
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index 96a66d83b6..91e41748f5 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -23,7 +23,7 @@ package scala.concurrent
* @author Paul Phillips
* @version 2.8
*/
-class DelayedLazyVal[T](f: () => T, body: => Unit) {
+class DelayedLazyVal[T](f: () => T, body: => Unit){
@volatile private[this] var _isDone = false
private[this] lazy val complete = f()
@@ -39,7 +39,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
*/
def apply(): T = if (isDone) complete else f()
- // TODO replace with scala.concurrent.future { ... }
+ // FIXME need to take ExecutionContext in constructor
+ import ExecutionContext.Implicits.global
future {
body
_isDone = true
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 436a17a33b..b486e5269e 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -9,58 +9,80 @@
package scala.concurrent
-
-import java.util.concurrent.atomic.{ AtomicInteger }
-import java.util.concurrent.{ Executors, Future => JFuture, Callable, ExecutorService, Executor }
+import java.util.concurrent.{ ExecutorService, Executor }
import scala.concurrent.util.Duration
-import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
-import scala.collection.generic.CanBuildFrom
-import collection._
-
-
+import scala.annotation.implicitNotFound
+/**
+ * An `ExecutionContext` is an abstraction over an entity that can execute program logic.
+ */
+@implicitNotFound("Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global")
trait ExecutionContext {
/** Runs a block of code on this execution context.
*/
def execute(runnable: Runnable): Unit
- /** Used internally by the framework - blocks execution for at most `atMost` time while waiting
- * for an `awaitable` object to become ready.
- *
- * Clients should use `scala.concurrent.blocking` instead.
- */
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
-
/** Reports that an asynchronous computation failed.
*/
def reportFailure(t: Throwable): Unit
}
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutor extends ExecutionContext with Executor
+
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService
+
/** Contains factory methods for creating execution contexts.
*/
object ExecutionContext {
-
- implicit def defaultExecutionContext: ExecutionContext = scala.concurrent.defaultExecutionContext
-
+ /**
+ * The `ExecutionContext` associated with the current `Thread`
+ */
+ val currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal //FIXME might want to set the initial value to an executionContext that throws an exception on execute and warns that it's not set
+
+ /**
+ * This is the explicit global ExecutionContext,
+ * call this when you want to provide the global ExecutionContext explicitly
+ */
+ def global: ExecutionContextExecutor = Implicits.global
+
+ object Implicits {
+ /**
+ * This is the implicit global ExecutionContext,
+ * import this when you want to provide the global ExecutionContext implicitly
+ */
+ implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
+ }
+
/** Creates an `ExecutionContext` from the given `ExecutorService`.
*/
- def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService =
+ def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
impl.ExecutionContextImpl.fromExecutorService(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `ExecutorService` with the default Reporter.
+ */
+ def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)
/** Creates an `ExecutionContext` from the given `Executor`.
*/
- def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor =
+ def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
impl.ExecutionContextImpl.fromExecutor(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `Executor` with the default Reporter.
+ */
+ def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)
- def defaultReporter: Throwable => Unit = {
- // re-throwing `Error`s here causes an exception handling test to fail.
- //case e: Error => throw e
- case t => t.printStackTrace()
- }
-
+ /** The default reporter simply prints the stack trace of the `Throwable` to System.err.
+ */
+ def defaultReporter: Throwable => Unit = { case t => t.printStackTrace() }
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 2de0c57253..75a83d6ef8 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -8,7 +8,7 @@
package scala.concurrent
-
+import language.higherKinds
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
@@ -23,11 +23,9 @@ import scala.Option
import scala.util.{Try, Success, Failure}
import scala.annotation.tailrec
-import scala.collection.mutable.Stack
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
-import language.higherKinds
@@ -138,7 +136,7 @@ trait Future[+T] extends Awaitable[T] {
* $callbackInContext
*/
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
- case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
+ case Left(t) if (impl.Future.isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
case _ =>
}(executor)
@@ -580,6 +578,20 @@ object Future {
classOf[Double] -> classOf[jl.Double],
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
)
+
+ /** Creates an already completed Future with the specified exception.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
+
+ /** Creates an already completed Future with the specified result.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def successful[T](result: T): Future[T] = Promise.successful(result).future
/** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
*
@@ -710,5 +722,12 @@ object Future {
}
}
-
+/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext`
+ * wraps a callback provided to `Future.onComplete`.
+ * All callbacks provided to a `Future` end up going through `onComplete`, so this allows an
+ * `ExecutionContext` to special-case callbacks that were executed by `Future` if desired.
+ */
+trait OnCompleteRunnable {
+ self: Runnable =>
+}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 578642966f..5d1b2c00b6 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -34,6 +34,15 @@ trait Promise[T] {
*/
def future: Future[T]
+ /** Returns whether the promise has already been completed with
+ * a value or an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return `true` if the promise is already completed, `false` otherwise
+ */
+ def isCompleted: Boolean
+
/** Completes the promise with either an exception or a value.
*
* @param result Either the value or the exception to complete the promise with.
diff --git a/src/library/scala/concurrent/SyncVar.scala b/src/library/scala/concurrent/SyncVar.scala
index 5a6d95c2ed..292014706d 100644
--- a/src/library/scala/concurrent/SyncVar.scala
+++ b/src/library/scala/concurrent/SyncVar.scala
@@ -53,6 +53,8 @@ class SyncVar[A] {
value
}
+ /** Waits for this SyncVar to become defined and returns
+ * the result */
def take(): A = synchronized {
try get
finally unsetVal()
@@ -64,7 +66,8 @@ class SyncVar[A] {
* the SyncVar.
*
* @param timeout the amount of milliseconds to wait, 0 means forever
- * @return `None` if variable is undefined after `timeout`, `Some(value)` otherwise
+ * @return the value or a throws an exception if the timeout occurs
+ * @throws NoSuchElementException on timeout
*/
def take(timeout: Long): A = synchronized {
try get(timeout).get
@@ -72,25 +75,28 @@ class SyncVar[A] {
}
// TODO: this method should be private
- // [Heather] the reason why: it doesn't take into consideration
+ // [Heather] the reason why: it doesn't take into consideration
// whether or not the SyncVar is already defined. So, set has been
// deprecated in order to eventually be able to make "setting" private
@deprecated("Use `put` instead, as `set` is potentionally error-prone", "2.10.0")
def set(x: A): Unit = setVal(x)
+ /** Places a value in the SyncVar. If the SyncVar already has a stored value,
+ * it waits until another thread takes it */
def put(x: A): Unit = synchronized {
while (isDefined) wait()
setVal(x)
}
+ /** Checks whether a value is stored in the synchronized variable */
def isSet: Boolean = synchronized {
isDefined
}
// TODO: this method should be private
- // [Heather] the reason why: it doesn't take into consideration
+ // [Heather] the reason why: it doesn't take into consideration
// whether or not the SyncVar is already defined. So, unset has been
- // deprecated in order to eventually be able to make "unsetting" private
+ // deprecated in order to eventually be able to make "unsetting" private
@deprecated("Use `take` instead, as `unset` is potentionally error-prone", "2.10.0")
def unset(): Unit = synchronized {
isDefined = false
@@ -98,7 +104,7 @@ class SyncVar[A] {
notifyAll()
}
- // `setVal` exists so as to retroactively deprecate `set` without
+ // `setVal` exists so as to retroactively deprecate `set` without
// deprecation warnings where we use `set` internally. The
// implementation of `set` was moved to `setVal` to achieve this
private def setVal(x: A): Unit = synchronized {
@@ -107,13 +113,13 @@ class SyncVar[A] {
notifyAll()
}
- // `unsetVal` exists so as to retroactively deprecate `unset` without
+ // `unsetVal` exists so as to retroactively deprecate `unset` without
// deprecation warnings where we use `unset` internally. The
// implementation of `unset` was moved to `unsetVal` to achieve this
private def unsetVal(): Unit = synchronized {
isDefined = false
value = None
- notifyAll()
+ notifyAll()
}
}
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 4c6347dce0..551a444425 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -13,34 +13,34 @@ package scala.concurrent.impl
import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit }
import java.util.Collection
import scala.concurrent.forkjoin._
-import scala.concurrent.{ ExecutionContext, Awaitable }
+import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService }
import scala.concurrent.util.Duration
import scala.util.control.NonFatal
-private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor {
+private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor {
val executor: Executor = es match {
case null => createExecutorService
case some => some
}
-
- // to ensure that the current execution context thread local is properly set
- def executorsThreadFactory = new ThreadFactory {
- def newThread(r: Runnable) = new Thread(new Runnable {
- override def run() {
- scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this)
- r.run()
- }
- })
- }
-
- // to ensure that the current execution context thread local is properly set
+
+ // Implement BlockContext on FJP threads
def forkJoinPoolThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
- def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) {
- override def onStart() {
- scala.concurrent.currentExecutionContext.set(ExecutionContextImpl.this)
+ def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) with BlockContext {
+ override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ var result: T = null.asInstanceOf[T]
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
+ @volatile var isdone = false
+ def block(): Boolean = {
+ result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
+ isdone = true
+ true
+ }
+ def isReleasable = isdone
+ })
+ result
}
}
}
@@ -68,7 +68,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
case NonFatal(t) =>
System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool")
t.printStackTrace(System.err)
- Executors.newCachedThreadPool(executorsThreadFactory) //FIXME use the same desired parallelism here too?
+ Executors.newCachedThreadPool() //FIXME use the same desired parallelism here too?
}
def execute(runnable: Runnable): Unit = executor match {
@@ -84,27 +84,6 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
case generic => generic execute runnable
}
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
- Future.releaseStack(this)
-
- executor match {
- case fj: ForkJoinPool =>
- var result: T = null.asInstanceOf[T]
- ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
- @volatile var isdone = false
- def block(): Boolean = {
- result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
- isdone = true
- true
- }
- def isReleasable = isdone
- })
- result
- case _ =>
- awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
- }
- }
-
def reportFailure(t: Throwable) = reporter(t)
}
@@ -112,8 +91,8 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
private[concurrent] object ExecutionContextImpl {
def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
- def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService =
- new ExecutionContextImpl(es, reporter) with ExecutorService {
+ def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
+ new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
override def execute(command: Runnable) = executor.execute(command)
override def shutdown() { asExecutorService.shutdown() }
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 8012ea6a93..073e6c4c9f 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -46,11 +46,19 @@ private[concurrent] object Future {
def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
- private[impl] class PromiseCompletingTask[T](override val executor: ExecutionContext, body: => T)
- extends Task {
+ // TODO rename appropriately and make public
+ private[concurrent] def isFutureThrowable(t: Throwable) = t match {
+ case e: Error => false
+ case t: scala.util.control.ControlThrowable => false
+ case i: InterruptedException => false
+ case _ => true
+ }
+
+ private[impl] class PromiseCompletingRunnable[T](body: => T)
+ extends Runnable {
val promise = new Promise.DefaultPromise[T]()
- protected override def task() = {
+ override def run() = {
promise complete {
try Right(body) catch {
case NonFatal(e) =>
@@ -63,90 +71,8 @@ private[concurrent] object Future {
}
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
- val task = new PromiseCompletingTask(executor, body)
- task.dispatch()
-
- task.promise.future
- }
-
- private[impl] val throwableId: Throwable => Throwable = identity _
-
- // an optimization for batching futures
- // TODO we should replace this with a public queue,
- // so that it can be stolen from
- // OR: a push to the local task queue should be so cheap that this is
- // not even needed, but stealing is still possible
-
- private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext)
-
- private val _taskStack = new ThreadLocal[TaskStack]()
-
- private[impl] trait Task extends Runnable {
- def executor: ExecutionContext
-
- // run the original callback (no dispatch)
- protected def task(): Unit
-
- // we implement Runnable to avoid creating
- // an extra object. run() runs ourselves with
- // a TaskStack pushed, and then runs any
- // other tasks that show up in the stack.
- final override def run() = {
- try {
- val taskStack = TaskStack(Stack[Task](this), executor)
- _taskStack set taskStack
- while (taskStack.stack.nonEmpty) {
- val next = taskStack.stack.pop()
- require(next.executor eq executor)
- try next.task() catch { case NonFatal(e) => executor reportFailure e }
- }
- } finally {
- _taskStack.remove()
- }
- }
-
- // send the task to the running executor.execute() via
- // _taskStack, or start a new executor.execute()
- def dispatch(force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this
- case _ => executor.execute(this)
- }
- }
-
- private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task])
- extends Task {
- protected override def task() = {
- _taskStack.get.stack.elems = elems
- }
- }
-
- private[impl] def releaseStack(executor: ExecutionContext): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && stack.stack.nonEmpty =>
- val tasks = stack.stack.elems
- stack.stack.clear()
- _taskStack.remove()
- val release = new ReleaseTask(executor, tasks)
- release.dispatch(force=true)
- case null =>
- // do nothing - there is no local batching stack anymore
- case _ =>
- _taskStack.remove()
- }
-
- private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any)
- extends Task {
- private var value: Either[Throwable, T] = null
-
- protected override def task() = {
- require(value ne null) // dispatch(value) must be called before dispatch()
- onComplete(value)
- }
-
- def dispatch(value: Either[Throwable, T]): Unit = {
- this.value = value
- dispatch()
- }
+ val runnable = new PromiseCompletingRunnable(body)
+ executor.execute(runnable)
+ runnable.promise.future
}
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index c5060a2368..3ac34bef8a 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -11,11 +11,12 @@ package scala.concurrent.impl
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
-import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, TimeoutException, ExecutionException }
+import scala.concurrent.{ Awaitable, ExecutionContext, blocking, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException }
//import scala.util.continuations._
import scala.concurrent.util.Duration
import scala.util
import scala.annotation.tailrec
+import scala.util.control.NonFatal
//import scala.concurrent.NonDeterministic
@@ -24,6 +25,21 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
def future: this.type = this
}
+private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) extends Runnable with OnCompleteRunnable {
+ // must be filled in before running it
+ var value: Either[Throwable, T] = null
+
+ override def run() = {
+ require(value ne null) // must set value to non-null before running!
+ try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
+ }
+
+ def executeWithValue(v: Either[Throwable, T]): Unit = {
+ require(value eq null) // can't complete it twice
+ value = v
+ executor.execute(this)
+ }
+}
object Promise {
@@ -94,10 +110,10 @@ object Promise {
val resolved = resolveEither(value)
(try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Future.OnCompleteTask[T]] = {
+ def tryComplete(v: Either[Throwable, T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
- val cur = raw.asInstanceOf[List[Future.OnCompleteTask[T]]]
+ val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
if (updateState(cur, v)) cur else tryComplete(v)
case _ => null
}
@@ -107,19 +123,19 @@ object Promise {
synchronized { notifyAll() } //Notify any evil blockers
}) match {
case null => false
- case cs if cs.isEmpty => true
- case cs => cs.foreach(c => c.dispatch(resolved)); true
+ case rs if rs.isEmpty => true
+ case rs => rs.foreach(r => r.executeWithValue(resolved)); true
}
}
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
- val bound = new Future.OnCompleteTask[T](executor, func)
+ val runnable = new CallbackRunnable[T](executor, func)
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => bound.dispatch(r.asInstanceOf[Either[Throwable, T]])
- case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback()
+ case r: Either[_, _] => runnable.executeWithValue(r.asInstanceOf[Either[Throwable, T]])
+ case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
}
@@ -139,7 +155,7 @@ object Promise {
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
val completedAs = value.get
- (new Future.OnCompleteTask(executor, func)).dispatch(completedAs)
+ (new CallbackRunnable(executor, func)).executeWithValue(completedAs)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
diff --git a/src/library/scala/package.scala b/src/library/scala/package.scala
index c4a8585563..5f90c32e22 100644
--- a/src/library/scala/package.scala
+++ b/src/library/scala/package.scala
@@ -9,6 +9,7 @@
/**
* Core Scala types. They are always available without an explicit import.
+ * @contentDiagram hideNodes "scala.Serializable"
*/
package object scala {
type Throwable = java.lang.Throwable
diff --git a/src/library/scala/util/control/NonFatal.scala b/src/library/scala/util/control/NonFatal.scala
index 9da2f63307..5137f0f2f5 100644
--- a/src/library/scala/util/control/NonFatal.scala
+++ b/src/library/scala/util/control/NonFatal.scala
@@ -23,16 +23,23 @@ package scala.util.control
* // dangerous stuff
* } catch {
* case NonFatal(e) => log.error(e, "Something not that bad.")
+ * // or
+ * case e if NonFatal(e) => log.error(e, "Something not that bad.")
* }
* }}}
*/
object NonFatal {
-
- def unapply(t: Throwable): Option[Throwable] = t match {
- case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError
- // VirtualMachineError includes OutOfMemoryError and other fatal errors
- case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => None
- case e ⇒ Some(e)
- }
-
+ /**
+ * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal
+ */
+ def apply(t: Throwable): Boolean = t match {
+ case _: StackOverflowError => true // StackOverflowError ok even though it is a VirtualMachineError
+ // VirtualMachineError includes OutOfMemoryError and other fatal errors
+ case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable | _: NotImplementedError => false
+ case _ => true
+ }
+ /**
+ * Returns Some(t) if NonFatal(t) == true, otherwise None
+ */
+ def unapply(t: Throwable): Option[Throwable] = if (apply(t)) Some(t) else None
}