From 7993ec04baf28cd12009d15979c2c904afad89d3 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Fri, 13 Jan 2012 19:32:48 +0100 Subject: Migrate akka promises. Changes to some of the interfaces. --- src/library/scala/concurrent/Awaitable.scala | 7 +- .../scala/concurrent/ExecutionContext.scala | 2 +- src/library/scala/concurrent/Future.scala | 24 ++- src/library/scala/concurrent/Promise.scala | 18 +-- .../scala/concurrent/akka/AbstractPromise.java | 21 +++ src/library/scala/concurrent/akka/Promise.scala | 165 ++++++++++++++++++++- src/library/scala/concurrent/akka/package.scala | 3 + .../scala/concurrent/default/TaskImpl.scala | 17 +-- src/library/scala/concurrent/package.scala | 32 ++-- test/files/jvm/scala-concurrent-tck.scala | 12 +- 10 files changed, 250 insertions(+), 51 deletions(-) create mode 100644 src/library/scala/concurrent/akka/AbstractPromise.java diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala index 52fd3b9516..c38e668f30 100644 --- a/src/library/scala/concurrent/Awaitable.scala +++ b/src/library/scala/concurrent/Awaitable.scala @@ -11,15 +11,14 @@ package scala.concurrent import scala.annotation.implicitNotFound -import scala.util.Timeout +import scala.util.Duration trait Awaitable[+T] { - @implicitNotFound(msg = "Waiting must be done by calling `await(timeout) b`, where `b` is the `Awaitable` object.") - def await(timeout: Timeout)(implicit canawait: CanAwait): T + @implicitNotFound(msg = "Waiting must be done by calling `blocking(timeout) b`, where `b` is the `Awaitable` object or a potentially blocking piece of code.") + def await(atMost: Duration)(implicit canawait: CanAwait): T } - diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index ebeeca995e..38a28044e1 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -23,7 +23,7 @@ trait ExecutionContext { def future[T](body: => T): Future[T] /** Only callable from the tasks running on the same execution context. */ - def blockingCall[T](timeout: Timeout, body: Awaitable[T]): T + def blockingCall[T](body: Awaitable[T]): T } diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 4002239fc4..e6edaea87a 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -28,6 +28,18 @@ import scala.collection.generic.CanBuildFrom /** The trait that represents futures. + * + * Asynchronous computations that yield futures are created with the `future` call: + * + * {{{ + * val s = "Hello" + * val f: Future[String] = future { + * s + " future!" + * } + * f onSuccess { + * case msg => println(msg) + * } + * }}} * * @define multipleCallbacks * Multiple callbacks may be registered; there is no guarantee that they will be @@ -37,12 +49,14 @@ import scala.collection.generic.CanBuildFrom * The future may contain a throwable object and this means that the future failed. * Futures obtained through combinators have the same exception as the future they were obtained from. * The following throwable objects are not contained in the future: - * - Error - errors are not contained within futures - * - scala.util.control.ControlThrowable - not contained within futures - * - InterruptedException - not contained within futures + * - `Error` - errors are not contained within futures + * - `InterruptedException` - not contained within futures + * - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures * * Instead, the future is completed with a ExecutionException with one of the exceptions above * as the cause. + * If a future is failed with a `scala.runtime.NonLocalReturnControl`, + * it is completed with a value instead from that throwable instead instead. * * @define forComprehensionExamples * Example: @@ -146,10 +160,10 @@ self => } this } - def await(timeout: Timeout)(implicit canawait: CanAwait): Throwable = { + def await(atMost: Duration)(implicit canawait: CanAwait): Throwable = { var t: Throwable = null try { - val res = self.await(timeout) + val res = self.await(atMost) t = noSuchElem(res) } catch { case t: Throwable => return t diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index e5557ae1c3..c3fa92053b 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -78,7 +78,7 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def trySuccess(value: T): Boolean + def trySuccess(value: T): Boolean = tryComplete(Right(value)) /** Completes the promise with an exception. * @@ -96,7 +96,7 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def tryFailure(t: Throwable): Boolean + def tryFailure(t: Throwable): Boolean = tryComplete(Left(t)) /** Wraps a `Throwable` in an `ExecutionException` if necessary. * @@ -112,15 +112,7 @@ trait Promise[T] { object Promise { - /* - /** - * Creates a non-completed, new, Promise with the supplied timeout in milliseconds - */ - def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = DefaultPromise[A](timeout) - - /** - * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf) - */ - def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout) - */ + + + } diff --git a/src/library/scala/concurrent/akka/AbstractPromise.java b/src/library/scala/concurrent/akka/AbstractPromise.java new file mode 100644 index 0000000000..38c74edf2f --- /dev/null +++ b/src/library/scala/concurrent/akka/AbstractPromise.java @@ -0,0 +1,21 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.akka; + + + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + + + +abstract class AbstractPromise { + private volatile Object _ref = null; + protected final static AtomicReferenceFieldUpdater updater = + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +} diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala index a47dee48e2..d3b93b9573 100644 --- a/src/library/scala/concurrent/akka/Promise.scala +++ b/src/library/scala/concurrent/akka/Promise.scala @@ -10,14 +10,19 @@ package scala.concurrent.akka -import scala.concurrent.{ExecutionContext, resolver} +import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater +import scala.concurrent.{Awaitable, ExecutionContext, resolver, blocking, CanAwait, TimeoutException} import scala.util.continuations._ +import scala.util.Duration +import scala.annotation.tailrec trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { // TODO refine answer and return types here from Any to type parameters + // then move this up in the hierarchy final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => @@ -61,3 +66,161 @@ trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { } + +object Promise { + + def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] + + /** Represents the internal state. + */ + sealed trait FState[+T] { def value: Option[Either[Throwable, T]] } + + case class Pending[T](listeners: List[Either[Throwable, T] ⇒ Unit] = Nil) extends FState[T] { + def value: Option[Either[Throwable, T]] = None + } + + case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { + def result: T = value.get.right.get + } + + case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { + def exception: Throwable = value.get.left.get + } + + private val emptyPendingValue = Pending[Nothing](Nil) + + /* default promise implementation */ + abstract class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { + self => + + updater.set(this, Promise.EmptyPending()) + + protected final def tryAwait(atMost: Duration): Boolean = { + @tailrec + def awaitUnsafe(waitTimeNanos: Long): Boolean = { + if (value.isEmpty && waitTimeNanos > 0) { + val ms = NANOSECONDS.toMillis(waitTimeNanos) + val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec + val start = System.nanoTime() + try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } + + awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) + } else + value.isDefined + } + + executor.blockingCall(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost)))) + } + + private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = + if (value.isDefined || tryAwait(atMost)) this + else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + + def await(atMost: Duration)(implicit permit: CanAwait): T = + ready(atMost).value.get match { + case Left(e) => throw e + case Right(r) => r + } + + def value: Option[Either[Throwable, T]] = getState.value + + @inline + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] + + @inline + protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) + + @inline + protected final def getState: FState[T] = updater.get(this) + + /* + def tryComplete(value: Either[Throwable, T]): Boolean = { + val callbacks: List[Either[Throwable, T] => Unit] = { + try { + @tailrec + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = { + getState match { + case cur @ Pending(listeners) => + if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners + else tryComplete(v) + case _ => null + } + } + tryComplete(resolve(value)) + } finally { + synchronized { notifyAll() } //Notify any evil blockers + } + } + + callbacks match { + case null => false + case cs if cs.isEmpty => true + case cs => Future.dispatchTask(() => cs.foreach(f => notifyCompleted(f, value))); true + } + } + + def onComplete(func: Either[Throwable, T] => Unit): this.type = { + @tailrec //Returns whether the future has already been completed or not + def tryAddCallback(): Boolean = { + val cur = getState + cur match { + case _: Success[_] | _: Failure[_] => true + case p: Pending[_] => + val pt = p.asInstanceOf[Pending[T]] + if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + } + } + + if (tryAddCallback()) { + val result = value.get + Future.dispatchTask(() => notifyCompleted(func, result)) + } + + this + } + + private final def notifyCompleted(func: Either[Throwable, T] => Unit, result: Either[Throwable, T]) { + try { func(result) } catch { case e => logError("Future onComplete-callback raised an exception", e) } + } + */ + } + + /* + /** + * An already completed Future is seeded with it's result at creation, is useful for when you are participating in + * a Future-composition but you already have a value to contribute. + */ + final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] { + val value = Some(resolve(suppliedValue)) + + def tryComplete(value: Either[Throwable, T]): Boolean = false + def onComplete(func: Either[Throwable, T] => Unit): this.type = { + val completedAs = value.get + Future dispatchTask (() => func(completedAs)) + this + } + + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { + case Left(e) => throw e + case Right(r) => r + } + } + */ +} + + + + + + + + + + + + + + + + diff --git a/src/library/scala/concurrent/akka/package.scala b/src/library/scala/concurrent/akka/package.scala index 59eda5a3b4..8c059b8e71 100644 --- a/src/library/scala/concurrent/akka/package.scala +++ b/src/library/scala/concurrent/akka/package.scala @@ -11,6 +11,7 @@ package scala.concurrent import java.{lang => jl} +import scala.util.Duration @@ -31,6 +32,8 @@ package object akka { if (c.isPrimitive) toBoxed(c) else c } + def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue + } diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index 716b9c02f1..a38541df5d 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -5,7 +5,7 @@ package default import java.util.concurrent.atomic.AtomicReferenceFieldUpdater import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread } -import scala.util.{ Timeout, Duration } +import scala.util.Duration import scala.annotation.tailrec @@ -90,7 +90,7 @@ extends Promise[T] with Future[T] with Completable[T] { case Right(v) => trySuccess(v) } - def trySuccess(value: T): Boolean = { + override def trySuccess(value: T): Boolean = { val cbs = tryCompleteState(Success(value)) if (cbs == null) false @@ -103,7 +103,7 @@ extends Promise[T] with Future[T] with Completable[T] { } } - def tryFailure(t: Throwable): Boolean = { + override def tryFailure(t: Throwable): Boolean = { val wrapped = wrap(t) val cbs = tryCompleteState(Failure(wrapped)) if (cbs == null) @@ -117,7 +117,7 @@ extends Promise[T] with Future[T] with Completable[T] { } } - def await(timeout: Timeout)(implicit canawait: scala.concurrent.CanAwait): T = getState match { + def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match { case Success(res) => res case Failure(t) => throw t case _ => @@ -191,7 +191,7 @@ extends RecursiveAction with Task[T] with Future[T] with Completable[T] { def tryCancel(): Unit = tryUnfork() - def await(timeout: Timeout)(implicit canawait: CanAwait): T = { + def await(atMost: Duration)(implicit canawait: CanAwait): T = { join() // TODO handle timeout also (updater.get(this): @unchecked) match { case Success(r) => r @@ -257,17 +257,16 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { def promise[T]: Promise[T] = new PromiseImpl[T](this) - // TODO fix the timeout - def blockingCall[T](timeout: Timeout, b: Awaitable[T]): T = b match { + def blockingCall[T](b: Awaitable[T]): T = b match { case fj: TaskImpl[_] if fj.executor.pool eq pool => - fj.await(timeout) + fj.await(Duration.fromNanos(0)) case _ => var res: T = null.asInstanceOf[T] @volatile var blockingDone = false // TODO add exception handling here! val mb = new ForkJoinPool.ManagedBlocker { def block() = { - res = b.await(timeout)(CanAwaitEvidence) + res = b.await(Duration.fromNanos(0))(CanAwaitEvidence) blockingDone = true true } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index ce22c53c72..7552100af2 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -52,15 +52,17 @@ package object concurrent { private[concurrent] def resolve[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T]) + case Left(t: scala.util.control.ControlThrowable) => Left(new ExecutionException("Boxed ControlThrowable", t)) case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t)) - case Left(e: Error) => throw e + case Left(e: Error) => Left(new ExecutionException("Boxed Error", e)) case _ => source } private val resolverFunction: PartialFunction[Throwable, Either[Throwable, _]] = { case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value) + case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t)) case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t)) - case e: Error => throw e + case e: Error => Left(new ExecutionException("Boxed Error", e)) case t => Left(t) } @@ -83,9 +85,12 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def await[T](atMost: Duration)(body: =>T): T = result(new Awaitable[T] { - def await(timeout: Timeout)(implicit cb: CanAwait) = body - }, atMost) + def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + + /** Wraps a block of code into an awaitable object. */ + def body2awaitable[T](body: =>T) = new Awaitable[T] { + def await(atMost: Duration)(implicit cb: CanAwait) = body + } /** Blocks on a blockable object. * @@ -96,18 +101,23 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def result[T](awaitable: Awaitable[T], atMost: Duration): T = { + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { currentExecutionContext.get match { case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case - case x => x.blockingCall(atMost, awaitable) // inside an execution context thread + case x => x.blockingCall(awaitable) // inside an execution context thread } } - def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = { - result(awaitable, atMost) - awaitable + object await { + def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = { + try blocking(awaitable, atMost) + catch { case _ => } + awaitable + } + + def result[T](awaitable: Awaitable[T], atMost: Duration): T = blocking(awaitable, atMost) } - + } diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index abd363cedf..a951c09da2 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -10,9 +10,7 @@ import scala.concurrent.{ } import scala.concurrent.future import scala.concurrent.promise -import scala.concurrent.await -import scala.concurrent.result -import scala.concurrent.ready +import scala.concurrent.blocking import scala.util.Duration @@ -314,7 +312,7 @@ trait FutureProjections extends TestBase { val f = future { throw cause } - assert(result(f.failed, Duration(500, "ms")) == cause) + assert(blocking(f.failed, Duration(500, "ms")) == cause) done() } @@ -322,7 +320,7 @@ trait FutureProjections extends TestBase { done => val f = future { 0 } try { - ready(f.failed, Duration(0, "ms")) + blocking(f.failed, Duration(0, "ms")) assert(false) } catch { case nsee: NoSuchElementException => done() @@ -344,7 +342,7 @@ trait Blocking extends TestBase { def testAwaitSuccess(): Unit = once { done => val f = future { 0 } - ready(f, Duration(500, "ms")) + blocking(f, Duration(500, "ms")) done() } @@ -355,7 +353,7 @@ trait Blocking extends TestBase { throw cause } try { - ready(f, Duration(500, "ms")) + blocking(f, Duration(500, "ms")) assert(false) } catch { case t => -- cgit v1.2.3