summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/ExecutionContext.scala
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-19 19:55:41 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-19 19:55:41 +0100
commit778e7d1a1b87431449cbe7335ca3a66fbe7c8366 (patch)
treef78a8d28830f7ffc775f18c443efc213200dc2ea /src/library/scala/concurrent/ExecutionContext.scala
parentebc3636a1b6dd64425630afa83eb398a8d7c43a4 (diff)
downloadscala-778e7d1a1b87431449cbe7335ca3a66fbe7c8366.tar.gz
scala-778e7d1a1b87431449cbe7335ca3a66fbe7c8366.tar.bz2
scala-778e7d1a1b87431449cbe7335ca3a66fbe7c8366.zip
Add implicit conversion for futures that enables calling nondeterministic methods.
Diffstat (limited to 'src/library/scala/concurrent/ExecutionContext.scala')
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala77
1 files changed, 76 insertions, 1 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index ebd5bf6bd3..0657121de2 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -10,13 +10,16 @@ package scala.concurrent
+import java.util.concurrent.atomic.{ AtomicInteger }
import java.util.concurrent.{ Executors, Future => JFuture, Callable }
import scala.util.Duration
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
+import scala.collection.generic.CanBuildFrom
+import collection._
-trait ExecutionContext extends ExecutionContextBase {
+trait ExecutionContext {
protected implicit object CanAwaitEvidence extends CanAwait
@@ -34,6 +37,78 @@ trait ExecutionContext extends ExecutionContextBase {
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
+ /* implementations follow */
+
+ private implicit val executionContext = this
+
+ /** TODO some docs
+ *
+ */
+ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
+ val buffer = new mutable.ArrayBuffer[T]
+ val counter = new AtomicInteger(1) // how else could we do this?
+ val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature
+ var idx = 0
+
+ def tryFinish() = if (counter.decrementAndGet() == 0) {
+ val builder = cbf(futures)
+ builder ++= buffer
+ p success builder.result
+ }
+
+ for (f <- futures) {
+ val currentIndex = idx
+ buffer += null.asInstanceOf[T]
+ counter.incrementAndGet()
+ f onComplete {
+ case Left(t) =>
+ p tryFailure t
+ case Right(v) =>
+ buffer(currentIndex) = v
+ tryFinish()
+ }
+ idx += 1
+ }
+
+ tryFinish()
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def any[T](futures: Traversable[Future[T]]): Future[T] = {
+ val p = promise[T]
+ val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem
+
+ futures foreach (_ onComplete completeFirst)
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
+ if (futures.isEmpty) Promise.successful[Option[T]](None).future
+ else {
+ val result = promise[Option[T]]
+ val count = new AtomicInteger(futures.size)
+ val search: Either[Throwable, T] => Unit = {
+ v => v match {
+ case Right(r) => if (predicate(r)) result trySuccess Some(r)
+ case _ =>
+ }
+ if (count.decrementAndGet() == 0) result trySuccess None
+ }
+
+ futures.foreach(_ onComplete search)
+
+ result.future
+ }
+ }
+
}