From 778e7d1a1b87431449cbe7335ca3a66fbe7c8366 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Thu, 19 Jan 2012 19:55:41 +0100 Subject: Add implicit conversion for futures that enables calling nondeterministic methods. --- .../scala/concurrent/ExecutionContext.scala | 77 +++++++++++++++- src/library/scala/concurrent/Future.scala | 32 +------ src/library/scala/concurrent/Promise.scala | 4 +- src/library/scala/concurrent/package.scala | 100 +++++++-------------- 4 files changed, 113 insertions(+), 100 deletions(-) (limited to 'src/library') diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index ebd5bf6bd3..0657121de2 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -10,13 +10,16 @@ package scala.concurrent +import java.util.concurrent.atomic.{ AtomicInteger } import java.util.concurrent.{ Executors, Future => JFuture, Callable } import scala.util.Duration import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread } +import scala.collection.generic.CanBuildFrom +import collection._ -trait ExecutionContext extends ExecutionContextBase { +trait ExecutionContext { protected implicit object CanAwaitEvidence extends CanAwait @@ -34,6 +37,78 @@ trait ExecutionContext extends ExecutionContextBase { def blocking[T](awaitable: Awaitable[T], atMost: Duration): T + /* implementations follow */ + + private implicit val executionContext = this + + /** TODO some docs + * + */ + def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { + val buffer = new mutable.ArrayBuffer[T] + val counter = new AtomicInteger(1) // how else could we do this? + val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature + var idx = 0 + + def tryFinish() = if (counter.decrementAndGet() == 0) { + val builder = cbf(futures) + builder ++= buffer + p success builder.result + } + + for (f <- futures) { + val currentIndex = idx + buffer += null.asInstanceOf[T] + counter.incrementAndGet() + f onComplete { + case Left(t) => + p tryFailure t + case Right(v) => + buffer(currentIndex) = v + tryFinish() + } + idx += 1 + } + + tryFinish() + + p.future + } + + /** TODO some docs + * + */ + def any[T](futures: Traversable[Future[T]]): Future[T] = { + val p = promise[T] + val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem + + futures foreach (_ onComplete completeFirst) + + p.future + } + + /** TODO some docs + * + */ + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = { + if (futures.isEmpty) Promise.successful[Option[T]](None).future + else { + val result = promise[Option[T]] + val count = new AtomicInteger(futures.size) + val search: Either[Throwable, T] => Unit = { + v => v match { + case Right(r) => if (predicate(r)) result trySuccess Some(r) + case _ => + } + if (count.decrementAndGet() == 0) result trySuccess None + } + + futures.foreach(_ onComplete search) + + result.future + } + } + } diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index e68e6077bb..92e50b1f89 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -77,9 +77,6 @@ import scala.collection.generic.CanBuildFrom * {{{ * f flatMap { (x: Int) => g map { (y: Int) => x + y } } * }}} - * - * @define nonDeterministic - * Note: using this method yields nondeterministic dataflow programs. */ trait Future[+T] extends Awaitable[T] { self => @@ -336,6 +333,8 @@ self => * * Using this method will not cause concurrent programs to become nondeterministic. * + * + * * Example: * {{{ * val f = future { sys.error("failed") } @@ -358,33 +357,6 @@ self => p.future } - /** Creates a new future which holds the result of either this future or `that` future, depending on - * which future was completed first. - * - * $nonDeterministic - * - * Example: - * {{{ - * val f = future { sys.error("failed") } - * val g = future { 5 } - * val h = f any g - * await(0) h // evaluates to either 5 or throws a runtime exception - * }}} - */ - def either[U >: T](that: Future[U]): Future[U] = { - val p = newPromise[U] - - val completePromise: PartialFunction[Either[Throwable, U], _] = { - case Left(t) => p tryFailure t - case Right(v) => p trySuccess v - } - - this onComplete completePromise - that onComplete completePromise - - p.future - } - } diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index 43abe566de..e9b83cba73 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -34,8 +34,6 @@ trait Promise[T] { */ def future: Future[T] - private def throwCompleted = throw new IllegalStateException("Promise already completed.") - /** Completes the promise with either an exception or a value. * * @param result Either the value or the exception to complete the promise with. @@ -106,6 +104,8 @@ trait Promise[T] { case _ => new ExecutionException(t) } + private def throwCompleted = throw new IllegalStateException("Promise already completed.") + } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index ee8f484379..73da8469e6 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -10,10 +10,7 @@ package scala -import java.util.concurrent.atomic.{ AtomicInteger } -import scala.util.{ Timeout, Duration } -import collection._ -import scala.collection.generic.CanBuildFrom +import scala.util.Duration @@ -114,6 +111,19 @@ package object concurrent { } } + /** Importing this object allows using some concurrency primitives + * on futures and promises that can yield nondeterministic programs. + * + * While program determinism is broken when using these primitives, + * some programs cannot be written without them (e.g. multiple client threads + * cannot send requests to a server thread through regular promises and futures). + */ + object nondeterministic { + + implicit def future2nondeterministic[T](f: Future[T]) = new NondeterministicFuture[T](f) + + } + } @@ -130,79 +140,35 @@ package concurrent { def this(origin: Future[_]) = this(origin, "Future timed out.") } - trait ExecutionContextBase { - self: ExecutionContext => + private[concurrent] class NondeterministicFuture[+T](self: Future[T]) { - private implicit val executionContext = self - - /** TODO some docs + /** Creates a new future which holds the result of either this future or `that` future, depending on + * which future was completed first. + * + * $nonDeterministic * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f either g + * await(0) h // evaluates to either 5 or throws a runtime exception + * }}} */ - def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { - val buffer = new mutable.ArrayBuffer[T] - val counter = new AtomicInteger(1) // how else could we do this? - val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature - var idx = 0 - - def tryFinish() = if (counter.decrementAndGet() == 0) { - val builder = cbf(futures) - builder ++= buffer - p success builder.result - } + def either[U >: T](that: Future[U]): Future[U] = { + val p = self.newPromise[U] - for (f <- futures) { - val currentIndex = idx - buffer += null.asInstanceOf[T] - counter.incrementAndGet() - f onComplete { - case Left(t) => - p tryFailure t - case Right(v) => - buffer(currentIndex) = v - tryFinish() - } - idx += 1 + val completePromise: PartialFunction[Either[Throwable, U], _] = { + case Left(t) => p tryFailure t + case Right(v) => p trySuccess v } - tryFinish() + self onComplete completePromise + that onComplete completePromise p.future } - /** TODO some docs - * - */ - def any[T](futures: Traversable[Future[T]]): Future[T] = { - val p = promise[T] - val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem - - futures foreach (_ onComplete completeFirst) - - p.future - } - - /** TODO some docs - * - */ - def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = { - if (futures.isEmpty) Promise.successful[Option[T]](None).future - else { - val result = promise[Option[T]] - val count = new AtomicInteger(futures.size) - val search: Either[Throwable, T] => Unit = { - v => v match { - case Right(r) => if (predicate(r)) result trySuccess Some(r) - case _ => - } - if (count.decrementAndGet() == 0) result trySuccess None - } - - futures.foreach(_ onComplete search) - - result.future - } - } - } } -- cgit v1.2.3