diff options
author | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-19 17:41:11 +0100 |
---|---|---|
committer | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-19 17:42:03 +0100 |
commit | 09deeec60db0e6e6b6904041db43535e492a0c2d (patch) | |
tree | c266868fbfbe3c42b6611b9c10b4d66a0c36416f /src/library/scala/concurrent/package.scala | |
parent | 62bfdb1c8d4d508b976c2ab6ffdae98e35bd4b76 (diff) | |
download | scala-09deeec60db0e6e6b6904041db43535e492a0c2d.tar.gz scala-09deeec60db0e6e6b6904041db43535e492a0c2d.tar.bz2 scala-09deeec60db0e6e6b6904041db43535e492a0c2d.zip |
Fix `all` combinator on futures, refactor execution context, remove disabled files.
Diffstat (limited to 'src/library/scala/concurrent/package.scala')
-rw-r--r-- | src/library/scala/concurrent/package.scala | 80 |
1 files changed, 78 insertions, 2 deletions
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 23f26dd3b5..ee8f484379 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -6,13 +6,14 @@ ** |/ ** \* */ - - package scala +import java.util.concurrent.atomic.{ AtomicInteger } import scala.util.{ Timeout, Duration } +import collection._ +import scala.collection.generic.CanBuildFrom @@ -129,6 +130,81 @@ package concurrent { def this(origin: Future[_]) = this(origin, "Future timed out.") } + trait ExecutionContextBase { + self: ExecutionContext => + + private implicit val executionContext = self + + /** TODO some docs + * + */ + def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { + val buffer = new mutable.ArrayBuffer[T] + val counter = new AtomicInteger(1) // how else could we do this? + val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature + var idx = 0 + + def tryFinish() = if (counter.decrementAndGet() == 0) { + val builder = cbf(futures) + builder ++= buffer + p success builder.result + } + + for (f <- futures) { + val currentIndex = idx + buffer += null.asInstanceOf[T] + counter.incrementAndGet() + f onComplete { + case Left(t) => + p tryFailure t + case Right(v) => + buffer(currentIndex) = v + tryFinish() + } + idx += 1 + } + + tryFinish() + + p.future + } + + /** TODO some docs + * + */ + def any[T](futures: Traversable[Future[T]]): Future[T] = { + val p = promise[T] + val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem + + futures foreach (_ onComplete completeFirst) + + p.future + } + + /** TODO some docs + * + */ + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = { + if (futures.isEmpty) Promise.successful[Option[T]](None).future + else { + val result = promise[Option[T]] + val count = new AtomicInteger(futures.size) + val search: Either[Throwable, T] => Unit = { + v => v match { + case Right(r) => if (predicate(r)) result trySuccess Some(r) + case _ => + } + if (count.decrementAndGet() == 0) result trySuccess None + } + + futures.foreach(_ onComplete search) + + result.future + } + } + + } + } |