summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/package.scala
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-19 17:41:11 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-19 17:42:03 +0100
commit09deeec60db0e6e6b6904041db43535e492a0c2d (patch)
treec266868fbfbe3c42b6611b9c10b4d66a0c36416f /src/library/scala/concurrent/package.scala
parent62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76 (diff)
downloadscala-09deeec60db0e6e6b6904041db43535e492a0c2d.tar.gz
scala-09deeec60db0e6e6b6904041db43535e492a0c2d.tar.bz2
scala-09deeec60db0e6e6b6904041db43535e492a0c2d.zip
Fix `all` combinator on futures, refactor execution context, remove disabled files.
Diffstat (limited to 'src/library/scala/concurrent/package.scala')
-rw-r--r--src/library/scala/concurrent/package.scala80
1 files changed, 78 insertions, 2 deletions
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 23f26dd3b5..ee8f484379 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -6,13 +6,14 @@
** |/ **
\* */
-
-
package scala
+import java.util.concurrent.atomic.{ AtomicInteger }
import scala.util.{ Timeout, Duration }
+import collection._
+import scala.collection.generic.CanBuildFrom
@@ -129,6 +130,81 @@ package concurrent {
def this(origin: Future[_]) = this(origin, "Future timed out.")
}
+ trait ExecutionContextBase {
+ self: ExecutionContext =>
+
+ private implicit val executionContext = self
+
+ /** TODO some docs
+ *
+ */
+ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
+ val buffer = new mutable.ArrayBuffer[T]
+ val counter = new AtomicInteger(1) // how else could we do this?
+ val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature
+ var idx = 0
+
+ def tryFinish() = if (counter.decrementAndGet() == 0) {
+ val builder = cbf(futures)
+ builder ++= buffer
+ p success builder.result
+ }
+
+ for (f <- futures) {
+ val currentIndex = idx
+ buffer += null.asInstanceOf[T]
+ counter.incrementAndGet()
+ f onComplete {
+ case Left(t) =>
+ p tryFailure t
+ case Right(v) =>
+ buffer(currentIndex) = v
+ tryFinish()
+ }
+ idx += 1
+ }
+
+ tryFinish()
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def any[T](futures: Traversable[Future[T]]): Future[T] = {
+ val p = promise[T]
+ val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem
+
+ futures foreach (_ onComplete completeFirst)
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
+ if (futures.isEmpty) Promise.successful[Option[T]](None).future
+ else {
+ val result = promise[Option[T]]
+ val count = new AtomicInteger(futures.size)
+ val search: Either[Throwable, T] => Unit = {
+ v => v match {
+ case Right(r) => if (predicate(r)) result trySuccess Some(r)
+ case _ =>
+ }
+ if (count.decrementAndGet() == 0) result trySuccess None
+ }
+
+ futures.foreach(_ onComplete search)
+
+ result.future
+ }
+ }
+
+ }
+
}