summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/collection/immutable/List.scala2
-rw-r--r--src/library/scala/collection/mutable/Stack.scala2
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala4
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashSet.scala2
-rw-r--r--src/library/scala/collection/parallel/immutable/ParRange.scala2
-rw-r--r--src/library/scala/collection/parallel/immutable/ParVector.scala2
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArray.scala6
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashMap.scala2
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashSet.scala2
-rw-r--r--src/library/scala/collection/parallel/mutable/ParTrieMap.scala2
-rw-r--r--src/library/scala/concurrent/Awaitable.scala9
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala14
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala91
-rw-r--r--src/library/scala/concurrent/Future.scala128
-rw-r--r--src/library/scala/concurrent/Promise.scala16
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala84
-rw-r--r--src/library/scala/concurrent/impl/Future.scala74
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala29
-rw-r--r--src/library/scala/concurrent/package.scala28
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala2
20 files changed, 259 insertions, 242 deletions
diff --git a/src/library/scala/collection/immutable/List.scala b/src/library/scala/collection/immutable/List.scala
index 870c179b2d..1b75c10113 100644
--- a/src/library/scala/collection/immutable/List.scala
+++ b/src/library/scala/collection/immutable/List.scala
@@ -58,7 +58,7 @@ import java.io._
* @author Martin Odersky and others
* @version 2.8
* @since 1.0
- * @see [["http://docs.scala-lang.org/overviews/collections/concrete-immutable-collection-classes.html#lists" "Scala's Collection Library overview"]]
+ * @see [[http://docs.scala-lang.org/overviews/collections/concrete-immutable-collection-classes.html#lists "Scala's Collection Library overview"]]
* section on `Lists` for more information.
*
* @define coll list
diff --git a/src/library/scala/collection/mutable/Stack.scala b/src/library/scala/collection/mutable/Stack.scala
index 8fad131009..b70df05c55 100644
--- a/src/library/scala/collection/mutable/Stack.scala
+++ b/src/library/scala/collection/mutable/Stack.scala
@@ -44,7 +44,7 @@ object Stack extends SeqFactory[Stack] {
* @author Martin Odersky
* @version 2.8
* @since 1
- * @see [[http://docs.scala-lang.org/overviews/collections/concrete-mutable-collection-classes.html#stacks"Scala's Collection Library overview"]]
+ * @see [[http://docs.scala-lang.org/overviews/collections/concrete-mutable-collection-classes.html#stacks "Scala's Collection Library overview"]]
* section on `Stacks` for more information.
* @define Coll Stack
* @define coll stack
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 49b00bebdb..e630a9dbed 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -36,7 +36,9 @@ import collection.parallel.Task
*
* @author Aleksandar Prokopec
* @since 2.9
- *
+ * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_hash_tries Scala's Parallel Collections Library overview]]
+ * section on Parallel Hash Tries for more information.
+ *
* @define Coll immutable.ParHashMap
* @define coll immutable parallel hash map
*/
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 11d92a27c9..084637c5dc 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -35,6 +35,8 @@ import collection.parallel.Task
*
* @author Aleksandar Prokopec
* @since 2.9
+ * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_hash_tries Scala's Parallel Collections Library overview]]
+ * section on Parallel Hash Tries for more information.
*
* @define Coll immutable.ParHashSet
* @define coll immutable parallel hash set
diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala
index 9cac433460..277fd5fdd3 100644
--- a/src/library/scala/collection/parallel/immutable/ParRange.scala
+++ b/src/library/scala/collection/parallel/immutable/ParRange.scala
@@ -25,6 +25,8 @@ import scala.collection.Iterator
*
* @author Aleksandar Prokopec
* @since 2.9
+ * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_range Scala's Parallel Collections Library overview]]
+ * section on `ParRange` for more information.
*
* @define Coll immutable.ParRange
* @define coll immutable parallel range
diff --git a/src/library/scala/collection/parallel/immutable/ParVector.scala b/src/library/scala/collection/parallel/immutable/ParVector.scala
index 5d9c431bc1..8baa84b77c 100644
--- a/src/library/scala/collection/parallel/immutable/ParVector.scala
+++ b/src/library/scala/collection/parallel/immutable/ParVector.scala
@@ -34,6 +34,8 @@ import immutable.VectorIterator
*
* @author Aleksandar Prokopec
* @since 2.9
+ * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_vector Scala's Parallel Collections Library overview]]
+ * section on `ParVector` for more information.
*
* @define Coll immutable.ParVector
* @define coll immutable parallel vector
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index c33495bd39..8cc0b95997 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -44,10 +44,14 @@ import scala.collection.GenTraversableOnce
*
* @tparam T type of the elements in the array
*
+ * @author Aleksandar Prokopec
+ * @since 2.9
+ * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_array Scala's Parallel Collections Library overview]]
+ * section on `ParArray` for more information.
+ *
* @define Coll ParArray
* @define coll parallel array
*
- * @author Aleksandar Prokopec
*/
@SerialVersionUID(1L)
class ParArray[T] private[mutable] (val arrayseq: ArraySeq[T])
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index 6ce6c45460..23b23d55a1 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -32,6 +32,8 @@ import collection.parallel.Task
* @define coll parallel hash map
*
* @author Aleksandar Prokopec
+ * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_hash_tables Scala's Parallel Collections Library overview]]
+ * section on Parallel Hash Tables for more information.
*/
@SerialVersionUID(1L)
class ParHashMap[K, V] private[collection] (contents: HashTable.Contents[K, DefaultEntry[K, V]])
diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
index e0a2ab03df..4e9a38c13f 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
@@ -29,6 +29,8 @@ import collection.parallel.Task
* @define coll parallel hash set
*
* @author Aleksandar Prokopec
+ * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_hash_tables Scala's Parallel Collections Library overview]]
+ * section on Parallel Hash Tables for more information.
*/
@SerialVersionUID(1L)
class ParHashSet[T] private[collection] (contents: FlatHashTable.Contents[T])
diff --git a/src/library/scala/collection/parallel/mutable/ParTrieMap.scala b/src/library/scala/collection/parallel/mutable/ParTrieMap.scala
index fa19990b91..359c35f1dd 100644
--- a/src/library/scala/collection/parallel/mutable/ParTrieMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParTrieMap.scala
@@ -33,6 +33,8 @@ import scala.collection.concurrent.TrieMapIterator
*
* @author Aleksandar Prokopec
* @since 2.10
+ * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_concurrent_tries Scala's Parallel Collections Library overview]]
+ * section on `ParTrieMap` for more information.
*/
final class ParTrieMap[K, V] private[collection] (private val ctrie: TrieMap[K, V])
extends ParMap[K, V]
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala
index 6c9995eb05..052e6e2366 100644
--- a/src/library/scala/concurrent/Awaitable.scala
+++ b/src/library/scala/concurrent/Awaitable.scala
@@ -16,8 +16,13 @@ import scala.concurrent.util.Duration
trait Awaitable[+T] {
- @implicitNotFound(msg = "Waiting must be done by calling `blocking(timeout) b`, where `b` is the `Awaitable` object or a potentially blocking piece of code.")
- def await(atMost: Duration)(implicit canawait: CanAwait): T
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type
+
+ /**
+ * Throws exceptions if cannot produce a T within the specified time
+ * This method should not be called directly.
+ */
+ def result(atMost: Duration)(implicit permit: CanAwait): T
}
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
index 3471095051..ba98757906 100644
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -59,14 +59,18 @@ abstract class ConcurrentPackageObject {
/* concurrency constructs */
def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] =
- execCtx future body
+ Future[T](body)
def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] =
- execCtx promise
+ Promise[T]()
/** Wraps a block of code into an awaitable object. */
def body2awaitable[T](body: =>T) = new Awaitable[T] {
- def await(atMost: Duration)(implicit cb: CanAwait) = body
+ def ready(atMost: Duration)(implicit permit: CanAwait) = {
+ body
+ this
+ }
+ def result(atMost: Duration)(implicit permit: CanAwait) = body
}
/** Used to block on a piece of code which potentially blocks.
@@ -78,8 +82,8 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext): T =
- executionContext.blocking(atMost)(body)
+ def blocking[T](body: =>T)(implicit execCtx: ExecutionContext): T =
+ executionContext.blocking(body)
/** Blocks on an awaitable object.
*
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index c4a45f9fb5..a206a2d4ea 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -22,19 +22,11 @@ import collection._
trait ExecutionContext {
- protected implicit object CanAwaitEvidence extends CanAwait
-
def execute(runnable: Runnable): Unit
def execute[U](body: () => U): Unit
- def promise[T]: Promise[T]
-
- def future[T](body: Callable[T]): Future[T] = future(body.call())
-
- def future[T](body: => T): Future[T]
-
- def blocking[T](atMost: Duration)(body: =>T): T
+ def blocking[T](body: =>T): T
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
@@ -44,89 +36,8 @@ trait ExecutionContext {
private implicit val executionContext = this
- def keptPromise[T](result: T): Promise[T] = {
- val p = promise[T]
- p success result
- }
-
- def brokenPromise[T](t: Throwable): Promise[T] = {
- val p = promise[T]
- p failure t
- }
-
- /** TODO some docs
- *
- */
- def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
- import nondeterministic._
- val buffer = new mutable.ArrayBuffer[T]
- val counter = new AtomicInteger(1) // how else could we do this?
- val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature
- var idx = 0
-
- def tryFinish() = if (counter.decrementAndGet() == 0) {
- val builder = cbf(futures)
- builder ++= buffer
- p success builder.result
- }
-
- for (f <- futures) {
- val currentIndex = idx
- buffer += null.asInstanceOf[T]
- counter.incrementAndGet()
- f onComplete {
- case Failure(t) =>
- p tryFailure t
- case Success(v) =>
- buffer(currentIndex) = v
- tryFinish()
- }
- idx += 1
- }
-
- tryFinish()
-
- p.future
- }
-
- /** TODO some docs
- *
- */
- def any[T](futures: Traversable[Future[T]]): Future[T] = {
- val p = promise[T]
- val completeFirst: Try[T] => Unit = elem => p tryComplete elem
-
- futures foreach (_ onComplete completeFirst)
-
- p.future
- }
-
- /** TODO some docs
- *
- */
- def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
- if (futures.isEmpty) Promise.kept[Option[T]](None).future
- else {
- val result = promise[Option[T]]
- val count = new AtomicInteger(futures.size)
- val search: Try[T] => Unit = {
- v => v match {
- case Success(r) => if (predicate(r)) result trySuccess Some(r)
- case _ =>
- }
- if (count.decrementAndGet() == 0) result trySuccess None
- }
-
- futures.foreach(_ onComplete search)
-
- result.future
- }
- }
-
}
-sealed trait CanAwait
-
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 1dc8e38355..d73801aa90 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -134,8 +134,26 @@ self =>
/** Creates a new promise.
*/
def newPromise[S]: Promise[S]
-
-
+
+ /** Returns whether the future has already been completed with
+ * a value or an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return `true` if the future is already completed, `false` otherwise
+ */
+ def isCompleted: Boolean
+
+ /** The value of this `Future`.
+ *
+ * If the future is not completed the returned value will be `None`.
+ * If the future is completed the value will be `Some(Success(t))`
+ * if it contains a valid result, or `Some(Failure(error))` if it contains
+ * an exception.
+ */
+ def value: Option[Try[T]]
+
+
/* Projections */
/** Returns a failed projection of this future.
@@ -235,8 +253,8 @@ self =>
* val f = future { 5 }
* val g = f filter { _ % 2 == 1 }
* val h = f filter { _ % 2 == 0 }
- * await(0) g // evaluates to 5
- * await(0) h // throw a NoSuchElementException
+ * await(g, 0) // evaluates to 5
+ * await(h, 0) // throw a NoSuchElementException
* }}}
*/
def filter(pred: T => Boolean): Future[T] = {
@@ -272,8 +290,8 @@ self =>
* val h = f collect {
* case x if x > 0 => x * 2
* }
- * await(0) g // evaluates to 5
- * await(0) h // throw a NoSuchElementException
+ * await(g, 0) // evaluates to 5
+ * await(h, 0) // throw a NoSuchElementException
* }}}
*/
def collect[S](pf: PartialFunction[T, S]): Future[S] = {
@@ -383,7 +401,7 @@ self =>
* val f = future { sys.error("failed") }
* val g = future { 5 }
* val h = f orElse g
- * await(0) h // evaluates to 5
+ * await(h, 0) // evaluates to 5
* }}}
*/
def fallbackTo[U >: T](that: Future[U]): Future[U] = {
@@ -445,7 +463,7 @@ self =>
* val f = future { sys.error("failed") }
* val g = future { 5 }
* val h = f either g
- * await(0) h // evaluates to either 5 or throws a runtime exception
+ * await(h, 0) // evaluates to either 5 or throws a runtime exception
* }}}
*/
def either[U >: T](that: Future[U]): Future[U] = {
@@ -466,26 +484,102 @@ self =>
-/** TODO some docs
+/** Future companion object.
*
* @define nonDeterministic
* Note: using this method yields nondeterministic dataflow programs.
*/
object Future {
+
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body)
+
+ import scala.collection.mutable.Builder
+ import scala.collection.generic.CanBuildFrom
+
+ /** Simple version of `Futures.traverse`. Transforms a `Traversable[Future[A]]` into a `Future[Traversable[A]]`.
+ * Useful for reducing many `Future`s into a single `Future`.
+ */
+ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
+ in.foldLeft(Promise.successful(cbf(in)).future) {
+ (fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)
+ } map (_.result)
+ }
+
+ /** Returns a `Future` to the result of the first future in the list that is completed.
+ */
+ def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
+ val p = Promise[T]()
+
+ val completeFirst: Try[T] => Unit = p tryComplete _
+ futures.foreach(_ onComplete completeFirst)
- // TODO make more modular by encoding all other helper methods within the execution context
- /** TODO some docs
+ p.future
+ }
+
+ /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate.
*/
- def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] =
- ec.all[T, Coll](futures)
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
+ if (futures.isEmpty) Promise.successful[Option[T]](None).future
+ else {
+ val result = Promise[Option[T]]()
+ val ref = new AtomicInteger(futures.size)
+ val search: Try[T] => Unit = v => try {
+ v match {
+ case Success(r) => if (predicate(r)) result tryComplete Success(Some(r))
+ case _ =>
+ }
+ } finally {
+ if (ref.decrementAndGet == 0)
+ result tryComplete Success(None)
+ }
- // move this to future companion object
- @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body)
+ futures.foreach(_ onComplete search)
- def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.any(futures)
+ result.future
+ }
+ }
- def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.find(futures)(predicate)
+ /** A non-blocking fold over the specified futures, with the start value of the given zero.
+ * The fold is performed on the thread where the last future is completed,
+ * the result will be the first failure of any of the futures, or any failure in the actual fold,
+ * or the result of the fold.
+ *
+ * Example:
+ * {{{
+ * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
+ * }}}
+ */
+ def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
+ if (futures.isEmpty) Promise.successful(zero).future
+ else sequence(futures).map(_.foldLeft(zero)(foldFun))
+ }
+ /** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first.
+ *
+ * Example:
+ * {{{
+ * val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
+ * }}}
+ */
+ def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
+ if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future
+ else sequence(futures).map(_ reduceLeft op)
+ }
+
+ /** Transforms a `Traversable[A]` into a `Future[Traversable[B]]` using the provided function `A => Future[B]`.
+ * This is useful for performing a parallel map. For example, to apply a function to all items of a list
+ * in parallel:
+ *
+ * {{{
+ * val myFutureList = Future.traverse(myList)(x => Future(myFunc(x)))
+ * }}}
+ */
+ def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
+ in.foldLeft(Promise.successful(cbf(in)).future) { (fr, a) =>
+ val fb = fn(a.asInstanceOf[A])
+ for (r <- fr; b <- fb) yield (r += b)
+ }.map(_.result)
+
}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 4404e90971..61e21606e6 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -30,8 +30,6 @@ import scala.util.{ Try, Success, Failure }
*/
trait Promise[T] {
- import nondeterministic._
-
/** Future containing the value of this promise.
*/
def future: Future[T]
@@ -114,12 +112,18 @@ trait Promise[T] {
object Promise {
- def kept[T](result: T)(implicit execctx: ExecutionContext): Promise[T] =
- execctx keptPromise result
+ /** Creates a new promise.
+ */
+ def apply[T]()(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.DefaultPromise[T]()
- def broken[T](t: Throwable)(implicit execctx: ExecutionContext): Promise[T] =
- execctx brokenPromise t
+ /** Creates an already completed Promise with the specified exception
+ */
+ def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception))
+ /** Creates an already completed Promise with the specified result
+ */
+ def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Success(result))
+
}
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 8ac745fd25..5dc440f42b 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -15,7 +15,6 @@ import scala.concurrent.forkjoin._
import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable}
import scala.util.{ Try, Success, Failure }
import scala.concurrent.util.{ Duration }
-import scala.collection.mutable.Stack
@@ -38,32 +37,12 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E
def run() = body()
})
- def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this)
-
- def future[T](body: =>T): Future[T] = {
- val p = promise[T]
-
- dispatchFuture {
- () =>
- p complete {
- try {
- Success(body)
- } catch {
- case e => resolver(e)
- }
- }
- }
-
- p.future
- }
-
- def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+ def blocking[T](body: =>T): T = blocking(body2awaitable(body), Duration.fromNanos(0))
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
- case x => x.blockingCall(awaitable) // inside an execution context thread
- }
+ Future.releaseStack(this)
+
+ awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
}
def reportFailure(t: Throwable) = t match {
@@ -71,61 +50,6 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E
case t => t.printStackTrace()
}
- /** Only callable from the tasks running on the same execution context. */
- private def blockingCall[T](body: Awaitable[T]): T = {
- releaseStack()
-
- // TODO see what to do with timeout
- body.await(Duration.fromNanos(0))(CanAwaitEvidence)
- }
-
- // an optimization for batching futures
- // TODO we should replace this with a public queue,
- // so that it can be stolen from
- // OR: a push to the local task queue should be so cheap that this is
- // not even needed, but stealing is still possible
- private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
-
- private def releaseStack(): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && stack.nonEmpty =>
- val tasks = stack.elems
- stack.clear()
- _taskStack.remove()
- dispatchFuture(() => _taskStack.get.elems = tasks, true)
- case null =>
- // do nothing - there is no local batching stack anymore
- case _ =>
- _taskStack.remove()
- }
-
- private[impl] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && !force => stack push task
- case _ => this.execute(
- new Runnable {
- def run() {
- try {
- val taskStack = Stack[() => Unit](task)
- _taskStack set taskStack
- while (taskStack.nonEmpty) {
- val next = taskStack.pop()
- try {
- next.apply()
- } catch {
- case e =>
- // TODO catching all and continue isn't good for OOME
- reportFailure(e)
- }
- }
- } finally {
- _taskStack.remove()
- }
- }
- }
- )
- }
-
}
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index b4385ea34a..6833b2467f 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -8,13 +8,17 @@
package scala.concurrent.impl
+
+
import scala.concurrent.{Awaitable, ExecutionContext}
import scala.util.{ Try, Success, Failure }
-//import scala.util.continuations._
+import scala.collection.mutable.Stack
+
+
private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
- implicit def executor: ExecutionContextImpl
+ implicit def executor: ExecutionContext
/** For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
@@ -40,7 +44,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
* that conforms to A's erased type or a ClassCastException otherwise.
*/
final def mapTo[T](implicit m: Manifest[T]) = {
- val p = executor.promise[T]
+ val p = new Promise.DefaultPromise[T]
onComplete {
case f @ Failure(t) => p complete f.asInstanceOf[Try[T]]
@@ -48,7 +52,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
p complete (try {
Success(Future.boxedType(m.erasure).cast(v).asInstanceOf[T])
} catch {
- case e: ClassCastException ⇒ Failure(e)
+ case e: ClassCastException => Failure(e)
})
}
@@ -86,4 +90,66 @@ object Future {
def boxedType(c: Class[_]): Class[_] = {
if (c.isPrimitive) toBoxed(c) else c
}
+
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
+ val promise = new Promise.DefaultPromise[T]()
+ executor.execute(new Runnable {
+ def run = {
+ promise complete {
+ try {
+ Success(body)
+ } catch {
+ case e => scala.concurrent.resolver(e)
+ }
+ }
+ }
+ })
+ promise.future
+ }
+
+ // an optimization for batching futures
+ // TODO we should replace this with a public queue,
+ // so that it can be stolen from
+ // OR: a push to the local task queue should be so cheap that this is
+ // not even needed, but stealing is still possible
+ private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
+
+ private[impl] def releaseStack(executor: ExecutionContext): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && stack.nonEmpty =>
+ val tasks = stack.elems
+ stack.clear()
+ _taskStack.remove()
+ dispatchFuture(executor, () => _taskStack.get.elems = tasks, true)
+ case null =>
+ // do nothing - there is no local batching stack anymore
+ case _ =>
+ _taskStack.remove()
+ }
+
+ private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && !force => stack push task
+ case _ => executor.execute(new Runnable {
+ def run() {
+ try {
+ val taskStack = Stack[() => Unit](task)
+ _taskStack set taskStack
+ while (taskStack.nonEmpty) {
+ val next = taskStack.pop()
+ try {
+ next.apply()
+ } catch {
+ case e =>
+ // TODO catching all and continue isn't good for OOME
+ executor.reportFailure(e)
+ }
+ }
+ } finally {
+ _taskStack.remove()
+ }
+ }
+ })
+ }
+
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 4a983b5001..c79b0d02cc 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -26,7 +26,7 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
def future = this
- def newPromise[S]: Promise[S] = executor promise
+ def newPromise[S]: scala.concurrent.Promise[S] = new Promise.DefaultPromise()
// TODO refine answer and return types here from Any to type parameters
// then move this up in the hierarchy
@@ -75,6 +75,7 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
object Promise {
+
def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue
def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
@@ -101,7 +102,7 @@ object Promise {
/** Default promise implementation.
*/
- class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] {
+ class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
self =>
updater.set(this, Promise.EmptyPending())
@@ -126,14 +127,14 @@ object Promise {
value.isDefined
}
- executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0))
+ executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost)
}
- private def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
if (value.isDefined || tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
- def await(atMost: Duration)(implicit permit: CanAwait): T =
+ def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
case util.Failure(e) => throw e
case util.Success(r) => r
@@ -176,9 +177,9 @@ object Promise {
case null => false
case cs if cs.isEmpty => true
case cs =>
- executor dispatchFuture {
+ Future.dispatchFuture(executor, {
() => cs.foreach(f => notifyCompleted(f, value))
- }
+ })
true
}
}
@@ -197,9 +198,9 @@ object Promise {
if (tryAddCallback()) {
val result = value.get
- executor dispatchFuture {
+ Future.dispatchFuture(executor, {
() => notifyCompleted(func, result)
- }
+ })
}
this
@@ -218,22 +219,22 @@ object Promise {
*
* Useful in Future-composition when a value to contribute is already available.
*/
- final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContextImpl) extends Promise[T] {
+ final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContext) extends Promise[T] {
val value = Some(resolve(suppliedValue))
def tryComplete(value: Try[T]): Boolean = false
def onComplete[U](func: Try[T] => U): this.type = {
val completedAs = value.get
- executor dispatchFuture {
+ Future.dispatchFuture(executor, {
() => func(completedAs)
- }
+ })
this
}
- private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
- def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
+ def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
case util.Failure(e) => throw e
case util.Success(r) => r
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 204b3f2673..e2ae17498f 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -20,27 +20,17 @@ package object concurrent extends scala.concurrent.ConcurrentPackageObject {
}
package concurrent {
- object await {
- def ready[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): Awaitable[T] = {
- try blocking(awaitable, atMost)
- catch { case _ => }
- awaitable
- }
-
- def result[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T = {
- blocking(awaitable, atMost)
- }
+
+ sealed trait CanAwait
+
+ object Await {
+ private[concurrent] implicit val canAwaitEvidence = new CanAwait {}
+
+ def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = awaitable.ready(atMost)
+
+ def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)
}
- /** Importing this object allows using some concurrency primitives
- * on futures and promises that can yield nondeterministic programs.
- *
- * While program determinism is broken when using these primitives,
- * some programs cannot be written without them (e.g. multiple client threads
- * cannot send requests to a server thread through regular promises and futures).
- */
- object nondeterministic { }
-
/** A timeout exception.
*
* Futures are failed with a timeout exception when their timeout expires.
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index 70221c0de1..75e2b92ff6 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -74,7 +74,7 @@ trait FutureCallbacks extends TestBase {
done()
throw new Exception
}
- f onSuccess {
+ f onSuccess {
case _ => assert(false)
}
}