From 7c049a15f6cb3992abc6debabe2b53b2097ffb8a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Apr 2012 15:38:50 +0200 Subject: Updating to latest version of Akka's DefaultPromise --- src/library/scala/concurrent/Future.scala | 20 ++- .../scala/concurrent/impl/AbstractPromise.java | 6 +- src/library/scala/concurrent/impl/Future.scala | 2 +- src/library/scala/concurrent/impl/Promise.scala | 135 +++++++-------------- 4 files changed, 58 insertions(+), 105 deletions(-) diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 1463dbcebf..16432f6aac 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -511,16 +511,15 @@ trait Future[+T] extends Awaitable[T] { * Note: using this method yields nondeterministic dataflow programs. */ object Future { - - /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. - * - * The result becomes available once the asynchronous computation is completed. - * - * @tparam T the type of the result - * @param body the asychronous computation - * @param execctx the execution context on which the future is run - * @return the `Future` holding the result of the computation - */ + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + * + * The result becomes available once the asynchronous computation is completed. + * + * @tparam T the type of the result + * @param body the asychronous computation + * @param execctx the execution context on which the future is run + * @return the `Future` holding the result of the computation + */ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body) import scala.collection.mutable.Builder @@ -614,4 +613,3 @@ object Future { - diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java index 5280d67854..8aac5de042 100644 --- a/src/library/scala/concurrent/impl/AbstractPromise.java +++ b/src/library/scala/concurrent/impl/AbstractPromise.java @@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; abstract class AbstractPromise { - private volatile Object _ref = null; + private volatile Object _ref; protected final static AtomicReferenceFieldUpdater updater = - AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); -} + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +} \ No newline at end of file diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index a3c8ed3095..72ffa6a014 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -28,7 +28,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa /** Tests whether this Future has been completed. */ - final def isCompleted: Boolean = value.isDefined + def isCompleted: Boolean /** The contained value of this Future. Before this Future is completed * the value will be None. After completion the value will be Some(Right(t)) diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 07b6d1f278..ef87f27d63 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -74,37 +74,10 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu object Promise { - - def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue - - def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] - - /** Represents the internal state. - * - * [adriaan] it's unsound to make FState covariant (tryComplete won't type check) - */ - sealed trait FState[T] { def value: Option[Either[Throwable, T]] } - - case class Pending[T](listeners: List[Either[Throwable, T] => Any] = Nil) extends FState[T] { - def value: Option[Either[Throwable, T]] = None - } - - case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def result: T = value.get.right.get - } - - case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def exception: Throwable = value.get.left.get - } - - private val emptyPendingValue = Pending[Nothing](Nil) - /** Default promise implementation. */ - class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { - self => - - updater.set(this, Promise.EmptyPending()) + class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self => + updater.set(this, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU protected final def tryAwait(atMost: Duration): Boolean = { @tailrec @@ -115,7 +88,7 @@ object Promise { val start = System.nanoTime() try { synchronized { - while (value.isEmpty) wait(ms, ns) + while (!isCompleted) wait(ms, ns) } } catch { case e: InterruptedException => @@ -123,93 +96,91 @@ object Promise { awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) } else - value.isDefined + isCompleted } - - blocking(Future.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) + //FIXME do not do this if there'll be no waiting + blocking(Future.body2awaitable(awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)), atMost) } + @throws(classOf[TimeoutException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type = - if (value.isDefined || tryAwait(atMost)) this + if (isCompleted || tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { case Left(e) => throw e case Right(r) => r } - def value: Option[Either[Throwable, T]] = getState.value + def value: Option[Either[Throwable, T]] = getState match { + case _: List[_] ⇒ None + case c: Either[_, _] ⇒ Some(c.asInstanceOf[Either[Throwable, T]]) + } + + override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value" + case _: Either[_, _] ⇒ true + case _ ⇒ false + } @inline - private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, AnyRef]] @inline - protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) + protected final def updateState(oldState: AnyRef, newState: AnyRef): Boolean = updater.compareAndSet(this, oldState, newState) @inline - protected final def getState: FState[T] = updater.get(this) + protected final def getState: AnyRef = updater.get(this) def tryComplete(value: Either[Throwable, T]): Boolean = { - val callbacks: List[Either[Throwable, T] => Any] = { + val callbacks: List[Either[Throwable, T] ⇒ Unit] = { try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Any] = { + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] ⇒ Unit] = { getState match { - case cur @ Pending(listeners) => - val newState = - if (v.isLeft) Failure(Some(v.asInstanceOf[Left[Throwable, T]])) - else Success(Some(v.asInstanceOf[Right[Throwable, T]])) - - if (updateState(cur, newState)) listeners - else tryComplete(v) - case _ => null + case raw: List[_] ⇒ + val cur = raw.asInstanceOf[List[Either[Throwable, T] ⇒ Unit]] + if (updateState(cur, v)) cur else tryComplete(v) + case _ ⇒ null } } tryComplete(resolveEither(value)) } finally { - synchronized { notifyAll() } // notify any blockers from `tryAwait` + synchronized { notifyAll() } //Notify any evil blockers } } callbacks match { - case null => false - case cs if cs.isEmpty => true - case cs => - Future.dispatchFuture(executor, { - () => cs.foreach(f => notifyCompleted(f, value)) - }) - true + case null ⇒ false + case cs if cs.isEmpty ⇒ true + case cs ⇒ Future.dispatchFuture(executor, () ⇒ cs.foreach(f ⇒ notifyCompleted(f, value))); true } } - def onComplete[U](func: Either[Throwable, T] => U): this.type = { - @tailrec // Returns whether the future has already been completed or not - def tryAddCallback(): Boolean = { + def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = { + @tailrec //Returns whether the future has already been completed or not + def tryAddCallback(): Either[Throwable, T] = { val cur = getState cur match { - case _: Success[_] | _: Failure[_] => true - case p: Pending[_] => - val pt = p.asInstanceOf[Pending[T]] - if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + case r: Either[_, _] ⇒ r.asInstanceOf[Either[Throwable, T]] + case listeners: List[_] ⇒ if (updateState(listeners, func :: listeners)) null else tryAddCallback() } } - if (tryAddCallback()) { - val result = value.get - Future.dispatchFuture(executor, { - () => notifyCompleted(func, result) - }) + tryAddCallback() match { + case null ⇒ this + case completed ⇒ + Future.dispatchFuture(executor, () ⇒ notifyCompleted(func, completed)) + this } - - this } private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { try { func(result) } catch { - case e => executor.reportFailure(e) + case /*NonFatal(*/e/*)*/ => executor.reportFailure(e) } } } @@ -222,13 +193,13 @@ object Promise { val value = Some(resolveEither(suppliedValue)) + override def isCompleted(): Boolean = true + def tryComplete(value: Either[Throwable, T]): Boolean = false def onComplete[U](func: Either[Throwable, T] => U): this.type = { val completedAs = value.get - Future.dispatchFuture(executor, { - () => func(completedAs) - }) + Future.dispatchFuture(executor, () => func(completedAs)) this } @@ -241,19 +212,3 @@ object Promise { } } - - - - - - - - - - - - - - - - -- cgit v1.2.3