diff options
Diffstat (limited to 'src/library/scala/concurrent/Future.scala')
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 106 |
1 files changed, 68 insertions, 38 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index d73801aa90..5f703ac23b 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -17,7 +17,6 @@ import java.util.{ LinkedList => JLinkedList } import java.{ lang => jl } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } -import scala.util.{ Try, Success, Failure } import scala.concurrent.util.Duration import scala.Option @@ -97,8 +96,8 @@ self => * $multipleCallbacks */ def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete { - case Failure(t) => // do nothing - case Success(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ } + case Left(t) => // do nothing + case Right(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ } } /** When this future is completed with a failure (i.e. with a throwable), @@ -114,8 +113,8 @@ self => * $multipleCallbacks */ def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete { - case Failure(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ } - case Success(v) => // do nothing + case Left(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ } + case Right(v) => // do nothing } /** When this future is completed, either through an exception, a timeout, or a value, @@ -126,7 +125,7 @@ self => * * $multipleCallbacks */ - def onComplete[U](func: Try[T] => U): this.type + def onComplete[U](func: Either[Throwable, T] => U): this.type /* Miscellaneous */ @@ -151,7 +150,7 @@ self => * if it contains a valid result, or `Some(Failure(error))` if it contains * an exception. */ - def value: Option[Try[T]] + def value: Option[Either[Throwable, T]] /* Projections */ @@ -175,8 +174,8 @@ self => val p = newPromise[Throwable] onComplete { - case Failure(t) => p success t - case Success(v) => p failure noSuchElem(v) + case Left(t) => p success t + case Right(v) => p failure noSuchElem(v) } p.future @@ -190,8 +189,8 @@ self => * Will not be called if the future fails. */ def foreach[U](f: T => U): Unit = onComplete { - case Success(r) => f(r) - case Failure(_) => // do nothing + case Right(r) => f(r) + case Left(_) => // do nothing } /** Creates a new future by applying a function to the successful result of @@ -204,8 +203,8 @@ self => val p = newPromise[S] onComplete { - case Failure(t) => p failure t - case Success(v) => + case Left(t) => p failure t + case Right(v) => try p success f(v) catch { case t => p complete resolver(t) @@ -226,12 +225,12 @@ self => val p = newPromise[S] onComplete { - case Failure(t) => p failure t - case Success(v) => + case Left(t) => p failure t + case Right(v) => try { f(v) onComplete { - case Failure(t) => p failure t - case Success(v) => p success v + case Left(t) => p failure t + case Right(v) => p success v } } catch { case t: Throwable => p complete resolver(t) @@ -261,8 +260,8 @@ self => val p = newPromise[T] onComplete { - case Failure(t) => p failure t - case Success(v) => + case Left(t) => p failure t + case Right(v) => try { if (pred(v)) p success v else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) @@ -274,6 +273,18 @@ self => p.future } + /** Used by for-comprehensions. + */ + final def withFilter(p: T => Boolean): Future[T] = filter(p) + // final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p) + + // final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) { + // def foreach(f: S => Unit): Unit = self filter p foreach f + // def map[R](f: S => R) = self filter p map f + // def flatMap[R](f: S => Future[R]) = self filter p flatMap f + // def withFilter(q: S => Boolean): FutureWithFilter[S] = new FutureWithFilter[S](self, x => p(x) && q(x)) + // } + /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value. * * If the current future contains a value for which the partial function is defined, the new future will also hold that value. @@ -298,8 +309,8 @@ self => val p = newPromise[S] onComplete { - case Failure(t) => p failure t - case Success(v) => + case Left(t) => p failure t + case Right(v) => try { if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) @@ -327,7 +338,7 @@ self => val p = newPromise[U] onComplete { - case Failure(t) if pf isDefinedAt t => + case Left(t) if pf isDefinedAt t => try { p success pf(t) } catch { case t: Throwable => p complete resolver(t) } case otherwise => p complete otherwise @@ -353,7 +364,7 @@ self => val p = newPromise[U] onComplete { - case Failure(t) if pf isDefinedAt t => + case Left(t) if pf isDefinedAt t => try { p completeWith pf(t) } catch { @@ -377,8 +388,8 @@ self => val p = newPromise[(T, U)] this onComplete { - case Failure(t) => p failure t - case Success(r) => that onSuccess { + case Left(t) => p failure t + case Right(r) => that onSuccess { case r2 => p success ((r, r2)) } } @@ -408,16 +419,35 @@ self => val p = newPromise[U] onComplete { - case Failure(t) => that onComplete { - case Failure(_) => p failure t - case Success(v) => p success v + case Left(t) => that onComplete { + case Left(_) => p failure t + case Right(v) => p success v } - case Success(v) => p success v + case Right(v) => p success v } p.future } - + + /** Creates a new `Future[S]` which is completed with this `Future`'s result if + * that conforms to `S`'s erased type or a `ClassCastException` otherwise. + */ + def mapTo[S](implicit m: Manifest[S]): Future[S] = { + val p = newPromise[S] + + onComplete { + case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]] + case Right(t) => + p complete (try { + Right(impl.Future.boxedType(m.erasure).cast(t).asInstanceOf[S]) + } catch { + case e: ClassCastException => Left(e) + }) + } + + p.future + } + /** Applies the side-effecting function to the result of this future, and returns * a new future with the result of this future. * @@ -441,7 +471,7 @@ self => * } * }}} */ - def andThen[U](pf: PartialFunction[Try[T], U]): Future[T] = { + def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = { val p = newPromise[T] onComplete { @@ -469,9 +499,9 @@ self => def either[U >: T](that: Future[U]): Future[U] = { val p = self.newPromise[U] - val completePromise: PartialFunction[Try[U], _] = { - case Failure(t) => p tryFailure t - case Success(v) => p trySuccess v + val completePromise: PartialFunction[Either[Throwable, U], _] = { + case Left(t) => p tryFailure t + case Right(v) => p trySuccess v } self onComplete completePromise @@ -510,7 +540,7 @@ object Future { def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() - val completeFirst: Try[T] => Unit = p tryComplete _ + val completeFirst: Either[Throwable, T] => Unit = p tryComplete _ futures.foreach(_ onComplete completeFirst) p.future @@ -523,14 +553,14 @@ object Future { else { val result = Promise[Option[T]]() val ref = new AtomicInteger(futures.size) - val search: Try[T] => Unit = v => try { + val search: Either[Throwable, T] => Unit = v => try { v match { - case Success(r) => if (predicate(r)) result tryComplete Success(Some(r)) + case Right(r) => if (predicate(r)) result tryComplete Right(Some(r)) case _ => } } finally { if (ref.decrementAndGet == 0) - result tryComplete Success(None) + result tryComplete Right(None) } futures.foreach(_ onComplete search) |