summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/impl/Promise.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/concurrent/impl/Promise.scala')
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala73
1 files changed, 38 insertions, 35 deletions
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index c2df9ac296..b19bed004b 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -15,42 +15,46 @@ import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, Timeou
import scala.concurrent.util.Duration
import scala.annotation.tailrec
import scala.util.control.NonFatal
-
+import scala.util.{ Try, Success, Failure }
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
def future: this.type = this
}
-private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) extends Runnable with OnCompleteRunnable {
+/* Precondition: `executor` is prepared, i.e., `executor` has been returned from invocation of `prepare` on some other `ExecutionContext`.
+ */
+private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable {
// must be filled in before running it
- var value: Either[Throwable, T] = null
+ var value: Try[T] = null
override def run() = {
require(value ne null) // must set value to non-null before running!
try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
}
- def executeWithValue(v: Either[Throwable, T]): Unit = {
+ def executeWithValue(v: Try[T]): Unit = {
require(value eq null) // can't complete it twice
value = v
+ // Note that we cannot prepare the ExecutionContext at this point, since we might
+ // already be running on a different thread!
executor.execute(this)
}
}
private[concurrent] object Promise {
- private def resolveEither[T](source: Either[Throwable, T]): Either[Throwable, T] = source match {
- case Left(t) => resolver(t)
- case _ => source
+ private def resolveTry[T](source: Try[T]): Try[T] = source match {
+ case Failure(t) => resolver(t)
+ case _ => source
}
- private def resolver[T](throwable: Throwable): Either[Throwable, T] = throwable match {
- case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value.asInstanceOf[T])
- case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t))
- case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t))
- case e: Error => Left(new ExecutionException("Boxed Error", e))
- case t => Left(t)
+ private def resolver[T](throwable: Throwable): Try[T] = throwable match {
+ case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value.asInstanceOf[T])
+ case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t))
+ case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t))
+ case e: Error => Failure(new ExecutionException("Boxed Error", e))
+ case t => Failure(t)
}
/** Default promise implementation.
@@ -88,25 +92,25 @@ private[concurrent] object Promise {
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
- case Left(e) => throw e
- case Right(r) => r
+ case Failure(e) => throw e
+ case Success(r) => r
}
- def value: Option[Either[Throwable, T]] = getState match {
- case c: Either[_, _] => Some(c.asInstanceOf[Either[Throwable, T]])
- case _ => None
+ def value: Option[Try[T]] = getState match {
+ case c: Try[_] => Some(c.asInstanceOf[Try[T]])
+ case _ => None
}
override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value"
- case _: Either[_, _] => true
- case _ => false
+ case _: Try[_] => true
+ case _ => false
}
- def tryComplete(value: Either[Throwable, T]): Boolean = {
- val resolved = resolveEither(value)
+ def tryComplete(value: Try[T]): Boolean = {
+ val resolved = resolveTry(value)
(try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[CallbackRunnable[T]] = {
+ def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
@@ -124,13 +128,14 @@ private[concurrent] object Promise {
}
}
- def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
- val runnable = new CallbackRunnable[T](executor, func)
+ def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
+ val preparedEC = executor.prepare
+ val runnable = new CallbackRunnable[T](preparedEC, func)
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => runnable.executeWithValue(r.asInstanceOf[Either[Throwable, T]])
+ case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
@@ -141,25 +146,23 @@ private[concurrent] object Promise {
*
* Useful in Future-composition when a value to contribute is already available.
*/
- final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] {
+ final class KeptPromise[T](suppliedValue: Try[T]) extends Promise[T] {
- val value = Some(resolveEither(suppliedValue))
+ val value = Some(resolveTry(suppliedValue))
override def isCompleted(): Boolean = true
- def tryComplete(value: Either[Throwable, T]): Boolean = false
+ def tryComplete(value: Try[T]): Boolean = false
- def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = {
+ def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
val completedAs = value.get
- (new CallbackRunnable(executor, func)).executeWithValue(completedAs)
+ val preparedEC = executor.prepare
+ (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
- def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
- case Left(e) => throw e
- case Right(r) => r
- }
+ def result(atMost: Duration)(implicit permit: CanAwait): T = value.get.get
}
}