diff options
Diffstat (limited to 'src/library/scala/concurrent/impl/Promise.scala')
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 167 |
1 files changed, 116 insertions, 51 deletions
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index b15601058e..078ad45be9 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -16,14 +16,42 @@ import scala.util.control.NonFatal import scala.util.{ Try, Success, Failure } import java.io.ObjectInputStream import java.util.concurrent.locks.AbstractQueuedSynchronizer +import java.util.concurrent.atomic.AtomicReference private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] { def future: this.type = this + + import scala.concurrent.Future + import scala.concurrent.impl.Promise.DefaultPromise + + override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { + val p = new DefaultPromise[S]() + onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } + p.future + } + + // If possible, link DefaultPromises to avoid space leaks + override def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] = { + val p = new DefaultPromise[S]() + onComplete { + v => try f(v) match { + case fut if fut eq this => p complete v.asInstanceOf[Try[S]] + case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p) + case fut => p completeWith fut + } catch { case NonFatal(t) => p failure t } + } + p.future + } + + override def toString: String = value match { + case Some(result) => "Future("+result+")" + case None => "Future(<not completed>)" + } } /* Precondition: `executor` is prepared, i.e., `executor` has been returned from invocation of `prepare` on some other `ExecutionContext`. */ -private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { +private final class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { // must be filled in before running it var value: Try[T] = null @@ -89,7 +117,7 @@ private[concurrent] object Promise { * incomplete, or as complete with the same result value. * * A DefaultPromise stores its state entirely in the AnyRef cell exposed by - * AbstractPromise. The type of object stored in the cell fully describes the + * AtomicReference. The type of object stored in the cell fully describes the * current state of the promise. * * 1. List[CallbackRunnable] - The promise is incomplete and has zero or more callbacks @@ -150,8 +178,7 @@ private[concurrent] object Promise { * DefaultPromises, and `linkedRootOf` is currently only designed to be called * by Future.flatMap. */ - class DefaultPromise[T] extends AbstractPromise with Promise[T] { self => - updateState(null, Nil) // The promise is incomplete and has no callbacks + final class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T] { /** Get the root promise for this promise, compressing the link chain to that * promise if necessary. @@ -167,14 +194,23 @@ private[concurrent] object Promise { * be garbage collected. Also, subsequent calls to this method should be * faster as the link chain will be shorter. */ - @tailrec - private def compressedRoot(): DefaultPromise[T] = { - getState match { - case linked: DefaultPromise[_] => - val target = linked.asInstanceOf[DefaultPromise[T]].root - if (linked eq target) target else if (updateState(linked, target)) target else compressedRoot() + private def compressedRoot(): DefaultPromise[T] = + get() match { + case linked: DefaultPromise[_] => compressedRoot(linked) case _ => this } + + @tailrec + private[this] final def compressedRoot(linked: DefaultPromise[_]): DefaultPromise[T] = { + val target = linked.asInstanceOf[DefaultPromise[T]].root + if (linked eq target) target + else if (compareAndSet(linked, target)) target + else { + get() match { + case newLinked: DefaultPromise[_] => compressedRoot(newLinked) + case _ => this + } + } } /** Get the promise at the root of the chain of linked promises. Used by `compressedRoot()`. @@ -182,18 +218,16 @@ private[concurrent] object Promise { * to compress the link chain whenever possible. */ @tailrec - private def root: DefaultPromise[T] = { - getState match { + private def root: DefaultPromise[T] = + get() match { case linked: DefaultPromise[_] => linked.asInstanceOf[DefaultPromise[T]].root case _ => this } - } /** Try waiting for this promise to be completed. */ protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) { import Duration.Undefined - import scala.concurrent.Future.InternalCallbackExecutor atMost match { case e if e eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period") case Duration.Inf => @@ -225,18 +259,18 @@ private[concurrent] object Promise { def value: Option[Try[T]] = value0 @tailrec - private def value0: Option[Try[T]] = getState match { + private def value0: Option[Try[T]] = get() match { case c: Try[_] => Some(c.asInstanceOf[Try[T]]) - case _: DefaultPromise[_] => compressedRoot().value0 + case dp: DefaultPromise[_] => compressedRoot(dp).value0 case _ => None } override def isCompleted: Boolean = isCompleted0 @tailrec - private def isCompleted0: Boolean = getState match { + private def isCompleted0: Boolean = get() match { case _: Try[_] => true - case _: DefaultPromise[_] => compressedRoot().isCompleted0 + case dp: DefaultPromise[_] => compressedRoot(dp).isCompleted0 case _ => false } @@ -254,21 +288,17 @@ private[concurrent] object Promise { */ @tailrec private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = { - getState match { + get() match { case raw: List[_] => val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] - if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v) - case _: DefaultPromise[_] => - compressedRoot().tryCompleteAndGetListeners(v) + if (compareAndSet(cur, v)) cur else tryCompleteAndGetListeners(v) + case dp: DefaultPromise[_] => compressedRoot(dp).tryCompleteAndGetListeners(v) case _ => null } } - def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { - val preparedEC = executor.prepare() - val runnable = new CallbackRunnable[T](preparedEC, func) - dispatchOrAddCallback(runnable) - } + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = + dispatchOrAddCallback(new CallbackRunnable[T](executor.prepare(), func)) /** Tries to add the callback, if already completed, it dispatches the callback to be executed. * Used by `onComplete()` to add callbacks to a promise and by `link()` to transfer callbacks @@ -276,15 +306,16 @@ private[concurrent] object Promise { */ @tailrec private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = { - getState match { + get() match { case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) - case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable) - case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable) + case dp: DefaultPromise[_] => compressedRoot(dp).dispatchOrAddCallback(runnable) + case listeners: List[_] => if (compareAndSet(listeners, runnable :: listeners)) () + else dispatchOrAddCallback(runnable) } } /** Link this promise to the root of another promise using `link()`. Should only be - * be called by Future.flatMap. + * be called by transformWith. */ protected[concurrent] final def linkRootOf(target: DefaultPromise[T]): Unit = link(target.compressedRoot()) @@ -299,18 +330,17 @@ private[concurrent] object Promise { */ @tailrec private def link(target: DefaultPromise[T]): Unit = if (this ne target) { - getState match { + get() match { case r: Try[_] => - if (!target.tryComplete(r.asInstanceOf[Try[T]])) { - // Currently linking is done from Future.flatMap, which should ensure only - // one promise can be completed. Therefore this situation is unexpected. + if (!target.tryComplete(r.asInstanceOf[Try[T]])) throw new IllegalStateException("Cannot link completed promises together") - } - case _: DefaultPromise[_] => - compressedRoot().link(target) - case listeners: List[_] => if (updateState(listeners, target)) { - if (!listeners.isEmpty) listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_)) - } else link(target) + case dp: DefaultPromise[_] => + compressedRoot(dp).link(target) + case listeners: List[_] if compareAndSet(listeners, target) => + if (listeners.nonEmpty) + listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_)) + case _ => + link(target) } } } @@ -319,23 +349,58 @@ private[concurrent] object Promise { * * Useful in Future-composition when a value to contribute is already available. */ - final class KeptPromise[T](suppliedValue: Try[T]) extends Promise[T] { + object KeptPromise { + import scala.concurrent.Future + import scala.reflect.ClassTag + + private[this] sealed trait Kept[T] extends Promise[T] { + def result: Try[T] + + override def value: Option[Try[T]] = Some(result) - val value = Some(resolveTry(suppliedValue)) + override def isCompleted: Boolean = true - override def isCompleted: Boolean = true + override def tryComplete(value: Try[T]): Boolean = false - def tryComplete(value: Try[T]): Boolean = false + override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = + (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result) - def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { - val completedAs = value.get - val preparedEC = executor.prepare() - (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs) + override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + + override def result(atMost: Duration)(implicit permit: CanAwait): T = result.get } - def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + private[this] final class Successful[T](val result: Success[T]) extends Kept[T] { + override def onFailure[U](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = () + override def failed: Future[Throwable] = KeptPromise(Failure(new NoSuchElementException("Future.failed not completed with a throwable."))).future + override def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = this + override def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = this + override def fallbackTo[U >: T](that: Future[U]): Future[U] = this + } - def result(atMost: Duration)(implicit permit: CanAwait): T = value.get.get + private[this] final class Failed[T](val result: Failure[T]) extends Kept[T] { + private[this] final def thisAs[S]: Future[S] = future.asInstanceOf[Future[S]] + + override def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = () + override def failed: Future[Throwable] = thisAs[Throwable] + override def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = () + override def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = thisAs[S] + override def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = thisAs[S] + override def flatten[S](implicit ev: T <:< Future[S]): Future[S] = thisAs[S] + override def filter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = this + override def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = thisAs[S] + override def zip[U](that: Future[U]): Future[(T, U)] = thisAs[(T,U)] + override def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = thisAs[R] + override def fallbackTo[U >: T](that: Future[U]): Future[U] = + if (this eq that) this else that.recoverWith({ case _ => this })(InternalCallbackExecutor) + override def mapTo[S](implicit tag: ClassTag[S]): Future[S] = thisAs[S] + } + + def apply[T](result: Try[T]): scala.concurrent.Promise[T] = + resolveTry(result) match { + case s @ Success(_) => new Successful(s) + case f @ Failure(_) => new Failed(f) + } } } |