summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-19 17:41:11 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-19 17:42:03 +0100
commit09deeec60db0e6e6b6904041db43535e492a0c2d (patch)
treec266868fbfbe3c42b6611b9c10b4d66a0c36416f
parent62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76 (diff)
downloadscala-09deeec60db0e6e6b6904041db43535e492a0c2d.tar.gz
scala-09deeec60db0e6e6b6904041db43535e492a0c2d.tar.bz2
scala-09deeec60db0e6e6b6904041db43535e492a0c2d.zip
Fix `all` combinator on futures, refactor execution context, remove disabled files.
-rw-r--r--src/library/scala/concurrent/AbstractPromise.java.disabled17
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala76
-rw-r--r--src/library/scala/concurrent/Future.scala17
-rw-r--r--src/library/scala/concurrent/Future.scala.disabled1051
-rw-r--r--src/library/scala/concurrent/Promise.scala1
-rw-r--r--src/library/scala/concurrent/package.scala80
-rw-r--r--src/library/scala/concurrent/package.scala.disabled108
7 files changed, 88 insertions, 1262 deletions
diff --git a/src/library/scala/concurrent/AbstractPromise.java.disabled b/src/library/scala/concurrent/AbstractPromise.java.disabled
deleted file mode 100644
index 726e6a3156..0000000000
--- a/src/library/scala/concurrent/AbstractPromise.java.disabled
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
- */
-
-package scala.concurrent;
-/*
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import scala.concurrent.forkjoin.*;
-
-abstract class AbstractPromise {
- //private volatile Object _ref = DefaultPromise.EmptyPending();
- protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
- AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
-
- protected void compute() { }
-}
-*/
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 5539b6858f..ebd5bf6bd3 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -11,14 +11,12 @@ package scala.concurrent
import java.util.concurrent.{ Executors, Future => JFuture, Callable }
-import java.util.concurrent.atomic.{ AtomicInteger }
-import scala.util.{ Duration, Timeout }
+import scala.util.Duration
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
-import scala.collection.generic.CanBuildFrom
-trait ExecutionContext {
+trait ExecutionContext extends ExecutionContextBase {
protected implicit object CanAwaitEvidence extends CanAwait
@@ -36,80 +34,10 @@ trait ExecutionContext {
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
- def futureUtilities: FutureUtilities = FutureUtilitiesImpl
-
}
sealed trait CanAwait
-trait FutureUtilities {
-
-/** TODO some docs
- *
- */
- def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
- val builder = cbf(futures)
- val p: Promise[Coll[T]] = promise[Coll[T]]
-
- if (futures.size == 1) futures.head onComplete {
- case Left(t) => p failure t
- case Right(v) => builder += v
- p success builder.result
- } else {
- val restFutures = all(futures.tail)
- futures.head onComplete {
- case Left(t) => p failure t
- case Right(v) => builder += v
- restFutures onComplete {
- case Left(t) => p failure t
- case Right(vs) => for (v <- vs) builder += v
- p success builder.result
- }
- }
- }
-
- p.future
- }
-
-/** TODO some docs
- *
- */
- def any[T](futures: Traversable[Future[T]]): Future[T] = {
- val futureResult = promise[T]()
-
- val completeFirst: Either[Throwable, T] => Unit = futureElem => futureResult tryComplete futureElem
-
- futures.foreach(_ onComplete completeFirst)
-
- futureResult.future
- }
-
-/** TODO some docs
- *
- */
- def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
- if (futures.isEmpty) Promise.successful[Option[T]](None).future
- else {
- val result = promise[Option[T]]()
- val ref = new AtomicInteger(futures.size)
- val search: Either[Throwable, T] ⇒ Unit = {
- v ⇒ v match {
- case Right(r) ⇒ if (predicate(r)) result trySuccess Some(r)
- case _ ⇒
- }
- if (ref.decrementAndGet == 0) result trySuccess None
- }
-
- futures.foreach(_ onComplete search)
-
- result.future
- }
- }
-
-}
-
-object FutureUtilitiesImpl extends FutureUtilities {
-}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 468683dcde..ff7da8433a 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -355,8 +355,7 @@ self =>
p.future
}
-
-/*
+
/** Creates a new future which holds the result of either this future or `that` future, depending on
* which future was completed first.
*
@@ -370,20 +369,20 @@ self =>
* await(0) h // evaluates to either 5 or throws a runtime exception
* }}}
*/
- def any[U >: T](that: Future[U]): Future[U] = {
+ def either[U >: T](that: Future[U]): Future[U] = {
val p = newPromise[U]
- val completePromise: PartialFunction[Either[Throwable, T], _] = {
+ val completePromise: PartialFunction[Either[Throwable, U], _] = {
case Left(t) => p tryFailure t
case Right(v) => p trySuccess v
}
+
this onComplete completePromise
that onComplete completePromise
p.future
}
-
-*/
+
}
@@ -399,14 +398,14 @@ object Future {
/** TODO some docs
*/
def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] =
- ec.futureUtilities.all[T, Coll](futures)
+ ec.all[T, Coll](futures)
// move this to future companion object
@inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body)
- def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.futureUtilities.any(futures)
+ def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.any(futures)
- def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.futureUtilities.find(futures)(predicate)
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.find(futures)(predicate)
}
diff --git a/src/library/scala/concurrent/Future.scala.disabled b/src/library/scala/concurrent/Future.scala.disabled
deleted file mode 100644
index 3cd9bbeb6e..0000000000
--- a/src/library/scala/concurrent/Future.scala.disabled
+++ /dev/null
@@ -1,1051 +0,0 @@
-/*
-class FutureFactory(dispatcher: MessageDispatcher, timeout: Timeout) {
-
- // TODO: remove me ASAP !!!
- implicit val _dispatcher = dispatcher
-
- /**
- * Java API, equivalent to Future.apply
- */
- def future[T](body: Callable[T]): Future[T] =
- Future(body.call, timeout)
-
- /**
- * Java API, equivalent to Future.apply
- */
- def future[T](body: Callable[T], timeout: Timeout): Future[T] =
- Future(body.call, timeout)
-
- /**
- * Java API, equivalent to Future.apply
- */
- def future[T](body: Callable[T], timeout: Long): Future[T] =
- Future(body.call, timeout)
-
- /**
- * Java API, equivalent to Future.apply
- */
- def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
- Future(body.call)(dispatcher, timeout)
-
- /**
- * Java API, equivalent to Future.apply
- */
- def future[T](body: Callable[T], timeout: Timeout, dispatcher: MessageDispatcher): Future[T] =
- Future(body.call)(dispatcher, timeout)
-
- /**
- * Java API, equivalent to Future.apply
- */
- def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
- Future(body.call)(dispatcher, timeout)
-
- /**
- * Java API.
- * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
- */
-/*
- def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = {
- val pred: T ⇒ Boolean = predicate.apply(_)
- Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)), timeout)(pred).map(JOption.fromScalaOption(_))(timeout)
- }
-
- def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = find(futures, predicate, timeout)
-*/
- /**
- * Java API.
- * Returns a Future to the result of the first future in the list that is completed
- */
-/*
- def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] =
- Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
-
- def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] = firstCompletedOf(futures, timeout)
-*/
- /**
- * Java API
- * A non-blocking fold over the specified futures.
- * The fold is performed on the thread where the last future is completed,
- * the result will be the first failure of any of the futures, or any failure in the actual fold,
- * or the result of the fold.
- */
-/*
- def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
- Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(zero)(fun.apply _)
-
- def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun)
-
- def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout, futures, fun)
-
- /**
- * Java API.
- * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
- */
- def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] =
- Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
-
- def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun)
-
- def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout, fun)
-
- /**
- * Java API.
- * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
- * Useful for reducing many Futures into a single Future.
- */
- def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = {
- implicit val t = timeout
- scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) ⇒
- for (r ← fr; a ← fa) yield {
- r add a
- r
- })
- }
-
- def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, timeout)
-
- /**
- * Java API.
- * Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A ⇒ Future[B].
- * This is useful for performing a parallel map. For example, to apply a function to all items of a list
- * in parallel.
- */
- def traverse[A, B](in: JIterable[A], timeout: Timeout, fn: JFunc[A, Future[B]]): Future[JIterable[B]] = {
- implicit val t = timeout
- scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) ⇒
- val fb = fn(a)
- for (r ← fr; b ← fb) yield {
- r add b
- r
- }
- }
- }
-
- def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, timeout, fn)
-
-*/
-}
-*/
-
-object Future {
- /*
- /**
- * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
- * The execution is performed by the specified Dispatcher.
- */
- def apply[T](body: ⇒ T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
- val promise: Promise[T] = DefaultPromise[T](timeout)
-
- dispatcher dispatchTask { () ⇒
- promise complete {
- try {
- Right(body)
- } catch {
- case e ⇒ Left(e)
- }
- }
- }
-
- promise
- }
-
- def apply[T](body: ⇒ T, timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
- apply(body)(dispatcher, timeout)
-
- def apply[T](body: ⇒ T, timeout: Duration)(implicit dispatcher: MessageDispatcher): Future[T] =
- apply(body)(dispatcher, timeout)
-
- def apply[T](body: ⇒ T, timeout: Long)(implicit dispatcher: MessageDispatcher): Future[T] =
- apply(body)(dispatcher, timeout)
-
- import scala.collection.mutable.Builder
- import scala.collection.generic.CanBuildFrom
-
- /**
- * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
- * Useful for reducing many Futures into a single Future.
- */
- def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[A]] =
- in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
-
- def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
- sequence(in)(cbf, timeout, dispatcher)
-
- /**
- * Returns a Future to the result of the first future in the list that is completed
- */
- def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
- val futureResult = DefaultPromise[T](timeout)
-
- val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _)
- futures.foreach(_ onComplete completeFirst)
-
- futureResult
- }
-
- def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
- firstCompletedOf(futures)(dispatcher, timeout)
-
- /**
- * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
- */
- def find[T](futures: Iterable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[Option[T]] = {
- if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
- else {
- val result = DefaultPromise[Option[T]](timeout)
- val ref = new AtomicInteger(futures.size)
- val search: Future[T] ⇒ Unit = f ⇒ try {
- f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r))
- } finally {
- if (ref.decrementAndGet == 0)
- result completeWithResult None
- }
- futures.foreach(_ onComplete search)
-
- result
- }
- }
-
- def find[T](futures: Iterable[Future[T]], timeout: Timeout)(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] =
- find(futures)(predicate)(dispatcher, timeout)
-
- /**
- * A non-blocking fold over the specified futures.
- * The fold is performed on the thread where the last future is completed,
- * the result will be the first failure of any of the futures, or any failure in the actual fold,
- * or the result of the fold.
- * Example:
- * <pre>
- * val result = Futures.fold(0)(futures)(_ + _).await.result
- * </pre>
- */
-/*
- def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
- if (futures.isEmpty) {
- new KeptPromise[R](Right(zero))
- } else {
- val result = DefaultPromise[R](timeout)
- val results = new ConcurrentLinkedQueue[T]()
- val done = new Switch(false)
- val allDone = futures.size
-
- val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
- f.value.get match {
- case Right(value) ⇒
- val added = results add value
- if (added && results.size == allDone) { //Only one thread can get here
- if (done.switchOn) {
- try {
- val i = results.iterator
- var currentValue = zero
- while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
- result completeWithResult currentValue
- } catch {
- case e: Exception ⇒
- //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.fold", e.getMessage))
- result completeWithException e
- } finally {
- results.clear
- }
- }
- }
- case Left(exception) ⇒
- if (done.switchOn) {
- result completeWithException exception
- results.clear
- }
- }
- }
-
- futures foreach { _ onComplete aggregate }
- result
- }
- }
-*/
-/*
- def fold[T, R](futures: Iterable[Future[T]], timeout: Timeout)(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] =
- fold(futures)(zero)(foldFun)(dispatcher, timeout)
-*/
- /**
- * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
- * Example:
- * <pre>
- * val result = Futures.reduce(futures)(_ + _).await.result
- * </pre>
- */
-/*
- def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
- if (futures.isEmpty)
- new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
- else {
- val result = DefaultPromise[R](timeout)
- val seedFound = new AtomicBoolean(false)
- val seedFold: Future[T] ⇒ Unit = f ⇒ {
- if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
- f.value.get match {
- case Right(value) ⇒ result.completeWith(fold(futures.filterNot(_ eq f))(value)(op))
- case Left(exception) ⇒ result.completeWithException(exception)
- }
- }
- }
- for (f ← futures) f onComplete seedFold //Attach the listener to the Futures
- result
- }
- }
-*/
-/*
- def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout)(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] =
- reduce(futures)(op)(dispatcher, timeout)
-*/
- /**
- * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B].
- * This is useful for performing a parallel map. For example, to apply a function to all items of a list
- * in parallel:
- * <pre>
- * val myFutureList = Futures.traverse(myList)(x ⇒ Future(myFunc(x)))
- * </pre>
- */
- def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[B]] =
- in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a) ⇒
- val fb = fn(a.asInstanceOf[A])
- for (r ← fr; b ← fb) yield (r += b)
- }.map(_.result)
-
- def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
- traverse(in)(fn)(cbf, timeout, dispatcher)
-
- /**
- * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
- * Continuations plugin.
- *
- * Within the block, the result of a Future may be accessed by calling Future.apply. At that point
- * execution is suspended with the rest of the block being stored in a continuation until the result
- * of the Future is available. If an Exception is thrown while processing, it will be contained
- * within the resulting Future.
- *
- * This allows working with Futures in an imperative style without blocking for each result.
- *
- * Completing a Future using 'Promise << Future' will also suspend execution until the
- * value of the other Future is available.
- *
- * The Delimited Continuations compiler plugin must be enabled in order to use this method.
- */
-/*
- def flow[A](body: ⇒ A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = {
- val future = Promise[A](timeout)
- dispatchTask({ () ⇒
- (reify(body) foreachFull (future completeWithResult, future completeWithException): Future[Any]) onException {
- case e: Exception ⇒ future completeWithException e
- }
- }, true)
- future
- }
-*/
- // TODO make variant of flow(timeout)(body) which does NOT break type inference
-
- /**
- * Assures that any Future tasks initiated in the current thread will be
- * executed asynchronously, including any tasks currently queued to be
- * executed in the current thread. This is needed if the current task may
- * block, causing delays in executing the remaining tasks which in some
- * cases may cause a deadlock.
- *
- * Note: Calling 'Future.await' will automatically trigger this method.
- *
- * For example, in the following block of code the call to 'latch.open'
- * might not be executed until after the call to 'latch.await', causing
- * a deadlock. By adding 'Future.blocking()' the call to 'latch.open'
- * will instead be dispatched separately from the current block, allowing
- * it to be run in parallel:
- * <pre>
- * val latch = new StandardLatch
- * val future = Future() map { _ ⇒
- * Future.blocking()
- * val nested = Future()
- * nested foreach (_ ⇒ latch.open)
- * latch.await
- * }
- * </pre>
- */
- def blocking()(implicit dispatcher: MessageDispatcher): Unit =
- _taskStack.get match {
- case Some(taskStack) if taskStack.nonEmpty ⇒
- val tasks = taskStack.elems
- taskStack.clear()
- _taskStack set None
- dispatchTask(() ⇒ _taskStack.get.get.elems = tasks, true)
- case Some(_) ⇒ _taskStack set None
- case _ ⇒ // already None
- }
-
- private val _taskStack = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() {
- override def initialValue = None
- }
-
- private[concurrent] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit dispatcher: MessageDispatcher): Unit =
- _taskStack.get match {
- case Some(taskStack) if !force ⇒ taskStack push task
- case _ ⇒
- dispatcher dispatchTask { () ⇒
- try {
- val taskStack = Stack[() ⇒ Unit](task)
- _taskStack set Some(taskStack)
- while (taskStack.nonEmpty) {
- val next = taskStack.pop()
- try {
- next.apply()
- } catch {
- case e ⇒ e.printStackTrace() //TODO FIXME strategy for handling exceptions in callbacks
- }
- }
- } finally { _taskStack set None }
- }
- }
- */
-}
-
-//trait Future[+T] {
- /*
- /**
- * For use only within a Future.flow block or another compatible Delimited Continuations reset block.
- *
- * Returns the result of this Future without blocking, by suspending execution and storing it as a
- * continuation until the result is available.
- */
- def apply()(implicit timeout: Timeout): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any]))
-
- /**
- * Blocks awaiting completion of this Future, then returns the resulting value,
- * or throws the completed exception
- *
- * Scala & Java API
- *
- * throws FutureTimeoutException if this Future times out when waiting for completion
- */
- def get: T = this.await.resultOrException.get
-
- /**
- * Blocks the current thread until the Future has been completed or the
- * timeout has expired. In the case of the timeout expiring a
- * FutureTimeoutException will be thrown.
- */
- def await: Future[T]
- */
-
- /**
- * Blocks the current thread until the Future has been completed or the
- * timeout has expired, additionally bounding the waiting period according to
- * the <code>atMost</code> parameter. The timeout will be the lesser value of
- * 'atMost' and the timeout supplied at the constructuion of this Future. In
- * the case of the timeout expiring a FutureTimeoutException will be thrown.
- * Other callers of this method are not affected by the additional bound
- * imposed by <code>atMost</code>.
- */
-// def await(atMost: Duration): Future[T]
-
-/*
- /**
- * Await completion of this Future and return its value if it conforms to A's
- * erased type. Will throw a ClassCastException if the value does not
- * conform, or any exception the Future was completed with. Will return None
- * in case of a timeout.
- */
- def as[A](implicit m: Manifest[A]): Option[A] = {
- try await catch { case _: FutureTimeoutException ⇒ }
- value match {
- case None ⇒ None
- case Some(Left(ex)) ⇒ throw ex
- case Some(Right(v)) ⇒
- try { Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) } catch {
- case c: ClassCastException ⇒
- if (v.asInstanceOf[AnyRef] eq null) throw new ClassCastException("null cannot be cast to " + m.erasure)
- else throw new ClassCastException("'" + v + "' of class " + v.asInstanceOf[AnyRef].getClass + " cannot be cast to " + m.erasure)
- }
- }
- }
-
- /**
- * Await completion of this Future and return its value if it conforms to A's
- * erased type, None otherwise. Will throw any exception the Future was
- * completed with. Will return None in case of a timeout.
- */
- def asSilently[A](implicit m: Manifest[A]): Option[A] = {
- try await catch { case _: FutureTimeoutException ⇒ }
- value match {
- case None ⇒ None
- case Some(Left(ex)) ⇒ throw ex
- case Some(Right(v)) ⇒
- try Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
- catch { case _: ClassCastException ⇒ None }
- }
- }
-
- /**
- * Tests whether this Future has been completed.
- */
- final def isCompleted: Boolean = value.isDefined
-
- /**
- * Tests whether this Future's timeout has expired.
- *
- * Note that an expired Future may still contain a value, or it may be
- * completed with a value.
- */
- def isExpired: Boolean
-
- def timeout: Timeout
-
- /**
- * This Future's timeout in nanoseconds.
- */
- def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue
-
- /**
- * The contained value of this Future. Before this Future is completed
- * the value will be None. After completion the value will be Some(Right(t))
- * if it contains a valid result, or Some(Left(error)) if it contains
- * an exception.
- */
- def value: Option[Either[Throwable, T]]
-
- /**
- * Returns the successful result of this Future if it exists.
- */
- final def result: Option[T] = value match {
- case Some(Right(r)) ⇒ Some(r)
- case _ ⇒ None
- }
-
- /**
- * Returns the contained exception of this Future if it exists.
- */
- final def exception: Option[Throwable] = value match {
- case Some(Left(e)) ⇒ Some(e)
- case _ ⇒ None
- }
-
- /**
- * When this Future is completed, apply the provided function to the
- * Future. If the Future has already been completed, this will apply
- * immediately. Will not be called in case of a timeout, which also holds if
- * corresponding Promise is attempted to complete after expiry. Multiple
- * callbacks may be registered; there is no guarantee that they will be
- * executed in a particular order.
- */
- def onComplete(func: Future[T] ⇒ Unit): this.type
-
- /**
- * When the future is completed with a valid result, apply the provided
- * PartialFunction to the result. See `onComplete` for more details.
- * <pre>
- * future onResult {
- * case Foo ⇒ target ! "foo"
- * case Bar ⇒ target ! "bar"
- * }
- * </pre>
- */
- final def onResult(pf: PartialFunction[T, Unit]): this.type = onComplete {
- _.value match {
- case Some(Right(r)) if pf isDefinedAt r ⇒ pf(r)
- case _ ⇒
- }
- }
-
- /**
- * When the future is completed with an exception, apply the provided
- * PartialFunction to the exception. See `onComplete` for more details.
- * <pre>
- * future onException {
- * case NumberFormatException ⇒ target ! "wrong format"
- * }
- * </pre>
- */
- final def onException(pf: PartialFunction[Throwable, Unit]): this.type = onComplete {
- _.value match {
- case Some(Left(ex)) if pf isDefinedAt ex ⇒ pf(ex)
- case _ ⇒
- }
- }
-
- def onTimeout(func: Future[T] ⇒ Unit): this.type
-
- def orElse[A >: T](fallback: ⇒ A): Future[A]
-
- /**
- * Creates a new Future that will handle any matching Throwable that this
- * Future might contain. If there is no match, or if this Future contains
- * a valid result then the new Future will contain the same.
- * Example:
- * <pre>
- * Future(6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
- * Future(6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception
- * Future(6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
- * </pre>
- */
- final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = {
- val future = DefaultPromise[A](timeout)
- onComplete {
- _.value.get match {
- case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) })
- case otherwise ⇒ future complete otherwise
- }
- }
- future
- }
-
- /**
- * Creates a new Future by applying a function to the successful result of
- * this Future. If this Future is completed with an exception then the new
- * Future will also contain this exception.
- * Example:
- * <pre>
- * val future1 = for {
- * a: Int <- actor ? "Hello" // returns 5
- * b: String <- actor ? a // returns "10"
- * c: String <- actor ? 7 // returns "14"
- * } yield b + "-" + c
- * </pre>
- */
- final def map[A](f: T ⇒ A)(implicit timeout: Timeout): Future[A] = {
- val future = DefaultPromise[A](timeout)
- onComplete {
- _.value.get match {
- case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
- case Right(res) ⇒
- future complete (try {
- Right(f(res))
- } catch {
- case e: Exception ⇒
- //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage))
- Left(e)
- })
- }
- }
- future
- }
-
- /**
- * Creates a new Future[A] which is completed with this Future's result if
- * that conforms to A's erased type or a ClassCastException otherwise.
- */
- final def mapTo[A](implicit m: Manifest[A], timeout: Timeout = this.timeout): Future[A] = {
- val fa = DefaultPromise[A](timeout)
- onComplete { ft ⇒
- fa complete (ft.value.get match {
- case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, A]]
- case Right(t) ⇒
- try {
- Right(BoxedType(m.erasure).cast(t).asInstanceOf[A])
- } catch {
- case e: ClassCastException ⇒ Left(e)
- }
- })
- }
- fa
- }
-
- /**
- * Creates a new Future by applying a function to the successful result of
- * this Future, and returns the result of the function as the new Future.
- * If this Future is completed with an exception then the new Future will
- * also contain this exception.
- * Example:
- * <pre>
- * val future1 = for {
- * a: Int <- actor ? "Hello" // returns 5
- * b: String <- actor ? a // returns "10"
- * c: String <- actor ? 7 // returns "14"
- * } yield b + "-" + c
- * </pre>
- */
- final def flatMap[A](f: T ⇒ Future[A])(implicit timeout: Timeout): Future[A] = {
- val future = DefaultPromise[A](timeout)
-
- onComplete {
- _.value.get match {
- case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
- case Right(r) ⇒ try {
- future.completeWith(f(r))
- } catch {
- case e: Exception ⇒
- //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage))
- future complete Left(e)
- }
- }
- }
- future
- }
-
- final def foreach(f: T ⇒ Unit): Unit = onComplete {
- _.value.get match {
- case Right(r) ⇒ f(r)
- case _ ⇒
- }
- }
-
- final def withFilter(p: T ⇒ Boolean)(implicit timeout: Timeout) = new FutureWithFilter[T](this, p)
-
- final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean)(implicit timeout: Timeout) {
- def foreach(f: A ⇒ Unit): Unit = self filter p foreach f
- def map[B](f: A ⇒ B): Future[B] = self filter p map f
- def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f
- def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x))
- }
-
- final def filter(p: T ⇒ Boolean)(implicit timeout: Timeout): Future[T] = {
- val future = DefaultPromise[T](timeout)
- onComplete {
- _.value.get match {
- case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, T]]
- case r @ Right(res) ⇒ future complete (try {
- if (p(res)) r else Left(new MatchError(res))
- } catch {
- case e: Exception ⇒
- //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage))
- Left(e)
- })
- }
- }
- future
- }
-
- /**
- * Returns the current result, throws the exception if one has been raised, else returns None
- */
- final def resultOrException: Option[T] = value match {
- case Some(Left(e)) ⇒ throw e
- case Some(Right(r)) ⇒ Some(r)
- case _ ⇒ None
- }
- */
-//}
-
-/**
- * Essentially this is the Promise (or write-side) of a Future (read-side).
- */
-//trait Promise[T] extends Future[T] {
-
- /*
- def start(): Unit
-
- /**
- * Completes this Future with the specified result, if not already completed.
- * @return this
- */
- def complete(value: Either[Throwable, T]): this.type
-
- /**
- * Completes this Future with the specified result, if not already completed.
- * @return this
- */
- final def completeWithResult(result: T): this.type = complete(Right(result))
-
- /**
- * Completes this Future with the specified exception, if not already completed.
- * @return this
- */
- final def completeWithException(exception: Throwable): this.type = complete(Left(exception))
-
- /**
- * Completes this Future with the specified other Future, when that Future is completed,
- * unless this Future has already been completed.
- * @return this.
- */
- final def completeWith(other: Future[T]): this.type = {
- other onComplete { f ⇒ complete(f.value.get) }
- this
- }
- */
-/*
- final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) }
-
- final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒
- val fr = DefaultPromise[Any](this.timeout)
- this completeWith other onComplete { f ⇒
- try {
- fr completeWith cont(f)
- } catch {
- case e: Exception ⇒
- //dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
- fr completeWithException e
- }
- }
- fr
- }
-
- final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒
- val fr = DefaultPromise[Any](this.timeout)
- stream.dequeue(this).onComplete { f ⇒
- try {
- fr completeWith cont(f)
- } catch {
- case e: Exception ⇒
- //dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
- fr completeWithException e
- }
- }
- fr
- }
-*/
-//}
-
-/**
- * Represents the internal state of the DefaultCompletableFuture
- */
-sealed abstract class FState[+T] { def value: Option[Either[Throwable, T]] }
-
-case class Pending[T](listeners: List[Future[T] ⇒ Unit] = Nil) extends FState[T] {
- def value: Option[Either[Throwable, T]] = None
-}
-
-case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
- def result: T = value.get.right.get
-}
-
-case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
- def exception: Throwable = value.get.left.get
-}
-
-case object Expired extends FState[Nothing] {
- def value: Option[Either[Throwable, Nothing]] = None
-}
-
-//Companion object to FState, just to provide a cheap, immutable default entry
-private[concurrent] object FState {
- def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
- private val emptyPendingValue = Pending[Nothing](Nil)
-}
-
-private[concurrent] object DefaultPromise {
- def apply[T](within: Timeout)(implicit disp: MessageDispatcher): Promise[T] = new DefaultPromise[T] {
- val timeout = within
- implicit val dispatcher = disp
- }
-
- def apply[T]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[T] = apply(timeout)
-
- def apply[T](timeout: Long)(implicit dispatcher: MessageDispatcher): Promise[T] = apply(Timeout(timeout))
-
- def apply[T](timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher): Promise[T] = apply(Timeout(timeout, timeunit))
-}
-
-/**
- * The default concrete Future implementation.
- */
-trait DefaultPromise[T] extends /*AbstractPromise with*/ Promise[T] {
- self ⇒
-
-// @volatile private _ref: AnyRef = DefaultPromise.EmptyPending()
-
-// protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
-// AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
-
- val timeout: Timeout
- implicit val dispatcher: MessageDispatcher
-
- private val _startTimeInNanos = currentTimeInNanos
-
- @tailrec
- private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
- if (value.isEmpty && waitTimeNanos > 0) {
- val ms = NANOS.toMillis(waitTimeNanos)
- val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
- val start = currentTimeInNanos
- try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
-
- awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start))
- } else {
- value.isDefined
- }
- }
-
- def await(atMost: Duration): this.type = if (value.isDefined) this else {
- Future.blocking()
-
- val waitNanos =
- if (timeout.duration.isFinite && atMost.isFinite)
- atMost.toNanos min timeLeft()
- else if (atMost.isFinite)
- atMost.toNanos
- else if (timeout.duration.isFinite)
- timeLeft()
- else Long.MaxValue //If both are infinite, use Long.MaxValue
-
- if (awaitUnsafe(waitNanos)) this
- else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds")
- }
-
- def await = await(timeout.duration)
-
- def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
-
- def value: Option[Either[Throwable, T]] = getState.value
-
- @inline
- protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean =
- AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState)
-
- @inline
- protected final def getState: FState[T] = {
-
- @tailrec
- def read(): FState[T] = {
- val cur = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this)
- if (cur.isInstanceOf[Pending[_]] && isExpired) {
- if (updateState(cur, Expired)) Expired else read()
- } else cur
- }
-
- read()
- }
-
- def complete(value: Either[Throwable, T]): this.type = {
- val callbacks = {
- try {
- @tailrec
- def tryComplete: List[Future[T] ⇒ Unit] = {
- val cur = getState
-
- cur match {
- case Pending(listeners) ⇒
- if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners
- else tryComplete
- case _ ⇒ Nil
- }
- }
- tryComplete
- } finally {
- synchronized { notifyAll() } //Notify any evil blockers
- }
- }
-
- if (callbacks.nonEmpty) Future.dispatchTask(() ⇒ callbacks foreach notifyCompleted)
-
- this
- }
-
- def onComplete(func: Future[T] ⇒ Unit): this.type = {
- @tailrec //Returns whether the future has already been completed or not
- def tryAddCallback(): Boolean = {
- val cur = getState
- cur match {
- case _: Success[_] | _: Failure[_] ⇒ true
- case Expired ⇒ false
- case p: Pending[_] ⇒
- val pt = p.asInstanceOf[Pending[T]]
- if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false
- else tryAddCallback()
- }
- }
-
- if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func))
-
- this
- }
-
- def onTimeout(func: Future[T] ⇒ Unit): this.type = {
- val runNow =
- if (!timeout.duration.isFinite) false //Not possible
- else if (value.isEmpty) {
- if (!isExpired) {
- val runnable = new Runnable {
- def run() {
- if (!isCompleted) {
- if (!isExpired) {
- // TODO FIXME: add scheduler
- //dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
- } else func(DefaultPromise.this)
- }
- }
- }
- /*
- TODO FIXME: add scheduler
- val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
- onComplete(_ ⇒ timeoutFuture.cancel())
- */
- false
- } else true
- } else false
-
- if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func))
-
- this
- }
-
- final def orElse[A >: T](fallback: ⇒ A): Future[A] =
- if (timeout.duration.isFinite) {
- getState match {
- case _: Success[_] | _: Failure[_] ⇒ this
- case Expired ⇒ Future[A](fallback, timeout)
- case _: Pending[_] ⇒
- val promise = DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense.
- promise completeWith this
- val runnable = new Runnable {
- def run() {
- if (!isCompleted) {
- if (!isExpired) {
- // TODO FIXME add scheduler
- //dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
- } else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) })
- }
- }
- }
- // TODO FIXME add
- //dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
- promise
- }
- } else this
-
- private def notifyCompleted(func: Future[T] ⇒ Unit) {
- try { func(this) } catch { case e ⇒ /*dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception"))*/ } //TODO catch, everything? Really?
- }
-
- @inline
- private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)?
- //TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs.
- @inline
- private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
-
- private def timeLeftNoinline(): Long = timeLeft()
-//}
-
-/**
- * An already completed Future is seeded with it's result at creation, is useful for when you are participating in
- * a Future-composition but you already have a value to contribute.
- */
-sealed class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
- val value = Some(suppliedValue)
-
- def complete(value: Either[Throwable, T]): this.type = this
- def onComplete(func: Future[T] ⇒ Unit): this.type = {
- Future dispatchTask (() ⇒ func(this))
- this
- }
- def await(atMost: Duration): this.type = this
- def await: this.type = this
- def isExpired: Boolean = true
- def timeout: Timeout = Timeout.zero
-
- final def onTimeout(func: Future[T] ⇒ Unit): this.type = this
- final def orElse[A >: T](fallback: ⇒ A): Future[A] = this
- */
-//}
-
-object BoxedType {
-
- private val toBoxed = Map[Class[_], Class[_]](
- classOf[Boolean] -> classOf[jl.Boolean],
- classOf[Byte] -> classOf[jl.Byte],
- classOf[Char] -> classOf[jl.Character],
- classOf[Short] -> classOf[jl.Short],
- classOf[Int] -> classOf[jl.Integer],
- classOf[Long] -> classOf[jl.Long],
- classOf[Float] -> classOf[jl.Float],
- classOf[Double] -> classOf[jl.Double],
- classOf[Unit] -> classOf[scala.runtime.BoxedUnit])
-
- def apply(c: Class[_]): Class[_] = {
- if (c.isPrimitive) toBoxed(c) else c
- }
-
-}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 41a41dd611..43abe566de 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -10,7 +10,6 @@ package scala.concurrent
-import scala.util.Timeout
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 23f26dd3b5..ee8f484379 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -6,13 +6,14 @@
** |/ **
\* */
-
-
package scala
+import java.util.concurrent.atomic.{ AtomicInteger }
import scala.util.{ Timeout, Duration }
+import collection._
+import scala.collection.generic.CanBuildFrom
@@ -129,6 +130,81 @@ package concurrent {
def this(origin: Future[_]) = this(origin, "Future timed out.")
}
+ trait ExecutionContextBase {
+ self: ExecutionContext =>
+
+ private implicit val executionContext = self
+
+ /** TODO some docs
+ *
+ */
+ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
+ val buffer = new mutable.ArrayBuffer[T]
+ val counter = new AtomicInteger(1) // how else could we do this?
+ val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature
+ var idx = 0
+
+ def tryFinish() = if (counter.decrementAndGet() == 0) {
+ val builder = cbf(futures)
+ builder ++= buffer
+ p success builder.result
+ }
+
+ for (f <- futures) {
+ val currentIndex = idx
+ buffer += null.asInstanceOf[T]
+ counter.incrementAndGet()
+ f onComplete {
+ case Left(t) =>
+ p tryFailure t
+ case Right(v) =>
+ buffer(currentIndex) = v
+ tryFinish()
+ }
+ idx += 1
+ }
+
+ tryFinish()
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def any[T](futures: Traversable[Future[T]]): Future[T] = {
+ val p = promise[T]
+ val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem
+
+ futures foreach (_ onComplete completeFirst)
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
+ if (futures.isEmpty) Promise.successful[Option[T]](None).future
+ else {
+ val result = promise[Option[T]]
+ val count = new AtomicInteger(futures.size)
+ val search: Either[Throwable, T] => Unit = {
+ v => v match {
+ case Right(r) => if (predicate(r)) result trySuccess Some(r)
+ case _ =>
+ }
+ if (count.decrementAndGet() == 0) result trySuccess None
+ }
+
+ futures.foreach(_ onComplete search)
+
+ result.future
+ }
+ }
+
+ }
+
}
diff --git a/src/library/scala/concurrent/package.scala.disabled b/src/library/scala/concurrent/package.scala.disabled
deleted file mode 100644
index 42b4bf954c..0000000000
--- a/src/library/scala/concurrent/package.scala.disabled
+++ /dev/null
@@ -1,108 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-
-package scala
-
-
-
-
-/** This package object contains primitives for parallel programming.
- */
-package object concurrent {
-
- /** Performs a call which can potentially block execution.
- *
- * Example:
- * {{{
- * val lock = new ReentrantLock
- *
- * // ... do something ...
- *
- * blocking {
- * if (!lock.hasLock) lock.lock()
- * }
- * }}}
- *
- * '''Note:''' calling methods that wait arbitrary amounts of time
- * (e.g. for I/O operations or locks) may severely decrease performance
- * or even result in deadlocks. This does not include waiting for
- * results of futures.
- *
- * @tparam T the result type of the blocking operation
- * @param body the blocking operation
- * @param runner the runner used for parallel computations
- * @return the result of the potentially blocking operation
- */
- def blocking[T](body: =>T)(implicit runner: TaskRunner): T = {
- null.asInstanceOf[T]
- }
-
- /** Invokes a computation asynchronously. Does not wait for the computation
- * to finish.
- *
- * @tparam U the result type of the operation
- * @param p the computation to be invoked asynchronously
- * @param runner the runner used for parallel computations
- */
- def spawn[U](p: =>U)(implicit runner: TaskRunner): Unit = {
- }
-
- /** Starts 2 parallel computations and returns once they are completed.
- *
- * $invokingPar
- *
- * @tparam T1 the type of the result of 1st the parallel computation
- * @tparam T2 the type of the result of 2nd the parallel computation
- * @param b1 the 1st computation to be invoked in parallel
- * @param b2 the 2nd computation to be invoked in parallel
- * @param runner the runner used for parallel computations
- * @return a tuple of results corresponding to parallel computations
- */
- def par[T1, T2](b1: =>T1)(b2: =>T2)(implicit runner: TaskRunner): (T1, T2) = {
- null
- }
-
- /** Starts 3 parallel computations and returns once they are completed.
- *
- * $invokingPar
- *
- * @tparam T1 the type of the result of 1st the parallel computation
- * @tparam T2 the type of the result of 2nd the parallel computation
- * @tparam T3 the type of the result of 3rd the parallel computation
- * @param b1 the 1st computation to be invoked in parallel
- * @param b2 the 2nd computation to be invoked in parallel
- * @param b3 the 3rd computation to be invoked in parallel
- * @param runner the runner used for parallel computations
- * @return a tuple of results corresponding to parallel computations
- */
- def par[T1, T2, T3](b1: =>T1)(b2: =>T2)(b3: =>T3)(implicit runner: TaskRunner): (T1, T2, T3) = {
- null
- }
-
- /** Starts 4 parallel computations and returns once they are completed.
- *
- * $invokingPar
- *
- * @tparam T1 the type of the result of 1st the parallel computation
- * @tparam T2 the type of the result of 2nd the parallel computation
- * @tparam T3 the type of the result of 3rd the parallel computation
- * @tparam T4 the type of the result of 4th the parallel computation
- * @param b1 the 1st computation to be invoked in parallel
- * @param b2 the 2nd computation to be invoked in parallel
- * @param b3 the 3rd computation to be invoked in parallel
- * @param b4 the 4th computation to be invoked in parallel
- * @param runner the runner used for parallel computations
- * @return a tuple of results corresponding to parallel computations
- */
- def par[T1, T2, T3, T4](b1: =>T1)(b2: =>T2)(b3: =>T3)(b4: =>T4)(implicit runner: TaskRunner): (T1, T2, T3, T4) = {
- null
- }
-
-}