summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bincompat-forward.whitelist.conf4
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala85
2 files changed, 46 insertions, 43 deletions
diff --git a/bincompat-forward.whitelist.conf b/bincompat-forward.whitelist.conf
index fd6451d3bc..83864f48ee 100644
--- a/bincompat-forward.whitelist.conf
+++ b/bincompat-forward.whitelist.conf
@@ -467,6 +467,10 @@ filter {
{
matchName="scala.concurrent.forkjoin.ForkJoinPool.helpJoinOnce"
problemName=IncompatibleResultTypeProblem
+ },
+ {
+ matchName="scala.concurrent.impl.Promise$CompletionLatch"
+ problemName=MissingClassProblem
}
]
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 52f1075137..ed4039e2a5 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -8,11 +8,14 @@
package scala.concurrent.impl
-import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException }
+import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException, blocking }
+import scala.concurrent.Future.InternalCallbackExecutor
import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration, NANOSECONDS }
import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.util.{ Try, Success, Failure }
+import java.io.ObjectInputStream
+import java.util.concurrent.locks.AbstractQueuedSynchronizer
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
def future: this.type = this
@@ -52,70 +55,69 @@ private[concurrent] object Promise {
case e: Error => Failure(new ExecutionException("Boxed Error", e))
case t => Failure(t)
}
+
+ /*
+ * Inspired by: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/publicdomain/zero/1.0/
+ */
+ private final class CompletionLatch[T] extends AbstractQueuedSynchronizer with (Try[T] => Unit) {
+ override protected def tryAcquireShared(ignored: Int): Int = if (getState != 0) 1 else -1
+ override protected def tryReleaseShared(ignore: Int): Boolean = {
+ setState(1)
+ true
+ }
+ override def apply(ignored: Try[T]): Unit = releaseShared(1)
+ }
+
/** Default promise implementation.
*/
class DefaultPromise[T] extends AbstractPromise with Promise[T] { self =>
updateState(null, Nil) // Start at "No callbacks"
- protected final def tryAwait(atMost: Duration): Boolean = {
- @tailrec
- def awaitUnsafe(deadline: Deadline, nextWait: FiniteDuration): Boolean = {
- if (!isCompleted && nextWait > Duration.Zero) {
- val ms = nextWait.toMillis
- val ns = (nextWait.toNanos % 1000000l).toInt // as per object.wait spec
-
- synchronized { if (!isCompleted) wait(ms, ns) }
-
- awaitUnsafe(deadline, deadline.timeLeft)
- } else
- isCompleted
- }
- @tailrec
- def awaitUnbounded(): Boolean = {
- if (isCompleted) true
- else {
- synchronized { if (!isCompleted) wait() }
- awaitUnbounded()
- }
- }
-
+ protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) {
import Duration.Undefined
+ import scala.concurrent.Future.InternalCallbackExecutor
atMost match {
- case u if u eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period")
- case Duration.Inf => awaitUnbounded
- case Duration.MinusInf => isCompleted
- case f: FiniteDuration => if (f > Duration.Zero) awaitUnsafe(f.fromNow, f) else isCompleted
+ case e if e eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period")
+ case Duration.Inf =>
+ val l = new CompletionLatch[T]()
+ onComplete(l)(InternalCallbackExecutor)
+ l.acquireSharedInterruptibly(1)
+ case Duration.MinusInf => // Drop out
+ case f: FiniteDuration =>
+ if (f > Duration.Zero) {
+ val l = new CompletionLatch[T]()
+ onComplete(l)(InternalCallbackExecutor)
+ l.tryAcquireSharedNanos(1, f.toNanos)
+ }
}
- }
+
+ isCompleted
+ } else true // Already completed
@throws(classOf[TimeoutException])
@throws(classOf[InterruptedException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
- if (isCompleted || tryAwait(atMost)) this
+ if (tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost + "]")
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
- ready(atMost).value.get match {
- case Failure(e) => throw e
- case Success(r) => r
- }
+ ready(atMost).value.get.get // ready throws TimeoutException if timeout so value.get is safe here
def value: Option[Try[T]] = getState match {
case c: Try[_] => Some(c.asInstanceOf[Try[T]])
case _ => None
}
- override def isCompleted: Boolean = getState match { // Cheaper than boxing result into Option due to "def value"
- case _: Try[_] => true
- case _ => false
- }
+ override def isCompleted: Boolean = getState.isInstanceOf[Try[_]]
def tryComplete(value: Try[T]): Boolean = {
val resolved = resolveTry(value)
- (try {
- @tailrec
+ @tailrec
def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
@@ -124,10 +126,7 @@ private[concurrent] object Promise {
case _ => null
}
}
- tryComplete(resolved)
- } finally {
- synchronized { notifyAll() } //Notify any evil blockers
- }) match {
+ tryComplete(resolved) match {
case null => false
case rs if rs.isEmpty => true
case rs => rs.foreach(r => r.executeWithValue(resolved)); true