From 73d494dd4b4b45277c447b774570c52b4df67869 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 18 Feb 2013 17:58:08 +0100 Subject: Reimplementing much of the DefaultPromise methods Optimizations: 1) Avoiding to call 'synchronized' in tryComplete and in tryAwait 2) Implementing blocking by using an optimized latch so no blocking ops for non-blockers 3) Reducing method size of isCompleted to be cheaper to inline 4) 'result' to use Try.get instead of patmat --- src/library/scala/concurrent/impl/Promise.scala | 85 ++++++++++++------------- 1 file changed, 42 insertions(+), 43 deletions(-) (limited to 'src/library') diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 52f1075137..ed4039e2a5 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -8,11 +8,14 @@ package scala.concurrent.impl -import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException } +import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException, blocking } +import scala.concurrent.Future.InternalCallbackExecutor import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration, NANOSECONDS } import scala.annotation.tailrec import scala.util.control.NonFatal import scala.util.{ Try, Success, Failure } +import java.io.ObjectInputStream +import java.util.concurrent.locks.AbstractQueuedSynchronizer private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] { def future: this.type = this @@ -52,70 +55,69 @@ private[concurrent] object Promise { case e: Error => Failure(new ExecutionException("Boxed Error", e)) case t => Failure(t) } + + /* + * Inspired by: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + private final class CompletionLatch[T] extends AbstractQueuedSynchronizer with (Try[T] => Unit) { + override protected def tryAcquireShared(ignored: Int): Int = if (getState != 0) 1 else -1 + override protected def tryReleaseShared(ignore: Int): Boolean = { + setState(1) + true + } + override def apply(ignored: Try[T]): Unit = releaseShared(1) + } + /** Default promise implementation. */ class DefaultPromise[T] extends AbstractPromise with Promise[T] { self => updateState(null, Nil) // Start at "No callbacks" - protected final def tryAwait(atMost: Duration): Boolean = { - @tailrec - def awaitUnsafe(deadline: Deadline, nextWait: FiniteDuration): Boolean = { - if (!isCompleted && nextWait > Duration.Zero) { - val ms = nextWait.toMillis - val ns = (nextWait.toNanos % 1000000l).toInt // as per object.wait spec - - synchronized { if (!isCompleted) wait(ms, ns) } - - awaitUnsafe(deadline, deadline.timeLeft) - } else - isCompleted - } - @tailrec - def awaitUnbounded(): Boolean = { - if (isCompleted) true - else { - synchronized { if (!isCompleted) wait() } - awaitUnbounded() - } - } - + protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) { import Duration.Undefined + import scala.concurrent.Future.InternalCallbackExecutor atMost match { - case u if u eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period") - case Duration.Inf => awaitUnbounded - case Duration.MinusInf => isCompleted - case f: FiniteDuration => if (f > Duration.Zero) awaitUnsafe(f.fromNow, f) else isCompleted + case e if e eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period") + case Duration.Inf => + val l = new CompletionLatch[T]() + onComplete(l)(InternalCallbackExecutor) + l.acquireSharedInterruptibly(1) + case Duration.MinusInf => // Drop out + case f: FiniteDuration => + if (f > Duration.Zero) { + val l = new CompletionLatch[T]() + onComplete(l)(InternalCallbackExecutor) + l.tryAcquireSharedNanos(1, f.toNanos) + } } - } + + isCompleted + } else true // Already completed @throws(classOf[TimeoutException]) @throws(classOf[InterruptedException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type = - if (isCompleted || tryAwait(atMost)) this + if (tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost + "]") @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = - ready(atMost).value.get match { - case Failure(e) => throw e - case Success(r) => r - } + ready(atMost).value.get.get // ready throws TimeoutException if timeout so value.get is safe here def value: Option[Try[T]] = getState match { case c: Try[_] => Some(c.asInstanceOf[Try[T]]) case _ => None } - override def isCompleted: Boolean = getState match { // Cheaper than boxing result into Option due to "def value" - case _: Try[_] => true - case _ => false - } + override def isCompleted: Boolean = getState.isInstanceOf[Try[_]] def tryComplete(value: Try[T]): Boolean = { val resolved = resolveTry(value) - (try { - @tailrec + @tailrec def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = { getState match { case raw: List[_] => @@ -124,10 +126,7 @@ private[concurrent] object Promise { case _ => null } } - tryComplete(resolved) - } finally { - synchronized { notifyAll() } //Notify any evil blockers - }) match { + tryComplete(resolved) match { case null => false case rs if rs.isEmpty => true case rs => rs.foreach(r => r.executeWithValue(resolved)); true -- cgit v1.2.3