summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHeather Miller <heather.miller@epfl.ch>2012-08-04 21:55:35 +0200
committerHeather Miller <heather.miller@epfl.ch>2012-08-04 21:55:35 +0200
commit3cb0e784a05db7d0b542cec9bf4c5fbf3772a6cf (patch)
tree36c1a6040e0f0aa6e64c0eef6fb55bea61bf4141 /src
parentab63cca87f68d80aff0ff6cd83ecd85b9e1d0c7a (diff)
downloadscala-3cb0e784a05db7d0b542cec9bf4c5fbf3772a6cf.tar.gz
scala-3cb0e784a05db7d0b542cec9bf4c5fbf3772a6cf.tar.bz2
scala-3cb0e784a05db7d0b542cec9bf4c5fbf3772a6cf.zip
Basing Futures on Try instead of Either
Diffstat (limited to 'src')
-rw-r--r--src/library/scala/concurrent/Future.scala85
-rw-r--r--src/library/scala/concurrent/Promise.scala18
-rw-r--r--src/library/scala/concurrent/impl/Future.scala4
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala62
-rw-r--r--src/library/scala/util/Try.scala23
5 files changed, 87 insertions, 105 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index d24fdbf005..09e29d971d 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -117,7 +117,7 @@ trait Future[+T] extends Awaitable[T] {
* $callbackInContext
*/
def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
- case Right(v) if pf isDefinedAt v => pf(v)
+ case Success(v) if pf isDefinedAt v => pf(v)
case _ =>
}(executor)
@@ -135,7 +135,7 @@ trait Future[+T] extends Awaitable[T] {
* $callbackInContext
*/
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
- case Left(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t)
+ case Failure(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t)
case _ =>
}(executor)
@@ -148,7 +148,7 @@ trait Future[+T] extends Awaitable[T] {
* $multipleCallbacks
* $callbackInContext
*/
- def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit
+ def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit
/* Miscellaneous */
@@ -169,7 +169,7 @@ trait Future[+T] extends Awaitable[T] {
* if it contains a valid result, or `Some(Failure(error))` if it contains
* an exception.
*/
- def value: Option[Either[Throwable, T]]
+ def value: Option[Try[T]]
/* Projections */
@@ -190,8 +190,8 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[Throwable]()
onComplete {
- case Left(t) => p success t
- case Right(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable."))
+ case Failure(t) => p success t
+ case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable."))
}
p.future
@@ -205,7 +205,7 @@ trait Future[+T] extends Awaitable[T] {
* Will not be called if the future fails.
*/
def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete {
- case Right(r) => f(r)
+ case Success(r) => f(r)
case _ => // do nothing
}(executor)
@@ -227,8 +227,8 @@ trait Future[+T] extends Awaitable[T] {
case result =>
try {
result match {
- case Left(t) => p failure f(t)
- case Right(r) => p success s(r)
+ case Failure(t) => p failure f(t)
+ case Success(r) => p success s(r)
}
} catch {
case NonFatal(t) => p failure t
@@ -251,8 +251,8 @@ trait Future[+T] extends Awaitable[T] {
case result =>
try {
result match {
- case Right(r) => p success f(r)
- case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
+ case Success(r) => p success f(r)
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
}
} catch {
case NonFatal(t) => p failure t
@@ -273,12 +273,12 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[S]()
onComplete {
- case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
- case Right(v) =>
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
+ case Success(v) =>
try {
f(v).onComplete({
- case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
- case Right(v) => p success v
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
+ case Success(v) => p success v
})(internalExecutor)
} catch {
case NonFatal(t) => p failure t
@@ -308,8 +308,8 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[T]()
onComplete {
- case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, T]]
- case Right(v) =>
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[T]]
+ case Success(v) =>
try {
if (pred(v)) p success v
else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
@@ -357,8 +357,8 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[S]()
onComplete {
- case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
- case Right(v) =>
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
+ case Success(v) =>
try {
if (pf.isDefinedAt(v)) p success pf(v)
else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
@@ -377,22 +377,15 @@ trait Future[+T] extends Awaitable[T] {
* Example:
*
* {{{
- * future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
- * future (6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception
- * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
+ * future (6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
+ * future (6 / 0) recover { case e: NotFoundException => 0 } // result: exception
+ * future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
* }}}
*/
def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
- onComplete {
- case Left(t) if pf isDefinedAt t =>
- try { p success pf(t) }
- catch {
- case NonFatal(t) => p failure t
- }
- case otherwise => p complete otherwise
- }(executor)
+ onComplete { case tr => p.complete(tr recover pf) }(executor)
p.future
}
@@ -414,7 +407,7 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[U]()
onComplete {
- case Left(t) if pf isDefinedAt t =>
+ case Failure(t) if pf isDefinedAt t =>
try {
p completeWith pf(t)
} catch {
@@ -438,8 +431,8 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[(T, U)]()
this onComplete {
- case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, (T, U)]]
- case Right(r) =>
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]]
+ case Success(r) =>
that onSuccess {
case r2 => p success ((r, r2))
}
@@ -468,8 +461,8 @@ trait Future[+T] extends Awaitable[T] {
def fallbackTo[U >: T](that: Future[U]): Future[U] = {
val p = Promise[U]()
onComplete {
- case r @ Right(_) ⇒ p complete r
- case _ ⇒ p completeWith that
+ case s @ Success(_) => p complete s
+ case _ => p completeWith that
}
p.future
}
@@ -485,12 +478,12 @@ trait Future[+T] extends Awaitable[T] {
val p = Promise[S]()
onComplete {
- case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]]
- case Right(t) =>
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
+ case Success(t) =>
p complete (try {
- Right(boxedType(tag.runtimeClass).cast(t).asInstanceOf[S])
+ Success(boxedType(tag.runtimeClass).cast(t).asInstanceOf[S])
} catch {
- case e: ClassCastException => Left(e)
+ case e: ClassCastException => Failure(e)
})
}
@@ -520,7 +513,7 @@ trait Future[+T] extends Awaitable[T] {
* }
* }}}
*/
- def andThen[U](pf: PartialFunction[Either[Throwable, T], U])(implicit executor: ExecutionContext): Future[T] = {
+ def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
onComplete {
@@ -545,7 +538,7 @@ trait Future[+T] extends Awaitable[T] {
*/
def either[U >: T](that: Future[U]): Future[U] = {
val p = Promise[U]()
- val completePromise: PartialFunction[Either[Throwable, U], _] = { case result => p tryComplete result }
+ val completePromise: PartialFunction[Try[U], _] = { case result => p tryComplete result }
this onComplete completePromise
that onComplete completePromise
@@ -615,7 +608,7 @@ object Future {
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
- val completeFirst: Either[Throwable, T] => Unit = p tryComplete _
+ val completeFirst: Try[T] => Unit = p tryComplete _
futures.foreach(_ onComplete completeFirst)
p.future
@@ -629,14 +622,14 @@ object Future {
else {
val result = Promise[Option[T]]()
val ref = new AtomicInteger(futures.size)
- val search: Either[Throwable, T] => Unit = v => try {
+ val search: Try[T] => Unit = v => try {
v match {
- case Right(r) => if (predicate(r)) result tryComplete Right(Some(r))
- case _ =>
+ case Success(r) => if (predicate(r)) result tryComplete Success(Some(r))
+ case _ =>
}
} finally {
if (ref.decrementAndGet == 0) {
- result tryComplete Right(None)
+ result tryComplete Success(None)
}
}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 5d1b2c00b6..b873939c15 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -8,6 +8,8 @@
package scala.concurrent
+import scala.util.{ Try, Success, Failure }
+
/** Promise is an object which can be completed with a value or failed
* with an exception.
*
@@ -49,7 +51,7 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def complete(result: Either[Throwable, T]): this.type =
+ def complete(result: Try[T]): this.type =
if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")
/** Tries to complete the promise with either a value or the exception.
@@ -58,7 +60,7 @@ trait Promise[T] {
*
* @return If the promise has already been completed returns `false`, or `true` otherwise.
*/
- def tryComplete(result: Either[Throwable, T]): Boolean
+ def tryComplete(result: Try[T]): Boolean
/** Completes this promise with the specified future, once that future is completed.
*
@@ -84,7 +86,7 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def success(v: T): this.type = complete(Right(v))
+ def success(v: T): this.type = complete(Success(v))
/** Tries to complete the promise with a value.
*
@@ -92,7 +94,7 @@ trait Promise[T] {
*
* @return If the promise has already been completed returns `false`, or `true` otherwise.
*/
- def trySuccess(value: T): Boolean = tryComplete(Right(value))
+ def trySuccess(value: T): Boolean = tryComplete(Success(value))
/** Completes the promise with an exception.
*
@@ -102,7 +104,7 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def failure(t: Throwable): this.type = complete(Left(t))
+ def failure(t: Throwable): this.type = complete(Failure(t))
/** Tries to complete the promise with an exception.
*
@@ -110,7 +112,7 @@ trait Promise[T] {
*
* @return If the promise has already been completed returns `false`, or `true` otherwise.
*/
- def tryFailure(t: Throwable): Boolean = tryComplete(Left(t))
+ def tryFailure(t: Throwable): Boolean = tryComplete(Failure(t))
}
@@ -129,14 +131,14 @@ object Promise {
* @tparam T the type of the value in the promise
* @return the newly created `Promise` object
*/
- def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception))
+ def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception))
/** Creates an already completed Promise with the specified result.
*
* @tparam T the type of the value in the promise
* @return the newly created `Promise` object
*/
- def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Right(result))
+ def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Success(result))
}
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 098008e958..d92691901f 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -12,7 +12,7 @@ package scala.concurrent.impl
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
-
+import scala.util.{Try, Success, Failure}
private[concurrent] object Future {
@@ -21,7 +21,7 @@ private[concurrent] object Future {
override def run() = {
promise complete {
- try Right(body) catch { case NonFatal(e) => Left(e) }
+ try Success(body) catch { case NonFatal(e) => Failure(e) }
}
}
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index c2df9ac296..fab6b55c52 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -15,23 +15,23 @@ import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, Timeou
import scala.concurrent.util.Duration
import scala.annotation.tailrec
import scala.util.control.NonFatal
-
+import scala.util.{ Try, Success, Failure }
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
def future: this.type = this
}
-private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) extends Runnable with OnCompleteRunnable {
+private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Try[T]) => Any) extends Runnable with OnCompleteRunnable {
// must be filled in before running it
- var value: Either[Throwable, T] = null
+ var value: Try[T] = null
override def run() = {
require(value ne null) // must set value to non-null before running!
try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
}
- def executeWithValue(v: Either[Throwable, T]): Unit = {
+ def executeWithValue(v: Try[T]): Unit = {
require(value eq null) // can't complete it twice
value = v
executor.execute(this)
@@ -40,17 +40,17 @@ private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete
private[concurrent] object Promise {
- private def resolveEither[T](source: Either[Throwable, T]): Either[Throwable, T] = source match {
- case Left(t) => resolver(t)
- case _ => source
+ private def resolveTry[T](source: Try[T]): Try[T] = source match {
+ case Failure(t) => resolver(t)
+ case _ => source
}
- private def resolver[T](throwable: Throwable): Either[Throwable, T] = throwable match {
- case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value.asInstanceOf[T])
- case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t))
- case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t))
- case e: Error => Left(new ExecutionException("Boxed Error", e))
- case t => Left(t)
+ private def resolver[T](throwable: Throwable): Try[T] = throwable match {
+ case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value.asInstanceOf[T])
+ case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t))
+ case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t))
+ case e: Error => Failure(new ExecutionException("Boxed Error", e))
+ case t => Failure(t)
}
/** Default promise implementation.
@@ -88,25 +88,25 @@ private[concurrent] object Promise {
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
- case Left(e) => throw e
- case Right(r) => r
+ case Failure(e) => throw e
+ case Success(r) => r
}
- def value: Option[Either[Throwable, T]] = getState match {
- case c: Either[_, _] => Some(c.asInstanceOf[Either[Throwable, T]])
- case _ => None
+ def value: Option[Try[T]] = getState match {
+ case c: Try[_] => Some(c.asInstanceOf[Try[T]])
+ case _ => None
}
override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value"
- case _: Either[_, _] => true
- case _ => false
+ case _: Try[_] => true
+ case _ => false
}
- def tryComplete(value: Either[Throwable, T]): Boolean = {
- val resolved = resolveEither(value)
+ def tryComplete(value: Try[T]): Boolean = {
+ val resolved = resolveTry(value)
(try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[CallbackRunnable[T]] = {
+ def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
@@ -124,13 +124,13 @@ private[concurrent] object Promise {
}
}
- def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
+ def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
val runnable = new CallbackRunnable[T](executor, func)
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => runnable.executeWithValue(r.asInstanceOf[Either[Throwable, T]])
+ case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
@@ -141,15 +141,15 @@ private[concurrent] object Promise {
*
* Useful in Future-composition when a value to contribute is already available.
*/
- final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] {
+ final class KeptPromise[T](suppliedValue: Try[T]) extends Promise[T] {
- val value = Some(resolveEither(suppliedValue))
+ val value = Some(resolveTry(suppliedValue))
override def isCompleted(): Boolean = true
- def tryComplete(value: Either[Throwable, T]): Boolean = false
+ def tryComplete(value: Try[T]): Boolean = false
- def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
+ def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
val completedAs = value.get
(new CallbackRunnable(executor, func)).executeWithValue(completedAs)
}
@@ -157,8 +157,8 @@ private[concurrent] object Promise {
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
- case Left(e) => throw e
- case Right(r) => r
+ case Failure(e) => throw e
+ case Success(r) => r
}
}
diff --git a/src/library/scala/util/Try.scala b/src/library/scala/util/Try.scala
index f85bac0b84..487ddaced3 100644
--- a/src/library/scala/util/Try.scala
+++ b/src/library/scala/util/Try.scala
@@ -54,6 +54,7 @@ import language.implicitConversions
*
* `Try` comes to the Scala standard library after years of use as an integral part of Twitter's stack.
*
+ * @author based on Marius Eriksen's original implementation in com.twitter.util.
* @since 2.10
*/
sealed abstract class Try[+T] {
@@ -102,7 +103,7 @@ sealed abstract class Try[+T] {
* Applies the given function `f` if this is a `Failure`, otherwise returns this if this is a `Success`.
* This is like `flatMap` for the exception.
*/
- def rescue[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U]
+ def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U]
/**
* Applies the given function `f` if this is a `Failure`, otherwise returns this if this is a `Success`.
@@ -150,20 +151,6 @@ sealed abstract class Try[+T] {
object Try {
- implicit def try2either[T](tr: Try[T]): Either[Throwable, T] = {
- tr match {
- case Success(v) => Right(v)
- case Failure(t) => Left(t)
- }
- }
-
- implicit def either2try[T](ei: Either[Throwable, T]): Try[T] = {
- ei match {
- case Right(v) => Success(v)
- case Left(t) => Failure(t)
- }
- }
-
def apply[T](r: => T): Try[T] = {
try { Success(r) } catch {
case NonFatal(e) => Failure(e)
@@ -175,7 +162,7 @@ object Try {
final case class Failure[+T](val exception: Throwable) extends Try[T] {
def isFailure: Boolean = true
def isSuccess: Boolean = false
- def rescue[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] =
+ def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] =
if (f.isDefinedAt(exception)) f(exception) else this
def get: T = throw exception
def flatMap[U](f: T => Try[U]): Try[U] = Failure[U](exception)
@@ -201,12 +188,12 @@ final case class Failure[+T](val exception: Throwable) extends Try[T] {
final case class Success[+T](value: T) extends Try[T] {
def isFailure: Boolean = false
def isSuccess: Boolean = true
- def rescue[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] = Success(value)
+ def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] = Success(value)
def get = value
def flatMap[U](f: T => Try[U]): Try[U] =
try f(value)
catch {
- case e: Throwable => Failure(e)
+ case NonFatal(e) => Failure(e)
}
def flatten[U](implicit ev: T <:< Try[U]): Try[U] = value
def foreach[U](f: T => U): Unit = f(value)