diff options
author | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-19 19:55:41 +0100 |
---|---|---|
committer | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-19 19:55:41 +0100 |
commit | 778e7d1a1b87431449cbe7335ca3a66fbe7c8366 (patch) | |
tree | f78a8d28830f7ffc775f18c443efc213200dc2ea /src/library/scala/concurrent/ExecutionContext.scala | |
parent | ebc3636a1b6dd64425630afa83eb398a8d7c43a4 (diff) | |
download | scala-778e7d1a1b87431449cbe7335ca3a66fbe7c8366.tar.gz scala-778e7d1a1b87431449cbe7335ca3a66fbe7c8366.tar.bz2 scala-778e7d1a1b87431449cbe7335ca3a66fbe7c8366.zip |
Add implicit conversion for futures that enables calling nondeterministic methods.
Diffstat (limited to 'src/library/scala/concurrent/ExecutionContext.scala')
-rw-r--r-- | src/library/scala/concurrent/ExecutionContext.scala | 77 |
1 files changed, 76 insertions, 1 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index ebd5bf6bd3..0657121de2 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -10,13 +10,16 @@ package scala.concurrent +import java.util.concurrent.atomic.{ AtomicInteger } import java.util.concurrent.{ Executors, Future => JFuture, Callable } import scala.util.Duration import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread } +import scala.collection.generic.CanBuildFrom +import collection._ -trait ExecutionContext extends ExecutionContextBase { +trait ExecutionContext { protected implicit object CanAwaitEvidence extends CanAwait @@ -34,6 +37,78 @@ trait ExecutionContext extends ExecutionContextBase { def blocking[T](awaitable: Awaitable[T], atMost: Duration): T + /* implementations follow */ + + private implicit val executionContext = this + + /** TODO some docs + * + */ + def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { + val buffer = new mutable.ArrayBuffer[T] + val counter = new AtomicInteger(1) // how else could we do this? + val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature + var idx = 0 + + def tryFinish() = if (counter.decrementAndGet() == 0) { + val builder = cbf(futures) + builder ++= buffer + p success builder.result + } + + for (f <- futures) { + val currentIndex = idx + buffer += null.asInstanceOf[T] + counter.incrementAndGet() + f onComplete { + case Left(t) => + p tryFailure t + case Right(v) => + buffer(currentIndex) = v + tryFinish() + } + idx += 1 + } + + tryFinish() + + p.future + } + + /** TODO some docs + * + */ + def any[T](futures: Traversable[Future[T]]): Future[T] = { + val p = promise[T] + val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem + + futures foreach (_ onComplete completeFirst) + + p.future + } + + /** TODO some docs + * + */ + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = { + if (futures.isEmpty) Promise.successful[Option[T]](None).future + else { + val result = promise[Option[T]] + val count = new AtomicInteger(futures.size) + val search: Either[Throwable, T] => Unit = { + v => v match { + case Right(r) => if (predicate(r)) result trySuccess Some(r) + case _ => + } + if (count.decrementAndGet() == 0) result trySuccess None + } + + futures.foreach(_ onComplete search) + + result.future + } + } + } |