summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Prokopec <aleksandar.prokopec@gmail.com>2011-12-05 17:33:38 +0100
committerAleksandar Prokopec <aleksandar.prokopec@gmail.com>2011-12-05 17:33:38 +0100
commit730564354514a9e6d3506f0f1211fa4032cfafc9 (patch)
tree2ffb1f1eae46b07e718b8af13eb141e5d2346edf
parent4448140cfd6cfdbf469478f1321f692a4512c9fe (diff)
downloadscala-730564354514a9e6d3506f0f1211fa4032cfafc9.tar.gz
scala-730564354514a9e6d3506f0f1211fa4032cfafc9.tar.bz2
scala-730564354514a9e6d3506f0f1211fa4032cfafc9.zip
Completed default implementations of monadic ops on futures.
-rw-r--r--src/library/scala/concurrent/Future.scala56
1 files changed, 50 insertions, 6 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 16741ad50c..361bed4d2b 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -124,7 +124,11 @@ self =>
def onComplete[U](func: Either[Throwable, T] => U): this.type
- /* Various info */
+ /* Miscellaneous */
+
+ /** Creates a new promise.
+ */
+ def newPromise[S]: Promise[S]
/** Tests whether this Future's timeout has expired.
*
@@ -151,6 +155,7 @@ self =>
/* Projections */
def failed: Future[Throwable] = new Future[Throwable] {
+ def newPromise[S] = self.newPromise[S]
def onComplete[U](func: Either[Throwable, Throwable] => U) = self.onComplete {
case Left(t) => func(Right(t))
case Right(v) => // do nothing
@@ -160,6 +165,7 @@ self =>
}
def timedout: Future[TimeoutException] = new Future[TimeoutException] {
+ def newPromise[S] = self.newPromise[S]
def onComplete[U](func: Either[Throwable, TimeoutException] => U) = self.onComplete {
case Left(te: TimeoutException) => func(Right(te))
case _ => // do nothing
@@ -183,7 +189,16 @@ self =>
* future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
* }}}
*/
- def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit timeout: Timeout): Future[U]
+ def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit timeout: Timeout): Future[U] = {
+ val p = newPromise[U]
+
+ onComplete {
+ case Left(t) => if (pf isDefinedAt t) p fulfill pf(t) else p fail t
+ case Right(v) => p fulfill v
+ }
+
+ p.future
+ }
/** Asynchronously processes the value in the future once the value becomes available.
*
@@ -191,7 +206,7 @@ self =>
*
* This method typically registers an `onResult` callback.
*/
- def foreach[U](f: T => U): Unit
+ def foreach[U](f: T => U): Unit = onResult f
/** Creates a new future by applying a function to the successful result of
* this future. If this future is completed with an exception then the new
@@ -199,7 +214,16 @@ self =>
*
* $forComprehensionExample
*/
- def map[S](f: T => S)(implicit timeout: Timeout): Future[S]
+ def map[S](f: T => S)(implicit timeout: Timeout): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Left(t) => p fail t
+ case Right(v) => p fulfill f(v)
+ }
+
+ p.future
+ }
/** Creates a new future by applying a function to the successful result of
* this future, and returns the result of the function as the new future.
@@ -208,7 +232,18 @@ self =>
*
* $forComprehensionExample
*/
- def flatMap[S](f: T => Future[S])(implicit timeout: Timeout): Future[S]
+ def flatMap[S](f: T => Future[S])(implicit timeout: Timeout): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Left(t) => p fail t
+ case Right(f) => f onComplete {
+ p fulfill _
+ }
+ }
+
+ p.future
+ }
/** Creates a new future by filtering the value of the current future with a predicate.
*
@@ -226,7 +261,16 @@ self =>
* block on h // throw a NoSuchElementException
* }}}
*/
- def filter(p: T => Boolean)(implicit timeout: Timeout): Future[T]
+ def filter(p: T => Boolean)(implicit timeout: Timeout): Future[T] = {
+ val p = newPromise[T]
+
+ onComplete {
+ case Left(t) => p fail t
+ case Right(v) => if (p(v)) p fulfill v else p fail new NoSuchElementException
+ }
+
+ p.future
+ }
}