From 47318105010786bc6eba835c957ce3cd4fe88d70 Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Wed, 28 Mar 2012 19:17:49 +0200 Subject: Work on source compatibility between akka and scala futures. Removed some methods from execution contexts. Changed Awaitable interface. --- src/library/scala/concurrent/Awaitable.scala | 9 ++- .../scala/concurrent/ConcurrentPackageObject.scala | 14 ++-- .../scala/concurrent/ExecutionContext.scala | 91 +--------------------- src/library/scala/concurrent/Future.scala | 31 +++----- src/library/scala/concurrent/Promise.scala | 16 ++-- .../concurrent/impl/ExecutionContextImpl.scala | 84 +------------------- src/library/scala/concurrent/impl/Future.scala | 74 +++++++++++++++++- src/library/scala/concurrent/impl/Promise.scala | 29 +++---- src/library/scala/concurrent/package.scala | 28 +++---- test/files/jvm/scala-concurrent-tck.scala | 2 +- 10 files changed, 136 insertions(+), 242 deletions(-) diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala index 6c9995eb05..052e6e2366 100644 --- a/src/library/scala/concurrent/Awaitable.scala +++ b/src/library/scala/concurrent/Awaitable.scala @@ -16,8 +16,13 @@ import scala.concurrent.util.Duration trait Awaitable[+T] { - @implicitNotFound(msg = "Waiting must be done by calling `blocking(timeout) b`, where `b` is the `Awaitable` object or a potentially blocking piece of code.") - def await(atMost: Duration)(implicit canawait: CanAwait): T + def ready(atMost: Duration)(implicit permit: CanAwait): this.type + + /** + * Throws exceptions if cannot produce a T within the specified time + * This method should not be called directly. + */ + def result(atMost: Duration)(implicit permit: CanAwait): T } diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala index 3471095051..ba98757906 100644 --- a/src/library/scala/concurrent/ConcurrentPackageObject.scala +++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala @@ -59,14 +59,18 @@ abstract class ConcurrentPackageObject { /* concurrency constructs */ def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] = - execCtx future body + Future[T](body) def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] = - execCtx promise + Promise[T]() /** Wraps a block of code into an awaitable object. */ def body2awaitable[T](body: =>T) = new Awaitable[T] { - def await(atMost: Duration)(implicit cb: CanAwait) = body + def ready(atMost: Duration)(implicit permit: CanAwait) = { + body + this + } + def result(atMost: Duration)(implicit permit: CanAwait) = body } /** Used to block on a piece of code which potentially blocks. @@ -78,8 +82,8 @@ abstract class ConcurrentPackageObject { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext): T = - executionContext.blocking(atMost)(body) + def blocking[T](body: =>T)(implicit execCtx: ExecutionContext): T = + executionContext.blocking(body) /** Blocks on an awaitable object. * diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index c4a45f9fb5..a206a2d4ea 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -22,19 +22,11 @@ import collection._ trait ExecutionContext { - protected implicit object CanAwaitEvidence extends CanAwait - def execute(runnable: Runnable): Unit def execute[U](body: () => U): Unit - def promise[T]: Promise[T] - - def future[T](body: Callable[T]): Future[T] = future(body.call()) - - def future[T](body: => T): Future[T] - - def blocking[T](atMost: Duration)(body: =>T): T + def blocking[T](body: =>T): T def blocking[T](awaitable: Awaitable[T], atMost: Duration): T @@ -44,89 +36,8 @@ trait ExecutionContext { private implicit val executionContext = this - def keptPromise[T](result: T): Promise[T] = { - val p = promise[T] - p success result - } - - def brokenPromise[T](t: Throwable): Promise[T] = { - val p = promise[T] - p failure t - } - - /** TODO some docs - * - */ - def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { - import nondeterministic._ - val buffer = new mutable.ArrayBuffer[T] - val counter = new AtomicInteger(1) // how else could we do this? - val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature - var idx = 0 - - def tryFinish() = if (counter.decrementAndGet() == 0) { - val builder = cbf(futures) - builder ++= buffer - p success builder.result - } - - for (f <- futures) { - val currentIndex = idx - buffer += null.asInstanceOf[T] - counter.incrementAndGet() - f onComplete { - case Failure(t) => - p tryFailure t - case Success(v) => - buffer(currentIndex) = v - tryFinish() - } - idx += 1 - } - - tryFinish() - - p.future - } - - /** TODO some docs - * - */ - def any[T](futures: Traversable[Future[T]]): Future[T] = { - val p = promise[T] - val completeFirst: Try[T] => Unit = elem => p tryComplete elem - - futures foreach (_ onComplete completeFirst) - - p.future - } - - /** TODO some docs - * - */ - def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = { - if (futures.isEmpty) Promise.kept[Option[T]](None).future - else { - val result = promise[Option[T]] - val count = new AtomicInteger(futures.size) - val search: Try[T] => Unit = { - v => v match { - case Success(r) => if (predicate(r)) result trySuccess Some(r) - case _ => - } - if (count.decrementAndGet() == 0) result trySuccess None - } - - futures.foreach(_ onComplete search) - - result.future - } - } - } -sealed trait CanAwait - diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 1dc8e38355..fa4c61c227 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -235,8 +235,8 @@ self => * val f = future { 5 } * val g = f filter { _ % 2 == 1 } * val h = f filter { _ % 2 == 0 } - * await(0) g // evaluates to 5 - * await(0) h // throw a NoSuchElementException + * await(g, 0) // evaluates to 5 + * await(h, 0) // throw a NoSuchElementException * }}} */ def filter(pred: T => Boolean): Future[T] = { @@ -272,8 +272,8 @@ self => * val h = f collect { * case x if x > 0 => x * 2 * } - * await(0) g // evaluates to 5 - * await(0) h // throw a NoSuchElementException + * await(g, 0) // evaluates to 5 + * await(h, 0) // throw a NoSuchElementException * }}} */ def collect[S](pf: PartialFunction[T, S]): Future[S] = { @@ -383,7 +383,7 @@ self => * val f = future { sys.error("failed") } * val g = future { 5 } * val h = f orElse g - * await(0) h // evaluates to 5 + * await(h, 0) // evaluates to 5 * }}} */ def fallbackTo[U >: T](that: Future[U]): Future[U] = { @@ -445,7 +445,7 @@ self => * val f = future { sys.error("failed") } * val g = future { 5 } * val h = f either g - * await(0) h // evaluates to either 5 or throws a runtime exception + * await(h, 0) // evaluates to either 5 or throws a runtime exception * }}} */ def either[U >: T](that: Future[U]): Future[U] = { @@ -466,26 +466,15 @@ self => -/** TODO some docs +/** Future companion object. * * @define nonDeterministic * Note: using this method yields nondeterministic dataflow programs. */ object Future { - - // TODO make more modular by encoding all other helper methods within the execution context - /** TODO some docs - */ - def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] = - ec.all[T, Coll](futures) - - // move this to future companion object - @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body) - - def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.any(futures) - - def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.find(futures)(predicate) - + + def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body) + } diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index 4404e90971..61e21606e6 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -30,8 +30,6 @@ import scala.util.{ Try, Success, Failure } */ trait Promise[T] { - import nondeterministic._ - /** Future containing the value of this promise. */ def future: Future[T] @@ -114,12 +112,18 @@ trait Promise[T] { object Promise { - def kept[T](result: T)(implicit execctx: ExecutionContext): Promise[T] = - execctx keptPromise result + /** Creates a new promise. + */ + def apply[T]()(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.DefaultPromise[T]() - def broken[T](t: Throwable)(implicit execctx: ExecutionContext): Promise[T] = - execctx brokenPromise t + /** Creates an already completed Promise with the specified exception + */ + def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception)) + /** Creates an already completed Promise with the specified result + */ + def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Success(result)) + } diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 8ac745fd25..5dc440f42b 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -15,7 +15,6 @@ import scala.concurrent.forkjoin._ import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} import scala.util.{ Try, Success, Failure } import scala.concurrent.util.{ Duration } -import scala.collection.mutable.Stack @@ -38,32 +37,12 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E def run() = body() }) - def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this) - - def future[T](body: =>T): Future[T] = { - val p = promise[T] - - dispatchFuture { - () => - p complete { - try { - Success(body) - } catch { - case e => resolver(e) - } - } - } - - p.future - } - - def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + def blocking[T](body: =>T): T = blocking(body2awaitable(body), Duration.fromNanos(0)) def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { - currentExecutionContext.get match { - case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case - case x => x.blockingCall(awaitable) // inside an execution context thread - } + Future.releaseStack(this) + + awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) } def reportFailure(t: Throwable) = t match { @@ -71,61 +50,6 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E case t => t.printStackTrace() } - /** Only callable from the tasks running on the same execution context. */ - private def blockingCall[T](body: Awaitable[T]): T = { - releaseStack() - - // TODO see what to do with timeout - body.await(Duration.fromNanos(0))(CanAwaitEvidence) - } - - // an optimization for batching futures - // TODO we should replace this with a public queue, - // so that it can be stolen from - // OR: a push to the local task queue should be so cheap that this is - // not even needed, but stealing is still possible - private val _taskStack = new ThreadLocal[Stack[() => Unit]]() - - private def releaseStack(): Unit = - _taskStack.get match { - case stack if (stack ne null) && stack.nonEmpty => - val tasks = stack.elems - stack.clear() - _taskStack.remove() - dispatchFuture(() => _taskStack.get.elems = tasks, true) - case null => - // do nothing - there is no local batching stack anymore - case _ => - _taskStack.remove() - } - - private[impl] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit = - _taskStack.get match { - case stack if (stack ne null) && !force => stack push task - case _ => this.execute( - new Runnable { - def run() { - try { - val taskStack = Stack[() => Unit](task) - _taskStack set taskStack - while (taskStack.nonEmpty) { - val next = taskStack.pop() - try { - next.apply() - } catch { - case e => - // TODO catching all and continue isn't good for OOME - reportFailure(e) - } - } - } finally { - _taskStack.remove() - } - } - } - ) - } - } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index b4385ea34a..6833b2467f 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -8,13 +8,17 @@ package scala.concurrent.impl + + import scala.concurrent.{Awaitable, ExecutionContext} import scala.util.{ Try, Success, Failure } -//import scala.util.continuations._ +import scala.collection.mutable.Stack + + private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { - implicit def executor: ExecutionContextImpl + implicit def executor: ExecutionContext /** For use only within a Future.flow block or another compatible Delimited Continuations reset block. * @@ -40,7 +44,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa * that conforms to A's erased type or a ClassCastException otherwise. */ final def mapTo[T](implicit m: Manifest[T]) = { - val p = executor.promise[T] + val p = new Promise.DefaultPromise[T] onComplete { case f @ Failure(t) => p complete f.asInstanceOf[Try[T]] @@ -48,7 +52,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa p complete (try { Success(Future.boxedType(m.erasure).cast(v).asInstanceOf[T]) } catch { - case e: ClassCastException ⇒ Failure(e) + case e: ClassCastException => Failure(e) }) } @@ -86,4 +90,66 @@ object Future { def boxedType(c: Class[_]): Class[_] = { if (c.isPrimitive) toBoxed(c) else c } + + def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = { + val promise = new Promise.DefaultPromise[T]() + executor.execute(new Runnable { + def run = { + promise complete { + try { + Success(body) + } catch { + case e => scala.concurrent.resolver(e) + } + } + } + }) + promise.future + } + + // an optimization for batching futures + // TODO we should replace this with a public queue, + // so that it can be stolen from + // OR: a push to the local task queue should be so cheap that this is + // not even needed, but stealing is still possible + private val _taskStack = new ThreadLocal[Stack[() => Unit]]() + + private[impl] def releaseStack(executor: ExecutionContext): Unit = + _taskStack.get match { + case stack if (stack ne null) && stack.nonEmpty => + val tasks = stack.elems + stack.clear() + _taskStack.remove() + dispatchFuture(executor, () => _taskStack.get.elems = tasks, true) + case null => + // do nothing - there is no local batching stack anymore + case _ => + _taskStack.remove() + } + + private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit = + _taskStack.get match { + case stack if (stack ne null) && !force => stack push task + case _ => executor.execute(new Runnable { + def run() { + try { + val taskStack = Stack[() => Unit](task) + _taskStack set taskStack + while (taskStack.nonEmpty) { + val next = taskStack.pop() + try { + next.apply() + } catch { + case e => + // TODO catching all and continue isn't good for OOME + executor.reportFailure(e) + } + } + } finally { + _taskStack.remove() + } + } + }) + } + } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 4a983b5001..c79b0d02cc 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -26,7 +26,7 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu def future = this - def newPromise[S]: Promise[S] = executor promise + def newPromise[S]: scala.concurrent.Promise[S] = new Promise.DefaultPromise() // TODO refine answer and return types here from Any to type parameters // then move this up in the hierarchy @@ -75,6 +75,7 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu object Promise { + def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] @@ -101,7 +102,7 @@ object Promise { /** Default promise implementation. */ - class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] { + class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self => updater.set(this, Promise.EmptyPending()) @@ -126,14 +127,14 @@ object Promise { value.isDefined } - executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0)) + executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) } - private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = if (value.isDefined || tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") - def await(atMost: Duration)(implicit permit: CanAwait): T = + def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { case util.Failure(e) => throw e case util.Success(r) => r @@ -176,9 +177,9 @@ object Promise { case null => false case cs if cs.isEmpty => true case cs => - executor dispatchFuture { + Future.dispatchFuture(executor, { () => cs.foreach(f => notifyCompleted(f, value)) - } + }) true } } @@ -197,9 +198,9 @@ object Promise { if (tryAddCallback()) { val result = value.get - executor dispatchFuture { + Future.dispatchFuture(executor, { () => notifyCompleted(func, result) - } + }) } this @@ -218,22 +219,22 @@ object Promise { * * Useful in Future-composition when a value to contribute is already available. */ - final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContextImpl) extends Promise[T] { + final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContext) extends Promise[T] { val value = Some(resolve(suppliedValue)) def tryComplete(value: Try[T]): Boolean = false def onComplete[U](func: Try[T] => U): this.type = { val completedAs = value.get - executor dispatchFuture { + Future.dispatchFuture(executor, { () => func(completedAs) - } + }) this } - private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this - def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match { + def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { case util.Failure(e) => throw e case util.Success(r) => r } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 204b3f2673..e2ae17498f 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -20,27 +20,17 @@ package object concurrent extends scala.concurrent.ConcurrentPackageObject { } package concurrent { - object await { - def ready[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): Awaitable[T] = { - try blocking(awaitable, atMost) - catch { case _ => } - awaitable - } - - def result[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T = { - blocking(awaitable, atMost) - } + + sealed trait CanAwait + + object Await { + private[concurrent] implicit val canAwaitEvidence = new CanAwait {} + + def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = awaitable.ready(atMost) + + def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost) } - /** Importing this object allows using some concurrency primitives - * on futures and promises that can yield nondeterministic programs. - * - * While program determinism is broken when using these primitives, - * some programs cannot be written without them (e.g. multiple client threads - * cannot send requests to a server thread through regular promises and futures). - */ - object nondeterministic { } - /** A timeout exception. * * Futures are failed with a timeout exception when their timeout expires. diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 70221c0de1..75e2b92ff6 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -74,7 +74,7 @@ trait FutureCallbacks extends TestBase { done() throw new Exception } - f onSuccess { + f onSuccess { case _ => assert(false) } } -- cgit v1.2.3 From 802162b5b1260bde50aafdc5ae9534c54472b109 Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Thu, 29 Mar 2012 10:25:28 +0200 Subject: Add methods in the Future companion object. --- src/library/scala/concurrent/Future.scala | 109 +++++++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 2 deletions(-) diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index fa4c61c227..d73801aa90 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -134,8 +134,26 @@ self => /** Creates a new promise. */ def newPromise[S]: Promise[S] - - + + /** Returns whether the future has already been completed with + * a value or an exception. + * + * $nonDeterministic + * + * @return `true` if the future is already completed, `false` otherwise + */ + def isCompleted: Boolean + + /** The value of this `Future`. + * + * If the future is not completed the returned value will be `None`. + * If the future is completed the value will be `Some(Success(t))` + * if it contains a valid result, or `Some(Failure(error))` if it contains + * an exception. + */ + def value: Option[Try[T]] + + /* Projections */ /** Returns a failed projection of this future. @@ -474,6 +492,93 @@ self => object Future { def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body) + + import scala.collection.mutable.Builder + import scala.collection.generic.CanBuildFrom + + /** Simple version of `Futures.traverse`. Transforms a `Traversable[Future[A]]` into a `Future[Traversable[A]]`. + * Useful for reducing many `Future`s into a single `Future`. + */ + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { + in.foldLeft(Promise.successful(cbf(in)).future) { + (fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a) + } map (_.result) + } + + /** Returns a `Future` to the result of the first future in the list that is completed. + */ + def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { + val p = Promise[T]() + + val completeFirst: Try[T] => Unit = p tryComplete _ + futures.foreach(_ onComplete completeFirst) + + p.future + } + + /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate. + */ + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { + if (futures.isEmpty) Promise.successful[Option[T]](None).future + else { + val result = Promise[Option[T]]() + val ref = new AtomicInteger(futures.size) + val search: Try[T] => Unit = v => try { + v match { + case Success(r) => if (predicate(r)) result tryComplete Success(Some(r)) + case _ => + } + } finally { + if (ref.decrementAndGet == 0) + result tryComplete Success(None) + } + + futures.foreach(_ onComplete search) + + result.future + } + } + + /** A non-blocking fold over the specified futures, with the start value of the given zero. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + * + * Example: + * {{{ + * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds) + * }}} + */ + def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + if (futures.isEmpty) Promise.successful(zero).future + else sequence(futures).map(_.foldLeft(zero)(foldFun)) + } + + /** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first. + * + * Example: + * {{{ + * val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds) + * }}} + */ + def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future + else sequence(futures).map(_ reduceLeft op) + } + + /** Transforms a `Traversable[A]` into a `Future[Traversable[B]]` using the provided function `A => Future[B]`. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel: + * + * {{{ + * val myFutureList = Future.traverse(myList)(x => Future(myFunc(x))) + * }}} + */ + def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = + in.foldLeft(Promise.successful(cbf(in)).future) { (fr, a) => + val fb = fn(a.asInstanceOf[A]) + for (r <- fr; b <- fb) yield (r += b) + }.map(_.result) } -- cgit v1.2.3 From 794104f74d7a91bd36a6bdf8e05d1f96dac8240a Mon Sep 17 00:00:00 2001 From: Heather Miller Date: Thu, 29 Mar 2012 18:22:18 +0200 Subject: Adds links to parallel collections overview in API docs Now that there's an extensive parallel collections overview on http://docs.scala-lang.org, API comments should link to it ("see also" field). This commit also fixes a couple of broken links on some sequential collection types. --- src/library/scala/collection/immutable/List.scala | 2 +- src/library/scala/collection/mutable/Stack.scala | 2 +- src/library/scala/collection/parallel/immutable/ParHashMap.scala | 4 +++- src/library/scala/collection/parallel/immutable/ParHashSet.scala | 2 ++ src/library/scala/collection/parallel/immutable/ParRange.scala | 2 ++ src/library/scala/collection/parallel/immutable/ParVector.scala | 2 ++ src/library/scala/collection/parallel/mutable/ParArray.scala | 6 +++++- src/library/scala/collection/parallel/mutable/ParHashMap.scala | 2 ++ src/library/scala/collection/parallel/mutable/ParHashSet.scala | 2 ++ src/library/scala/collection/parallel/mutable/ParTrieMap.scala | 2 ++ 10 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/library/scala/collection/immutable/List.scala b/src/library/scala/collection/immutable/List.scala index 870c179b2d..1b75c10113 100644 --- a/src/library/scala/collection/immutable/List.scala +++ b/src/library/scala/collection/immutable/List.scala @@ -58,7 +58,7 @@ import java.io._ * @author Martin Odersky and others * @version 2.8 * @since 1.0 - * @see [["http://docs.scala-lang.org/overviews/collections/concrete-immutable-collection-classes.html#lists" "Scala's Collection Library overview"]] + * @see [[http://docs.scala-lang.org/overviews/collections/concrete-immutable-collection-classes.html#lists "Scala's Collection Library overview"]] * section on `Lists` for more information. * * @define coll list diff --git a/src/library/scala/collection/mutable/Stack.scala b/src/library/scala/collection/mutable/Stack.scala index 8fad131009..b70df05c55 100644 --- a/src/library/scala/collection/mutable/Stack.scala +++ b/src/library/scala/collection/mutable/Stack.scala @@ -44,7 +44,7 @@ object Stack extends SeqFactory[Stack] { * @author Martin Odersky * @version 2.8 * @since 1 - * @see [[http://docs.scala-lang.org/overviews/collections/concrete-mutable-collection-classes.html#stacks"Scala's Collection Library overview"]] + * @see [[http://docs.scala-lang.org/overviews/collections/concrete-mutable-collection-classes.html#stacks "Scala's Collection Library overview"]] * section on `Stacks` for more information. * @define Coll Stack * @define coll stack diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 49b00bebdb..e630a9dbed 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -36,7 +36,9 @@ import collection.parallel.Task * * @author Aleksandar Prokopec * @since 2.9 - * + * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_hash_tries Scala's Parallel Collections Library overview]] + * section on Parallel Hash Tries for more information. + * * @define Coll immutable.ParHashMap * @define coll immutable parallel hash map */ diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala index 11d92a27c9..084637c5dc 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -35,6 +35,8 @@ import collection.parallel.Task * * @author Aleksandar Prokopec * @since 2.9 + * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_hash_tries Scala's Parallel Collections Library overview]] + * section on Parallel Hash Tries for more information. * * @define Coll immutable.ParHashSet * @define coll immutable parallel hash set diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala index 9cac433460..277fd5fdd3 100644 --- a/src/library/scala/collection/parallel/immutable/ParRange.scala +++ b/src/library/scala/collection/parallel/immutable/ParRange.scala @@ -25,6 +25,8 @@ import scala.collection.Iterator * * @author Aleksandar Prokopec * @since 2.9 + * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_range Scala's Parallel Collections Library overview]] + * section on `ParRange` for more information. * * @define Coll immutable.ParRange * @define coll immutable parallel range diff --git a/src/library/scala/collection/parallel/immutable/ParVector.scala b/src/library/scala/collection/parallel/immutable/ParVector.scala index 5d9c431bc1..8baa84b77c 100644 --- a/src/library/scala/collection/parallel/immutable/ParVector.scala +++ b/src/library/scala/collection/parallel/immutable/ParVector.scala @@ -34,6 +34,8 @@ import immutable.VectorIterator * * @author Aleksandar Prokopec * @since 2.9 + * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_vector Scala's Parallel Collections Library overview]] + * section on `ParVector` for more information. * * @define Coll immutable.ParVector * @define coll immutable parallel vector diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index c33495bd39..8cc0b95997 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -44,10 +44,14 @@ import scala.collection.GenTraversableOnce * * @tparam T type of the elements in the array * + * @author Aleksandar Prokopec + * @since 2.9 + * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_array Scala's Parallel Collections Library overview]] + * section on `ParArray` for more information. + * * @define Coll ParArray * @define coll parallel array * - * @author Aleksandar Prokopec */ @SerialVersionUID(1L) class ParArray[T] private[mutable] (val arrayseq: ArraySeq[T]) diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index 6ce6c45460..23b23d55a1 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -32,6 +32,8 @@ import collection.parallel.Task * @define coll parallel hash map * * @author Aleksandar Prokopec + * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_hash_tables Scala's Parallel Collections Library overview]] + * section on Parallel Hash Tables for more information. */ @SerialVersionUID(1L) class ParHashMap[K, V] private[collection] (contents: HashTable.Contents[K, DefaultEntry[K, V]]) diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala index e0a2ab03df..4e9a38c13f 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala @@ -29,6 +29,8 @@ import collection.parallel.Task * @define coll parallel hash set * * @author Aleksandar Prokopec + * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_hash_tables Scala's Parallel Collections Library overview]] + * section on Parallel Hash Tables for more information. */ @SerialVersionUID(1L) class ParHashSet[T] private[collection] (contents: FlatHashTable.Contents[T]) diff --git a/src/library/scala/collection/parallel/mutable/ParTrieMap.scala b/src/library/scala/collection/parallel/mutable/ParTrieMap.scala index fa19990b91..359c35f1dd 100644 --- a/src/library/scala/collection/parallel/mutable/ParTrieMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParTrieMap.scala @@ -33,6 +33,8 @@ import scala.collection.concurrent.TrieMapIterator * * @author Aleksandar Prokopec * @since 2.10 + * @see [[http://docs.scala-lang.org/overviews/parallel-collections/concrete-parallel-collections.html#parallel_concurrent_tries Scala's Parallel Collections Library overview]] + * section on `ParTrieMap` for more information. */ final class ParTrieMap[K, V] private[collection] (private val ctrie: TrieMap[K, V]) extends ParMap[K, V] -- cgit v1.2.3