summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-12 19:55:50 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-12 19:55:50 +0100
commit5d2acb2b3d6b2880ba36f039bbf98c583ce85a21 (patch)
treef72cfe923f7244f7d8636f7669f0d549bea312f9
parentd595324efe2be1c552bad8201aaef9ce383e5c95 (diff)
downloadscala-5d2acb2b3d6b2880ba36f039bbf98c583ce85a21.tar.gz
scala-5d2acb2b3d6b2880ba36f039bbf98c583ce85a21.tar.bz2
scala-5d2acb2b3d6b2880ba36f039bbf98c583ce85a21.zip
Port of akka Future implementation in progress.
-rw-r--r--src/library/scala/concurrent/Future.scala75
-rw-r--r--src/library/scala/concurrent/Promise.scala27
-rw-r--r--src/library/scala/concurrent/akka/Future.scala177
-rw-r--r--src/library/scala/concurrent/package.scala6
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala15
5 files changed, 283 insertions, 17 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 748d08be9f..d074dbfaaa 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -61,6 +61,9 @@ import scala.collection.generic.CanBuildFrom
* {{{
* f flatMap { (x: Int) => g map { (y: Int) => x + y } }
* }}}
+ *
+ * @define nonDeterministic
+ * Note: using this method yields nondeterministic dataflow programs.
*/
trait Future[+T] extends Awaitable[T] {
self =>
@@ -113,11 +116,11 @@ self =>
/** The execution context of the future.
*/
- def executionContext: ExecutionContext
+ def executor: ExecutionContext
/** Creates a new promise.
*/
- def newPromise[S]: Promise[S] = executionContext promise
+ def newPromise[S]: Promise[S] = executor promise
/* Projections */
@@ -135,7 +138,7 @@ self =>
* and throws a corresponding exception if the original future fails.
*/
def failed: Future[Throwable] = new Future[Throwable] {
- def executionContext = self.executionContext
+ def executor = self.executor
def onComplete[U](func: Either[Throwable, Throwable] => U) = {
self.onComplete {
case Left(t) => func(Right(t))
@@ -242,8 +245,8 @@ self =>
* val f = future { 5 }
* val g = f filter { _ % 2 == 1 }
* val h = f filter { _ % 2 == 0 }
- * block on g // evaluates to 5
- * block on h // throw a NoSuchElementException
+ * await(0) g // evaluates to 5
+ * await(0) h // throw a NoSuchElementException
* }}}
*/
def filter(pred: T => Boolean): Future[T] = {
@@ -259,7 +262,6 @@ self =>
/** Creates a new future by mapping the value of the current future if the given partial function is defined at that value.
*
- *
* If the current future contains a value for which the partial function is defined, the new future will also hold that value.
* Otherwise, the resulting future will fail with a `NoSuchElementException`.
*
@@ -274,8 +276,8 @@ self =>
* val h = f collect {
* case x if x > 0 => x * 2
* }
- * block on g // evaluates to 5
- * block on h // throw a NoSuchElementException
+ * await(0) g // evaluates to 5
+ * await(0) h // throw a NoSuchElementException
* }}}
*/
def collect[S](pf: PartialFunction[T, S]): Future[S] = {
@@ -289,14 +291,68 @@ self =>
p.future
}
+ /** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
+ * the result of the `that` future if `that` is completed successfully.
+ * If both futures are failed, the resulting future holds the throwable object of the first future.
+ *
+ * Example:
+ * {{{
+ * val f = future { sys.error("failed") }
+ * val g = future { 5 }
+ * val h = f orElse g
+ * await(0) h // evaluates to 5
+ * }}}
+ */
+ def orElse[U >: T](that: Future[U]): Future[U] = {
+ val p = newPromise[U]
+
+ onComplete {
+ case Left(t) => that onComplete {
+ case Left(_) => p failure t
+ case Right(v) => p success v
+ }
+ case Right(v) => p success v
+ }
+
+ p.future
+ }
+
+ /** Creates a new future which holds the result of either this future or `that` future, depending on
+ * which future was completed first.
+ *
+ * $nonDeterministic
+ *
+ * Example:
+ * {{{
+ * val f = future { sys.error("failed") }
+ * val g = future { 5 }
+ * val h = f orElse g
+ * await(0) h // evaluates to either 5 or throws a runtime exception
+ * }}}
+ */
+ def or[U >: T](that: Future[U]): Future[U] = {
+ val p = newPromise[U]
+
+ val completePromise: PartialFunction[Either[Throwable, T], _] = {
+ case Left(t) => p tryFailure t
+ case Right(v) => p trySuccess v
+ }
+ this onComplete completePromise
+ this onComplete completePromise
+
+ p.future
+ }
+
}
object Future {
+ /*
+ // TODO make more modular by encoding this within the execution context
def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
val builder = cbf(futures)
- val p: Promise[Coll[T]] = executionContext.promise[Coll[T]]
+ val p: Promise[Coll[T]] = executor.promise[Coll[T]]
if (futures.size == 1) futures.head onComplete {
case Left(t) => p failure t
@@ -317,6 +373,7 @@ object Future {
p.future
}
+ */
@inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body)
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index aae0135af4..f6ea252f73 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -25,6 +25,9 @@ import scala.util.Timeout
* If the throwable used to fail this promise is an error, a control exception
* or an interrupted exception, it will be wrapped as a cause within an
* `ExecutionException` which will fail the promise.
+ *
+ * @define nonDeterministic
+ * Note: Using this method may result in non-deterministic concurrent programs.
*/
trait Promise[T] {
@@ -38,7 +41,15 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def success(value: T): Unit
+ def success(v: T): this.type = if (trySuccess(v)) this else throw new IllegalStateException("Promise already completed.")
+
+ /** Tries to complete the promise with a value.
+ *
+ * $nonDeterministic
+ *
+ * @return If the promise has already been completed returns `false`, or `true` otherwise.
+ */
+ def trySuccess(value: T): Boolean
/** Completes the promise with an exception.
*
@@ -48,8 +59,16 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def failure(t: Throwable): Unit
-
+ def failure(t: Throwable): this.type = if (tryFailure(t)) this else throw new IllegalStateException("Promise already completed.")
+
+ /** Tries to complete the promise with an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return If the promise has already been completed returns `false`, or `true` otherwise.
+ */
+ def tryFailure(t: Throwable): Boolean
+
/** Wraps a `Throwable` in an `ExecutionException` if necessary.
*
* $allowedThrowables
@@ -58,7 +77,7 @@ trait Promise[T] {
case t: Throwable if isFutureThrowable(t) => t
case _ => new ExecutionException(t)
}
-
+
}
diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala
index e359456736..2b41c0c62e 100644
--- a/src/library/scala/concurrent/akka/Future.scala
+++ b/src/library/scala/concurrent/akka/Future.scala
@@ -1,4 +1,4 @@
-/* __ *\
+/*/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
@@ -6,11 +6,182 @@
** |/ **
\* */
-package scala.concurrent
-package akka
+package scala.concurrent.akka
+sealed trait Future[+T] extends scala.concurrent.Future with Awaitable[T] {
+
+ implicit def executor: ExecutionContext
+
+ /**
+ * For use only within a Future.flow block or another compatible Delimited Continuations reset block.
+ *
+ * Returns the result of this Future without blocking, by suspending execution and storing it as a
+ * continuation until the result is available.
+ */
+ def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any]))
+
+ /**
+ * Tests whether this Future has been completed.
+ */
+ final def isCompleted: Boolean = value.isDefined
+
+ /**
+ * The contained value of this Future. Before this Future is completed
+ * the value will be None. After completion the value will be Some(Right(t))
+ * if it contains a valid result, or Some(Left(error)) if it contains
+ * an exception.
+ */
+ def value: Option[Either[Throwable, T]]
+
+ def onComplete(func: Either[Throwable, T] => Unit): this.type
+
+ /**
+ * Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this.
+ * This is semantically the same as: Future.firstCompletedOf(Seq(this, that))
+ */
+ //FIXME implement as the result of any of the Futures, or if both failed, the first failure
+ def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize
+
+ final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
+ val future = Promise[A]()
+ onComplete {
+ case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) })
+ case otherwise ⇒ future complete otherwise
+ }
+ future
+ }
+
+ /**
+ * Creates a new Future by applying a function to the successful result of
+ * this Future. If this Future is completed with an exception then the new
+ * Future will also contain this exception.
+ * Example:
+ * <pre>
+ * val future1 = for {
+ * a: Int <- actor ? "Hello" // returns 5
+ * b: String <- actor ? a // returns "10"
+ * c: String <- actor ? 7 // returns "14"
+ * } yield b + "-" + c
+ * </pre>
+ */
+ final def map[A](f: T ⇒ A): Future[A] = {
+ val future = Promise[A]()
+ onComplete {
+ case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
+ case Right(res) ⇒
+ future complete (try {
+ Right(f(res))
+ } catch {
+ case e ⇒
+ logError("Future.map", e)
+ Left(e)
+ })
+ }
+ future
+ }
+
+ /**
+ * Creates a new Future[A] which is completed with this Future's result if
+ * that conforms to A's erased type or a ClassCastException otherwise.
+ */
+ final def mapTo[A](implicit m: Manifest[A]): Future[A] = {
+ val fa = Promise[A]()
+ onComplete {
+ case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]]
+ case Right(t) ⇒
+ fa complete (try {
+ Right(BoxedType(m.erasure).cast(t).asInstanceOf[A])
+ } catch {
+ case e: ClassCastException ⇒ Left(e)
+ })
+ }
+ fa
+ }
+
+ /**
+ * Creates a new Future by applying a function to the successful result of
+ * this Future, and returns the result of the function as the new Future.
+ * If this Future is completed with an exception then the new Future will
+ * also contain this exception.
+ * Example:
+ * <pre>
+ * val future1 = for {
+ * a: Int <- actor ? "Hello" // returns 5
+ * b: String <- actor ? a // returns "10"
+ * c: String <- actor ? 7 // returns "14"
+ * } yield b + "-" + c
+ * </pre>
+ */
+ final def flatMap[A](f: T ⇒ Future[A]): Future[A] = {
+ val p = Promise[A]()
+
+ onComplete {
+ case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, A]]
+ case Right(r) ⇒
+ try {
+ p completeWith f(r)
+ } catch {
+ case e ⇒
+ p complete Left(e)
+ logError("Future.flatMap", e)
+ }
+ }
+ p
+ }
+
+ /**
+ * Same as onSuccess { case r => f(r) } but is also used in for-comprehensions
+ */
+ final def foreach(f: T ⇒ Unit): Unit = onComplete {
+ case Right(r) ⇒ f(r)
+ case _ ⇒
+ }
+
+ /**
+ * Used by for-comprehensions
+ */
+ final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p)
+
+ final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) {
+ def foreach(f: A ⇒ Unit): Unit = self filter p foreach f
+ def map[B](f: A ⇒ B): Future[B] = self filter p map f
+ def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f
+ def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x))
+ }
+
+ /**
+ * Returns a new Future that will hold the successful result of this Future if it matches
+ * the given predicate, if it doesn't match, the resulting Future will be a failed Future
+ * with a MatchError, of if this Future fails, that failure will be propagated to the returned Future
+ */
+ final def filter(pred: T ⇒ Boolean): Future[T] = {
+ val p = Promise[T]()
+ onComplete {
+ case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, T]]
+ case r @ Right(res) ⇒ p complete (try {
+ if (pred(res)) r else Left(new MatchError(res))
+ } catch {
+ case e ⇒
+ logError("Future.filter", e)
+ Left(e)
+ })
+ }
+ p
+ }
+
+ protected def logError(msg: String, problem: Throwable): Unit = {
+ executor match {
+ case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage))
+ case other ⇒ problem.printStackTrace()
+ }
+ }
+}
+
+
+
+*/
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 666e12456d..c35ece5668 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -50,6 +50,12 @@ package object concurrent {
case _ => true
}
+ private[concurrent] def resolveThrowable[T](source: Either[Throwable, T]): Either[Throwable, T] = source match {
+ case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T])
+ case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t))
+ case _ => source
+ }
+
/* concurrency constructs */
def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] =
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index d62561c92d..abd363cedf 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -184,6 +184,17 @@ trait FutureCombinators extends TestBase {
done()
}
+ // collect: stub
+ def testCollectSuccess(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testCollectFailure(): Unit = once {
+ done =>
+ done()
+ }
+
// foreach: stub
def testForeachSuccess(): Unit = once {
done =>
@@ -229,13 +240,15 @@ trait FutureCombinators extends TestBase {
assert(any == cause)
}
}
-
+
testMapSuccess()
testMapFailure()
testFlatMapSuccess()
testFlatMapFailure()
testFilterSuccess()
testFilterFailure()
+ testCollectSuccess()
+ testCollectFailure()
testForeachSuccess()
testForeachFailure()
testRecoverSuccess()