diff options
author | Aleksandar Prokopec <aleksandar.prokopec@gmail.com> | 2011-12-05 17:33:38 +0100 |
---|---|---|
committer | Aleksandar Prokopec <aleksandar.prokopec@gmail.com> | 2011-12-05 17:33:38 +0100 |
commit | 730564354514a9e6d3506f0f1211fa4032cfafc9 (patch) | |
tree | 2ffb1f1eae46b07e718b8af13eb141e5d2346edf | |
parent | 4448140cfd6cfdbf469478f1321f692a4512c9fe (diff) | |
download | scala-730564354514a9e6d3506f0f1211fa4032cfafc9.tar.gz scala-730564354514a9e6d3506f0f1211fa4032cfafc9.tar.bz2 scala-730564354514a9e6d3506f0f1211fa4032cfafc9.zip |
Completed default implementations of monadic ops on futures.
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 56 |
1 files changed, 50 insertions, 6 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 16741ad50c..361bed4d2b 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -124,7 +124,11 @@ self => def onComplete[U](func: Either[Throwable, T] => U): this.type - /* Various info */ + /* Miscellaneous */ + + /** Creates a new promise. + */ + def newPromise[S]: Promise[S] /** Tests whether this Future's timeout has expired. * @@ -151,6 +155,7 @@ self => /* Projections */ def failed: Future[Throwable] = new Future[Throwable] { + def newPromise[S] = self.newPromise[S] def onComplete[U](func: Either[Throwable, Throwable] => U) = self.onComplete { case Left(t) => func(Right(t)) case Right(v) => // do nothing @@ -160,6 +165,7 @@ self => } def timedout: Future[TimeoutException] = new Future[TimeoutException] { + def newPromise[S] = self.newPromise[S] def onComplete[U](func: Either[Throwable, TimeoutException] => U) = self.onComplete { case Left(te: TimeoutException) => func(Right(te)) case _ => // do nothing @@ -183,7 +189,16 @@ self => * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3 * }}} */ - def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit timeout: Timeout): Future[U] + def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit timeout: Timeout): Future[U] = { + val p = newPromise[U] + + onComplete { + case Left(t) => if (pf isDefinedAt t) p fulfill pf(t) else p fail t + case Right(v) => p fulfill v + } + + p.future + } /** Asynchronously processes the value in the future once the value becomes available. * @@ -191,7 +206,7 @@ self => * * This method typically registers an `onResult` callback. */ - def foreach[U](f: T => U): Unit + def foreach[U](f: T => U): Unit = onResult f /** Creates a new future by applying a function to the successful result of * this future. If this future is completed with an exception then the new @@ -199,7 +214,16 @@ self => * * $forComprehensionExample */ - def map[S](f: T => S)(implicit timeout: Timeout): Future[S] + def map[S](f: T => S)(implicit timeout: Timeout): Future[S] = { + val p = newPromise[S] + + onComplete { + case Left(t) => p fail t + case Right(v) => p fulfill f(v) + } + + p.future + } /** Creates a new future by applying a function to the successful result of * this future, and returns the result of the function as the new future. @@ -208,7 +232,18 @@ self => * * $forComprehensionExample */ - def flatMap[S](f: T => Future[S])(implicit timeout: Timeout): Future[S] + def flatMap[S](f: T => Future[S])(implicit timeout: Timeout): Future[S] = { + val p = newPromise[S] + + onComplete { + case Left(t) => p fail t + case Right(f) => f onComplete { + p fulfill _ + } + } + + p.future + } /** Creates a new future by filtering the value of the current future with a predicate. * @@ -226,7 +261,16 @@ self => * block on h // throw a NoSuchElementException * }}} */ - def filter(p: T => Boolean)(implicit timeout: Timeout): Future[T] + def filter(p: T => Boolean)(implicit timeout: Timeout): Future[T] = { + val p = newPromise[T] + + onComplete { + case Left(t) => p fail t + case Right(v) => if (p(v)) p fulfill v else p fail new NoSuchElementException + } + + p.future + } } |