From 730564354514a9e6d3506f0f1211fa4032cfafc9 Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Mon, 5 Dec 2011 17:33:38 +0100 Subject: Completed default implementations of monadic ops on futures. --- src/library/scala/concurrent/Future.scala | 56 +++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 16741ad50c..361bed4d2b 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -124,7 +124,11 @@ self => def onComplete[U](func: Either[Throwable, T] => U): this.type - /* Various info */ + /* Miscellaneous */ + + /** Creates a new promise. + */ + def newPromise[S]: Promise[S] /** Tests whether this Future's timeout has expired. * @@ -151,6 +155,7 @@ self => /* Projections */ def failed: Future[Throwable] = new Future[Throwable] { + def newPromise[S] = self.newPromise[S] def onComplete[U](func: Either[Throwable, Throwable] => U) = self.onComplete { case Left(t) => func(Right(t)) case Right(v) => // do nothing @@ -160,6 +165,7 @@ self => } def timedout: Future[TimeoutException] = new Future[TimeoutException] { + def newPromise[S] = self.newPromise[S] def onComplete[U](func: Either[Throwable, TimeoutException] => U) = self.onComplete { case Left(te: TimeoutException) => func(Right(te)) case _ => // do nothing @@ -183,7 +189,16 @@ self => * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3 * }}} */ - def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit timeout: Timeout): Future[U] + def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit timeout: Timeout): Future[U] = { + val p = newPromise[U] + + onComplete { + case Left(t) => if (pf isDefinedAt t) p fulfill pf(t) else p fail t + case Right(v) => p fulfill v + } + + p.future + } /** Asynchronously processes the value in the future once the value becomes available. * @@ -191,7 +206,7 @@ self => * * This method typically registers an `onResult` callback. */ - def foreach[U](f: T => U): Unit + def foreach[U](f: T => U): Unit = onResult f /** Creates a new future by applying a function to the successful result of * this future. If this future is completed with an exception then the new @@ -199,7 +214,16 @@ self => * * $forComprehensionExample */ - def map[S](f: T => S)(implicit timeout: Timeout): Future[S] + def map[S](f: T => S)(implicit timeout: Timeout): Future[S] = { + val p = newPromise[S] + + onComplete { + case Left(t) => p fail t + case Right(v) => p fulfill f(v) + } + + p.future + } /** Creates a new future by applying a function to the successful result of * this future, and returns the result of the function as the new future. @@ -208,7 +232,18 @@ self => * * $forComprehensionExample */ - def flatMap[S](f: T => Future[S])(implicit timeout: Timeout): Future[S] + def flatMap[S](f: T => Future[S])(implicit timeout: Timeout): Future[S] = { + val p = newPromise[S] + + onComplete { + case Left(t) => p fail t + case Right(f) => f onComplete { + p fulfill _ + } + } + + p.future + } /** Creates a new future by filtering the value of the current future with a predicate. * @@ -226,7 +261,16 @@ self => * block on h // throw a NoSuchElementException * }}} */ - def filter(p: T => Boolean)(implicit timeout: Timeout): Future[T] + def filter(p: T => Boolean)(implicit timeout: Timeout): Future[T] = { + val p = newPromise[T] + + onComplete { + case Left(t) => p fail t + case Right(v) => if (p(v)) p fulfill v else p fail new NoSuchElementException + } + + p.future + } } -- cgit v1.2.3