diff options
Diffstat (limited to 'src/library/scala/concurrent/impl/Promise.scala')
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 73 |
1 files changed, 38 insertions, 35 deletions
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index c2df9ac296..b19bed004b 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -15,42 +15,46 @@ import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, Timeou import scala.concurrent.util.Duration import scala.annotation.tailrec import scala.util.control.NonFatal - +import scala.util.{ Try, Success, Failure } private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] { def future: this.type = this } -private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) extends Runnable with OnCompleteRunnable { +/* Precondition: `executor` is prepared, i.e., `executor` has been returned from invocation of `prepare` on some other `ExecutionContext`. + */ +private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { // must be filled in before running it - var value: Either[Throwable, T] = null + var value: Try[T] = null override def run() = { require(value ne null) // must set value to non-null before running! try onComplete(value) catch { case NonFatal(e) => executor reportFailure e } } - def executeWithValue(v: Either[Throwable, T]): Unit = { + def executeWithValue(v: Try[T]): Unit = { require(value eq null) // can't complete it twice value = v + // Note that we cannot prepare the ExecutionContext at this point, since we might + // already be running on a different thread! executor.execute(this) } } private[concurrent] object Promise { - private def resolveEither[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { - case Left(t) => resolver(t) - case _ => source + private def resolveTry[T](source: Try[T]): Try[T] = source match { + case Failure(t) => resolver(t) + case _ => source } - private def resolver[T](throwable: Throwable): Either[Throwable, T] = throwable match { - case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value.asInstanceOf[T]) - case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t)) - case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t)) - case e: Error => Left(new ExecutionException("Boxed Error", e)) - case t => Left(t) + private def resolver[T](throwable: Throwable): Try[T] = throwable match { + case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value.asInstanceOf[T]) + case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t)) + case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t)) + case e: Error => Failure(new ExecutionException("Boxed Error", e)) + case t => Failure(t) } /** Default promise implementation. @@ -88,25 +92,25 @@ private[concurrent] object Promise { @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { - case Left(e) => throw e - case Right(r) => r + case Failure(e) => throw e + case Success(r) => r } - def value: Option[Either[Throwable, T]] = getState match { - case c: Either[_, _] => Some(c.asInstanceOf[Either[Throwable, T]]) - case _ => None + def value: Option[Try[T]] = getState match { + case c: Try[_] => Some(c.asInstanceOf[Try[T]]) + case _ => None } override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value" - case _: Either[_, _] => true - case _ => false + case _: Try[_] => true + case _ => false } - def tryComplete(value: Either[Throwable, T]): Boolean = { - val resolved = resolveEither(value) + def tryComplete(value: Try[T]): Boolean = { + val resolved = resolveTry(value) (try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[CallbackRunnable[T]] = { + def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = { getState match { case raw: List[_] => val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] @@ -124,13 +128,14 @@ private[concurrent] object Promise { } } - def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { - val runnable = new CallbackRunnable[T](executor, func) + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { + val preparedEC = executor.prepare + val runnable = new CallbackRunnable[T](preparedEC, func) @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = getState match { - case r: Either[_, _] => runnable.executeWithValue(r.asInstanceOf[Either[Throwable, T]]) + case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback() } dispatchOrAddCallback() @@ -141,25 +146,23 @@ private[concurrent] object Promise { * * Useful in Future-composition when a value to contribute is already available. */ - final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { + final class KeptPromise[T](suppliedValue: Try[T]) extends Promise[T] { - val value = Some(resolveEither(suppliedValue)) + val value = Some(resolveTry(suppliedValue)) override def isCompleted(): Boolean = true - def tryComplete(value: Either[Throwable, T]): Boolean = false + def tryComplete(value: Try[T]): Boolean = false - def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { val completedAs = value.get - (new CallbackRunnable(executor, func)).executeWithValue(completedAs) + val preparedEC = executor.prepare + (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs) } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this - def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { - case Left(e) => throw e - case Right(r) => r - } + def result(atMost: Duration)(implicit permit: CanAwait): T = value.get.get } } |