From 5d2acb2b3d6b2880ba36f039bbf98c583ce85a21 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Thu, 12 Jan 2012 19:55:50 +0100 Subject: Port of akka Future implementation in progress. --- src/library/scala/concurrent/Future.scala | 75 +++++++++-- src/library/scala/concurrent/Promise.scala | 27 +++- src/library/scala/concurrent/akka/Future.scala | 177 ++++++++++++++++++++++++- src/library/scala/concurrent/package.scala | 6 + 4 files changed, 269 insertions(+), 16 deletions(-) (limited to 'src/library') diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 748d08be9f..d074dbfaaa 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -61,6 +61,9 @@ import scala.collection.generic.CanBuildFrom * {{{ * f flatMap { (x: Int) => g map { (y: Int) => x + y } } * }}} + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. */ trait Future[+T] extends Awaitable[T] { self => @@ -113,11 +116,11 @@ self => /** The execution context of the future. */ - def executionContext: ExecutionContext + def executor: ExecutionContext /** Creates a new promise. */ - def newPromise[S]: Promise[S] = executionContext promise + def newPromise[S]: Promise[S] = executor promise /* Projections */ @@ -135,7 +138,7 @@ self => * and throws a corresponding exception if the original future fails. */ def failed: Future[Throwable] = new Future[Throwable] { - def executionContext = self.executionContext + def executor = self.executor def onComplete[U](func: Either[Throwable, Throwable] => U) = { self.onComplete { case Left(t) => func(Right(t)) @@ -242,8 +245,8 @@ self => * val f = future { 5 } * val g = f filter { _ % 2 == 1 } * val h = f filter { _ % 2 == 0 } - * block on g // evaluates to 5 - * block on h // throw a NoSuchElementException + * await(0) g // evaluates to 5 + * await(0) h // throw a NoSuchElementException * }}} */ def filter(pred: T => Boolean): Future[T] = { @@ -258,7 +261,6 @@ self => } /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value. - * * * If the current future contains a value for which the partial function is defined, the new future will also hold that value. * Otherwise, the resulting future will fail with a `NoSuchElementException`. @@ -274,8 +276,8 @@ self => * val h = f collect { * case x if x > 0 => x * 2 * } - * block on g // evaluates to 5 - * block on h // throw a NoSuchElementException + * await(0) g // evaluates to 5 + * await(0) h // throw a NoSuchElementException * }}} */ def collect[S](pf: PartialFunction[T, S]): Future[S] = { @@ -289,14 +291,68 @@ self => p.future } + /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, + * the result of the `that` future if `that` is completed successfully. + * If both futures are failed, the resulting future holds the throwable object of the first future. + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f orElse g + * await(0) h // evaluates to 5 + * }}} + */ + def orElse[U >: T](that: Future[U]): Future[U] = { + val p = newPromise[U] + + onComplete { + case Left(t) => that onComplete { + case Left(_) => p failure t + case Right(v) => p success v + } + case Right(v) => p success v + } + + p.future + } + + /** Creates a new future which holds the result of either this future or `that` future, depending on + * which future was completed first. + * + * $nonDeterministic + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f orElse g + * await(0) h // evaluates to either 5 or throws a runtime exception + * }}} + */ + def or[U >: T](that: Future[U]): Future[U] = { + val p = newPromise[U] + + val completePromise: PartialFunction[Either[Throwable, T], _] = { + case Left(t) => p tryFailure t + case Right(v) => p trySuccess v + } + this onComplete completePromise + this onComplete completePromise + + p.future + } + } object Future { + /* + // TODO make more modular by encoding this within the execution context def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { val builder = cbf(futures) - val p: Promise[Coll[T]] = executionContext.promise[Coll[T]] + val p: Promise[Coll[T]] = executor.promise[Coll[T]] if (futures.size == 1) futures.head onComplete { case Left(t) => p failure t @@ -317,6 +373,7 @@ object Future { p.future } + */ @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body) diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index aae0135af4..f6ea252f73 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -25,6 +25,9 @@ import scala.util.Timeout * If the throwable used to fail this promise is an error, a control exception * or an interrupted exception, it will be wrapped as a cause within an * `ExecutionException` which will fail the promise. + * + * @define nonDeterministic + * Note: Using this method may result in non-deterministic concurrent programs. */ trait Promise[T] { @@ -38,7 +41,15 @@ trait Promise[T] { * * $promiseCompletion */ - def success(value: T): Unit + def success(v: T): this.type = if (trySuccess(v)) this else throw new IllegalStateException("Promise already completed.") + + /** Tries to complete the promise with a value. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def trySuccess(value: T): Boolean /** Completes the promise with an exception. * @@ -48,8 +59,16 @@ trait Promise[T] { * * $promiseCompletion */ - def failure(t: Throwable): Unit - + def failure(t: Throwable): this.type = if (tryFailure(t)) this else throw new IllegalStateException("Promise already completed.") + + /** Tries to complete the promise with an exception. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def tryFailure(t: Throwable): Boolean + /** Wraps a `Throwable` in an `ExecutionException` if necessary. * * $allowedThrowables @@ -58,7 +77,7 @@ trait Promise[T] { case t: Throwable if isFutureThrowable(t) => t case _ => new ExecutionException(t) } - + } diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala index e359456736..2b41c0c62e 100644 --- a/src/library/scala/concurrent/akka/Future.scala +++ b/src/library/scala/concurrent/akka/Future.scala @@ -1,4 +1,4 @@ -/* __ *\ +/*/* __ *\ ** ________ ___ / / ___ Scala API ** ** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** ** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** @@ -6,11 +6,182 @@ ** |/ ** \* */ -package scala.concurrent -package akka +package scala.concurrent.akka +sealed trait Future[+T] extends scala.concurrent.Future with Awaitable[T] { + + implicit def executor: ExecutionContext + + /** + * For use only within a Future.flow block or another compatible Delimited Continuations reset block. + * + * Returns the result of this Future without blocking, by suspending execution and storing it as a + * continuation until the result is available. + */ + def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any])) + + /** + * Tests whether this Future has been completed. + */ + final def isCompleted: Boolean = value.isDefined + + /** + * The contained value of this Future. Before this Future is completed + * the value will be None. After completion the value will be Some(Right(t)) + * if it contains a valid result, or Some(Left(error)) if it contains + * an exception. + */ + def value: Option[Either[Throwable, T]] + + def onComplete(func: Either[Throwable, T] => Unit): this.type + + /** + * Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this. + * This is semantically the same as: Future.firstCompletedOf(Seq(this, that)) + */ + //FIXME implement as the result of any of the Futures, or if both failed, the first failure + def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize + + final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { + val future = Promise[A]() + onComplete { + case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) + case otherwise ⇒ future complete otherwise + } + future + } + + /** + * Creates a new Future by applying a function to the successful result of + * this Future. If this Future is completed with an exception then the new + * Future will also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a: Int    <- actor ? "Hello" // returns 5
+   *   b: String <- actor ? a       // returns "10"
+   *   c: String <- actor ? 7       // returns "14"
+   * } yield b + "-" + c
+   * 
+ */ + final def map[A](f: T ⇒ A): Future[A] = { + val future = Promise[A]() + onComplete { + case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] + case Right(res) ⇒ + future complete (try { + Right(f(res)) + } catch { + case e ⇒ + logError("Future.map", e) + Left(e) + }) + } + future + } + + /** + * Creates a new Future[A] which is completed with this Future's result if + * that conforms to A's erased type or a ClassCastException otherwise. + */ + final def mapTo[A](implicit m: Manifest[A]): Future[A] = { + val fa = Promise[A]() + onComplete { + case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]] + case Right(t) ⇒ + fa complete (try { + Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) + } catch { + case e: ClassCastException ⇒ Left(e) + }) + } + fa + } + + /** + * Creates a new Future by applying a function to the successful result of + * this Future, and returns the result of the function as the new Future. + * If this Future is completed with an exception then the new Future will + * also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a: Int    <- actor ? "Hello" // returns 5
+   *   b: String <- actor ? a       // returns "10"
+   *   c: String <- actor ? 7       // returns "14"
+   * } yield b + "-" + c
+   * 
+ */ + final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { + val p = Promise[A]() + + onComplete { + case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, A]] + case Right(r) ⇒ + try { + p completeWith f(r) + } catch { + case e ⇒ + p complete Left(e) + logError("Future.flatMap", e) + } + } + p + } + + /** + * Same as onSuccess { case r => f(r) } but is also used in for-comprehensions + */ + final def foreach(f: T ⇒ Unit): Unit = onComplete { + case Right(r) ⇒ f(r) + case _ ⇒ + } + + /** + * Used by for-comprehensions + */ + final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) + + final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) { + def foreach(f: A ⇒ Unit): Unit = self filter p foreach f + def map[B](f: A ⇒ B): Future[B] = self filter p map f + def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f + def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) + } + + /** + * Returns a new Future that will hold the successful result of this Future if it matches + * the given predicate, if it doesn't match, the resulting Future will be a failed Future + * with a MatchError, of if this Future fails, that failure will be propagated to the returned Future + */ + final def filter(pred: T ⇒ Boolean): Future[T] = { + val p = Promise[T]() + onComplete { + case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, T]] + case r @ Right(res) ⇒ p complete (try { + if (pred(res)) r else Left(new MatchError(res)) + } catch { + case e ⇒ + logError("Future.filter", e) + Left(e) + }) + } + p + } + + protected def logError(msg: String, problem: Throwable): Unit = { + executor match { + case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage)) + case other ⇒ problem.printStackTrace() + } + } +} + + + +*/ diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 666e12456d..c35ece5668 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -50,6 +50,12 @@ package object concurrent { case _ => true } + private[concurrent] def resolveThrowable[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { + case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T]) + case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t)) + case _ => source + } + /* concurrency constructs */ def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] = -- cgit v1.2.3