summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Klang <viktor.klang@gmail.com>2012-04-13 15:38:50 +0200
committerViktor Klang <viktor.klang@gmail.com>2012-04-13 15:38:50 +0200
commit7c049a15f6cb3992abc6debabe2b53b2097ffb8a (patch)
treed3fb6209c8c18fd9a840a0e9978344d4816af174
parent225d205f83ceb7fc6f0af005f0085bf7ab493b38 (diff)
downloadscala-7c049a15f6cb3992abc6debabe2b53b2097ffb8a.tar.gz
scala-7c049a15f6cb3992abc6debabe2b53b2097ffb8a.tar.bz2
scala-7c049a15f6cb3992abc6debabe2b53b2097ffb8a.zip
Updating to latest version of Akka's DefaultPromise
-rw-r--r--src/library/scala/concurrent/Future.scala20
-rw-r--r--src/library/scala/concurrent/impl/AbstractPromise.java6
-rw-r--r--src/library/scala/concurrent/impl/Future.scala2
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala135
4 files changed, 58 insertions, 105 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 1463dbcebf..16432f6aac 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -511,16 +511,15 @@ trait Future[+T] extends Awaitable[T] {
* Note: using this method yields nondeterministic dataflow programs.
*/
object Future {
-
- /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
- *
- * The result becomes available once the asynchronous computation is completed.
- *
- * @tparam T the type of the result
- * @param body the asychronous computation
- * @param execctx the execution context on which the future is run
- * @return the `Future` holding the result of the computation
- */
+ /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
+ *
+ * The result becomes available once the asynchronous computation is completed.
+ *
+ * @tparam T the type of the result
+ * @param body the asychronous computation
+ * @param execctx the execution context on which the future is run
+ * @return the `Future` holding the result of the computation
+ */
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body)
import scala.collection.mutable.Builder
@@ -614,4 +613,3 @@ object Future {
-
diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java
index 5280d67854..8aac5de042 100644
--- a/src/library/scala/concurrent/impl/AbstractPromise.java
+++ b/src/library/scala/concurrent/impl/AbstractPromise.java
@@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
abstract class AbstractPromise {
- private volatile Object _ref = null;
+ private volatile Object _ref;
protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
- AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
-}
+ AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+} \ No newline at end of file
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index a3c8ed3095..72ffa6a014 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -28,7 +28,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
/** Tests whether this Future has been completed.
*/
- final def isCompleted: Boolean = value.isDefined
+ def isCompleted: Boolean
/** The contained value of this Future. Before this Future is completed
* the value will be None. After completion the value will be Some(Right(t))
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 07b6d1f278..ef87f27d63 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -74,37 +74,10 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
object Promise {
-
- def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue
-
- def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
-
- /** Represents the internal state.
- *
- * [adriaan] it's unsound to make FState covariant (tryComplete won't type check)
- */
- sealed trait FState[T] { def value: Option[Either[Throwable, T]] }
-
- case class Pending[T](listeners: List[Either[Throwable, T] => Any] = Nil) extends FState[T] {
- def value: Option[Either[Throwable, T]] = None
- }
-
- case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
- def result: T = value.get.right.get
- }
-
- case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
- def exception: Throwable = value.get.left.get
- }
-
- private val emptyPendingValue = Pending[Nothing](Nil)
-
/** Default promise implementation.
*/
- class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
- self =>
-
- updater.set(this, Promise.EmptyPending())
+ class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self =>
+ updater.set(this, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU
protected final def tryAwait(atMost: Duration): Boolean = {
@tailrec
@@ -115,7 +88,7 @@ object Promise {
val start = System.nanoTime()
try {
synchronized {
- while (value.isEmpty) wait(ms, ns)
+ while (!isCompleted) wait(ms, ns)
}
} catch {
case e: InterruptedException =>
@@ -123,93 +96,91 @@ object Promise {
awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
} else
- value.isDefined
+ isCompleted
}
-
- blocking(Future.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost)
+ //FIXME do not do this if there'll be no waiting
+ blocking(Future.body2awaitable(awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)), atMost)
}
+ @throws(classOf[TimeoutException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
- if (value.isDefined || tryAwait(atMost)) this
+ if (isCompleted || tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
+ @throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
case Left(e) => throw e
case Right(r) => r
}
- def value: Option[Either[Throwable, T]] = getState.value
+ def value: Option[Either[Throwable, T]] = getState match {
+ case _: List[_] ⇒ None
+ case c: Either[_, _] ⇒ Some(c.asInstanceOf[Either[Throwable, T]])
+ }
+
+ override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value"
+ case _: Either[_, _] ⇒ true
+ case _ ⇒ false
+ }
@inline
- private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]]
+ private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, AnyRef]]
@inline
- protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState)
+ protected final def updateState(oldState: AnyRef, newState: AnyRef): Boolean = updater.compareAndSet(this, oldState, newState)
@inline
- protected final def getState: FState[T] = updater.get(this)
+ protected final def getState: AnyRef = updater.get(this)
def tryComplete(value: Either[Throwable, T]): Boolean = {
- val callbacks: List[Either[Throwable, T] => Any] = {
+ val callbacks: List[Either[Throwable, T] ⇒ Unit] = {
try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Any] = {
+ def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] ⇒ Unit] = {
getState match {
- case cur @ Pending(listeners) =>
- val newState =
- if (v.isLeft) Failure(Some(v.asInstanceOf[Left[Throwable, T]]))
- else Success(Some(v.asInstanceOf[Right[Throwable, T]]))
-
- if (updateState(cur, newState)) listeners
- else tryComplete(v)
- case _ => null
+ case raw: List[_] ⇒
+ val cur = raw.asInstanceOf[List[Either[Throwable, T] ⇒ Unit]]
+ if (updateState(cur, v)) cur else tryComplete(v)
+ case _ ⇒ null
}
}
tryComplete(resolveEither(value))
} finally {
- synchronized { notifyAll() } // notify any blockers from `tryAwait`
+ synchronized { notifyAll() } //Notify any evil blockers
}
}
callbacks match {
- case null => false
- case cs if cs.isEmpty => true
- case cs =>
- Future.dispatchFuture(executor, {
- () => cs.foreach(f => notifyCompleted(f, value))
- })
- true
+ case null ⇒ false
+ case cs if cs.isEmpty ⇒ true
+ case cs ⇒ Future.dispatchFuture(executor, () ⇒ cs.foreach(f ⇒ notifyCompleted(f, value))); true
}
}
- def onComplete[U](func: Either[Throwable, T] => U): this.type = {
- @tailrec // Returns whether the future has already been completed or not
- def tryAddCallback(): Boolean = {
+ def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = {
+ @tailrec //Returns whether the future has already been completed or not
+ def tryAddCallback(): Either[Throwable, T] = {
val cur = getState
cur match {
- case _: Success[_] | _: Failure[_] => true
- case p: Pending[_] =>
- val pt = p.asInstanceOf[Pending[T]]
- if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
+ case r: Either[_, _] ⇒ r.asInstanceOf[Either[Throwable, T]]
+ case listeners: List[_] ⇒ if (updateState(listeners, func :: listeners)) null else tryAddCallback()
}
}
- if (tryAddCallback()) {
- val result = value.get
- Future.dispatchFuture(executor, {
- () => notifyCompleted(func, result)
- })
+ tryAddCallback() match {
+ case null ⇒ this
+ case completed ⇒
+ Future.dispatchFuture(executor, () ⇒ notifyCompleted(func, completed))
+ this
}
-
- this
}
private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {
try {
func(result)
} catch {
- case e => executor.reportFailure(e)
+ case /*NonFatal(*/e/*)*/ => executor.reportFailure(e)
}
}
}
@@ -222,13 +193,13 @@ object Promise {
val value = Some(resolveEither(suppliedValue))
+ override def isCompleted(): Boolean = true
+
def tryComplete(value: Either[Throwable, T]): Boolean = false
def onComplete[U](func: Either[Throwable, T] => U): this.type = {
val completedAs = value.get
- Future.dispatchFuture(executor, {
- () => func(completedAs)
- })
+ Future.dispatchFuture(executor, () => func(completedAs))
this
}
@@ -241,19 +212,3 @@ object Promise {
}
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-