summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/Future.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/concurrent/Future.scala')
-rw-r--r--src/library/scala/concurrent/Future.scala106
1 files changed, 68 insertions, 38 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index d73801aa90..5f703ac23b 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -17,7 +17,6 @@ import java.util.{ LinkedList => JLinkedList }
import java.{ lang => jl }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
-import scala.util.{ Try, Success, Failure }
import scala.concurrent.util.Duration
import scala.Option
@@ -97,8 +96,8 @@ self =>
* $multipleCallbacks
*/
def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete {
- case Failure(t) => // do nothing
- case Success(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ }
+ case Left(t) => // do nothing
+ case Right(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ }
}
/** When this future is completed with a failure (i.e. with a throwable),
@@ -114,8 +113,8 @@ self =>
* $multipleCallbacks
*/
def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete {
- case Failure(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ }
- case Success(v) => // do nothing
+ case Left(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ }
+ case Right(v) => // do nothing
}
/** When this future is completed, either through an exception, a timeout, or a value,
@@ -126,7 +125,7 @@ self =>
*
* $multipleCallbacks
*/
- def onComplete[U](func: Try[T] => U): this.type
+ def onComplete[U](func: Either[Throwable, T] => U): this.type
/* Miscellaneous */
@@ -151,7 +150,7 @@ self =>
* if it contains a valid result, or `Some(Failure(error))` if it contains
* an exception.
*/
- def value: Option[Try[T]]
+ def value: Option[Either[Throwable, T]]
/* Projections */
@@ -175,8 +174,8 @@ self =>
val p = newPromise[Throwable]
onComplete {
- case Failure(t) => p success t
- case Success(v) => p failure noSuchElem(v)
+ case Left(t) => p success t
+ case Right(v) => p failure noSuchElem(v)
}
p.future
@@ -190,8 +189,8 @@ self =>
* Will not be called if the future fails.
*/
def foreach[U](f: T => U): Unit = onComplete {
- case Success(r) => f(r)
- case Failure(_) => // do nothing
+ case Right(r) => f(r)
+ case Left(_) => // do nothing
}
/** Creates a new future by applying a function to the successful result of
@@ -204,8 +203,8 @@ self =>
val p = newPromise[S]
onComplete {
- case Failure(t) => p failure t
- case Success(v) =>
+ case Left(t) => p failure t
+ case Right(v) =>
try p success f(v)
catch {
case t => p complete resolver(t)
@@ -226,12 +225,12 @@ self =>
val p = newPromise[S]
onComplete {
- case Failure(t) => p failure t
- case Success(v) =>
+ case Left(t) => p failure t
+ case Right(v) =>
try {
f(v) onComplete {
- case Failure(t) => p failure t
- case Success(v) => p success v
+ case Left(t) => p failure t
+ case Right(v) => p success v
}
} catch {
case t: Throwable => p complete resolver(t)
@@ -261,8 +260,8 @@ self =>
val p = newPromise[T]
onComplete {
- case Failure(t) => p failure t
- case Success(v) =>
+ case Left(t) => p failure t
+ case Right(v) =>
try {
if (pred(v)) p success v
else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
@@ -274,6 +273,18 @@ self =>
p.future
}
+ /** Used by for-comprehensions.
+ */
+ final def withFilter(p: T => Boolean): Future[T] = filter(p)
+ // final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p)
+
+ // final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) {
+ // def foreach(f: S => Unit): Unit = self filter p foreach f
+ // def map[R](f: S => R) = self filter p map f
+ // def flatMap[R](f: S => Future[R]) = self filter p flatMap f
+ // def withFilter(q: S => Boolean): FutureWithFilter[S] = new FutureWithFilter[S](self, x => p(x) && q(x))
+ // }
+
/** Creates a new future by mapping the value of the current future if the given partial function is defined at that value.
*
* If the current future contains a value for which the partial function is defined, the new future will also hold that value.
@@ -298,8 +309,8 @@ self =>
val p = newPromise[S]
onComplete {
- case Failure(t) => p failure t
- case Success(v) =>
+ case Left(t) => p failure t
+ case Right(v) =>
try {
if (pf.isDefinedAt(v)) p success pf(v)
else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
@@ -327,7 +338,7 @@ self =>
val p = newPromise[U]
onComplete {
- case Failure(t) if pf isDefinedAt t =>
+ case Left(t) if pf isDefinedAt t =>
try { p success pf(t) }
catch { case t: Throwable => p complete resolver(t) }
case otherwise => p complete otherwise
@@ -353,7 +364,7 @@ self =>
val p = newPromise[U]
onComplete {
- case Failure(t) if pf isDefinedAt t =>
+ case Left(t) if pf isDefinedAt t =>
try {
p completeWith pf(t)
} catch {
@@ -377,8 +388,8 @@ self =>
val p = newPromise[(T, U)]
this onComplete {
- case Failure(t) => p failure t
- case Success(r) => that onSuccess {
+ case Left(t) => p failure t
+ case Right(r) => that onSuccess {
case r2 => p success ((r, r2))
}
}
@@ -408,16 +419,35 @@ self =>
val p = newPromise[U]
onComplete {
- case Failure(t) => that onComplete {
- case Failure(_) => p failure t
- case Success(v) => p success v
+ case Left(t) => that onComplete {
+ case Left(_) => p failure t
+ case Right(v) => p success v
}
- case Success(v) => p success v
+ case Right(v) => p success v
}
p.future
}
-
+
+ /** Creates a new `Future[S]` which is completed with this `Future`'s result if
+ * that conforms to `S`'s erased type or a `ClassCastException` otherwise.
+ */
+ def mapTo[S](implicit m: Manifest[S]): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]]
+ case Right(t) =>
+ p complete (try {
+ Right(impl.Future.boxedType(m.erasure).cast(t).asInstanceOf[S])
+ } catch {
+ case e: ClassCastException => Left(e)
+ })
+ }
+
+ p.future
+ }
+
/** Applies the side-effecting function to the result of this future, and returns
* a new future with the result of this future.
*
@@ -441,7 +471,7 @@ self =>
* }
* }}}
*/
- def andThen[U](pf: PartialFunction[Try[T], U]): Future[T] = {
+ def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = {
val p = newPromise[T]
onComplete {
@@ -469,9 +499,9 @@ self =>
def either[U >: T](that: Future[U]): Future[U] = {
val p = self.newPromise[U]
- val completePromise: PartialFunction[Try[U], _] = {
- case Failure(t) => p tryFailure t
- case Success(v) => p trySuccess v
+ val completePromise: PartialFunction[Either[Throwable, U], _] = {
+ case Left(t) => p tryFailure t
+ case Right(v) => p trySuccess v
}
self onComplete completePromise
@@ -510,7 +540,7 @@ object Future {
def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
- val completeFirst: Try[T] => Unit = p tryComplete _
+ val completeFirst: Either[Throwable, T] => Unit = p tryComplete _
futures.foreach(_ onComplete completeFirst)
p.future
@@ -523,14 +553,14 @@ object Future {
else {
val result = Promise[Option[T]]()
val ref = new AtomicInteger(futures.size)
- val search: Try[T] => Unit = v => try {
+ val search: Either[Throwable, T] => Unit = v => try {
v match {
- case Success(r) => if (predicate(r)) result tryComplete Success(Some(r))
+ case Right(r) => if (predicate(r)) result tryComplete Right(Some(r))
case _ =>
}
} finally {
if (ref.decrementAndGet == 0)
- result tryComplete Success(None)
+ result tryComplete Right(None)
}
futures.foreach(_ onComplete search)