From 7c049a15f6cb3992abc6debabe2b53b2097ffb8a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Apr 2012 15:38:50 +0200 Subject: Updating to latest version of Akka's DefaultPromise --- src/library/scala/concurrent/Future.scala | 20 ++- .../scala/concurrent/impl/AbstractPromise.java | 6 +- src/library/scala/concurrent/impl/Future.scala | 2 +- src/library/scala/concurrent/impl/Promise.scala | 135 +++++++-------------- 4 files changed, 58 insertions(+), 105 deletions(-) diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 1463dbcebf..16432f6aac 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -511,16 +511,15 @@ trait Future[+T] extends Awaitable[T] { * Note: using this method yields nondeterministic dataflow programs. */ object Future { - - /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. - * - * The result becomes available once the asynchronous computation is completed. - * - * @tparam T the type of the result - * @param body the asychronous computation - * @param execctx the execution context on which the future is run - * @return the `Future` holding the result of the computation - */ + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + * + * The result becomes available once the asynchronous computation is completed. + * + * @tparam T the type of the result + * @param body the asychronous computation + * @param execctx the execution context on which the future is run + * @return the `Future` holding the result of the computation + */ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body) import scala.collection.mutable.Builder @@ -614,4 +613,3 @@ object Future { - diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java index 5280d67854..8aac5de042 100644 --- a/src/library/scala/concurrent/impl/AbstractPromise.java +++ b/src/library/scala/concurrent/impl/AbstractPromise.java @@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; abstract class AbstractPromise { - private volatile Object _ref = null; + private volatile Object _ref; protected final static AtomicReferenceFieldUpdater updater = - AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); -} + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +} \ No newline at end of file diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index a3c8ed3095..72ffa6a014 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -28,7 +28,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa /** Tests whether this Future has been completed. */ - final def isCompleted: Boolean = value.isDefined + def isCompleted: Boolean /** The contained value of this Future. Before this Future is completed * the value will be None. After completion the value will be Some(Right(t)) diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 07b6d1f278..ef87f27d63 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -74,37 +74,10 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu object Promise { - - def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue - - def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] - - /** Represents the internal state. - * - * [adriaan] it's unsound to make FState covariant (tryComplete won't type check) - */ - sealed trait FState[T] { def value: Option[Either[Throwable, T]] } - - case class Pending[T](listeners: List[Either[Throwable, T] => Any] = Nil) extends FState[T] { - def value: Option[Either[Throwable, T]] = None - } - - case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def result: T = value.get.right.get - } - - case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def exception: Throwable = value.get.left.get - } - - private val emptyPendingValue = Pending[Nothing](Nil) - /** Default promise implementation. */ - class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { - self => - - updater.set(this, Promise.EmptyPending()) + class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self => + updater.set(this, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU protected final def tryAwait(atMost: Duration): Boolean = { @tailrec @@ -115,7 +88,7 @@ object Promise { val start = System.nanoTime() try { synchronized { - while (value.isEmpty) wait(ms, ns) + while (!isCompleted) wait(ms, ns) } } catch { case e: InterruptedException => @@ -123,93 +96,91 @@ object Promise { awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) } else - value.isDefined + isCompleted } - - blocking(Future.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) + //FIXME do not do this if there'll be no waiting + blocking(Future.body2awaitable(awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)), atMost) } + @throws(classOf[TimeoutException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type = - if (value.isDefined || tryAwait(atMost)) this + if (isCompleted || tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { case Left(e) => throw e case Right(r) => r } - def value: Option[Either[Throwable, T]] = getState.value + def value: Option[Either[Throwable, T]] = getState match { + case _: List[_] ⇒ None + case c: Either[_, _] ⇒ Some(c.asInstanceOf[Either[Throwable, T]]) + } + + override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value" + case _: Either[_, _] ⇒ true + case _ ⇒ false + } @inline - private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, AnyRef]] @inline - protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) + protected final def updateState(oldState: AnyRef, newState: AnyRef): Boolean = updater.compareAndSet(this, oldState, newState) @inline - protected final def getState: FState[T] = updater.get(this) + protected final def getState: AnyRef = updater.get(this) def tryComplete(value: Either[Throwable, T]): Boolean = { - val callbacks: List[Either[Throwable, T] => Any] = { + val callbacks: List[Either[Throwable, T] ⇒ Unit] = { try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Any] = { + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] ⇒ Unit] = { getState match { - case cur @ Pending(listeners) => - val newState = - if (v.isLeft) Failure(Some(v.asInstanceOf[Left[Throwable, T]])) - else Success(Some(v.asInstanceOf[Right[Throwable, T]])) - - if (updateState(cur, newState)) listeners - else tryComplete(v) - case _ => null + case raw: List[_] ⇒ + val cur = raw.asInstanceOf[List[Either[Throwable, T] ⇒ Unit]] + if (updateState(cur, v)) cur else tryComplete(v) + case _ ⇒ null } } tryComplete(resolveEither(value)) } finally { - synchronized { notifyAll() } // notify any blockers from `tryAwait` + synchronized { notifyAll() } //Notify any evil blockers } } callbacks match { - case null => false - case cs if cs.isEmpty => true - case cs => - Future.dispatchFuture(executor, { - () => cs.foreach(f => notifyCompleted(f, value)) - }) - true + case null ⇒ false + case cs if cs.isEmpty ⇒ true + case cs ⇒ Future.dispatchFuture(executor, () ⇒ cs.foreach(f ⇒ notifyCompleted(f, value))); true } } - def onComplete[U](func: Either[Throwable, T] => U): this.type = { - @tailrec // Returns whether the future has already been completed or not - def tryAddCallback(): Boolean = { + def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = { + @tailrec //Returns whether the future has already been completed or not + def tryAddCallback(): Either[Throwable, T] = { val cur = getState cur match { - case _: Success[_] | _: Failure[_] => true - case p: Pending[_] => - val pt = p.asInstanceOf[Pending[T]] - if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + case r: Either[_, _] ⇒ r.asInstanceOf[Either[Throwable, T]] + case listeners: List[_] ⇒ if (updateState(listeners, func :: listeners)) null else tryAddCallback() } } - if (tryAddCallback()) { - val result = value.get - Future.dispatchFuture(executor, { - () => notifyCompleted(func, result) - }) + tryAddCallback() match { + case null ⇒ this + case completed ⇒ + Future.dispatchFuture(executor, () ⇒ notifyCompleted(func, completed)) + this } - - this } private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { try { func(result) } catch { - case e => executor.reportFailure(e) + case /*NonFatal(*/e/*)*/ => executor.reportFailure(e) } } } @@ -222,13 +193,13 @@ object Promise { val value = Some(resolveEither(suppliedValue)) + override def isCompleted(): Boolean = true + def tryComplete(value: Either[Throwable, T]): Boolean = false def onComplete[U](func: Either[Throwable, T] => U): this.type = { val completedAs = value.get - Future.dispatchFuture(executor, { - () => func(completedAs) - }) + Future.dispatchFuture(executor, () => func(completedAs)) this } @@ -241,19 +212,3 @@ object Promise { } } - - - - - - - - - - - - - - - - -- cgit v1.2.3 From a2115b2352700dfc32a99663086871a2cd192685 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Apr 2012 15:57:10 +0200 Subject: Adding NonFatal and Unsafe to scala.concurrent.impl --- src/library/scala/concurrent/impl/NonFatal.scala | 36 ++++++++++++++++++++++++ src/library/scala/concurrent/impl/Unsafe.java | 32 +++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 src/library/scala/concurrent/impl/NonFatal.scala create mode 100644 src/library/scala/concurrent/impl/Unsafe.java diff --git a/src/library/scala/concurrent/impl/NonFatal.scala b/src/library/scala/concurrent/impl/NonFatal.scala new file mode 100644 index 0000000000..ac0cddaf1b --- /dev/null +++ b/src/library/scala/concurrent/impl/NonFatal.scala @@ -0,0 +1,36 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + +/** + * Extractor of non-fatal Throwables. Will not match fatal errors + * like VirtualMachineError (OutOfMemoryError) + * ThreadDeath, LinkageError and InterruptedException. + * StackOverflowError is matched, i.e. considered non-fatal. + * + * Usage to catch all harmless throwables: + * {{{ + * try { + * // dangerous stuff + * } catch { + * case NonFatal(e) => log.error(e, "Something not that bad") + * } + * }}} + */ +object NonFatal { + + def unapply(t: Throwable): Option[Throwable] = t match { + case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError + // VirtualMachineError includes OutOfMemoryError and other fatal errors + case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError ⇒ None + case e ⇒ Some(e) + } + +} + diff --git a/src/library/scala/concurrent/impl/Unsafe.java b/src/library/scala/concurrent/impl/Unsafe.java new file mode 100644 index 0000000000..3c695c3905 --- /dev/null +++ b/src/library/scala/concurrent/impl/Unsafe.java @@ -0,0 +1,32 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + + +package scala.concurrent.impl; + +import java.lang.reflect.Field; + +public final class Unsafe { + public final static sun.misc.Unsafe instance; + static { + try { + sun.misc.Unsafe found = null; + for(Field field : sun.misc.Unsafe.class.getDeclaredFields()) { + if (field.getType() == sun.misc.Unsafe.class) { + field.setAccessible(true); + found = (sun.misc.Unsafe) field.get(null); + break; + } + } + if (found == null) throw new IllegalStateException("Can't find instance of sun.misc.Unsafe"); + else instance = found; + } catch(Throwable t) { + throw new ExceptionInInitializerError(t); + } + } +} -- cgit v1.2.3 From 828aa0aaa9288c57f4574ca267a38173d15c458f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Apr 2012 17:21:52 +0200 Subject: Making sure that the ScalaDoc for Future.apply has the right names --- src/library/scala/concurrent/Future.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 16432f6aac..f331263d39 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -520,7 +520,7 @@ object Future { * @param execctx the execution context on which the future is run * @return the `Future` holding the result of the computation */ - def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body) + def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom -- cgit v1.2.3 From 05779d413ed8de3717307b78cccb413f0a687101 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Apr 2012 17:33:34 +0200 Subject: Replacing all ⇒ with => in Promise.scala and harmonized def value to use same match approach as def isCompleted MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/library/scala/concurrent/impl/Promise.scala | 36 ++++++++++++------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index ef87f27d63..140cfa93a0 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -115,13 +115,13 @@ object Promise { } def value: Option[Either[Throwable, T]] = getState match { - case _: List[_] ⇒ None - case c: Either[_, _] ⇒ Some(c.asInstanceOf[Either[Throwable, T]]) + case c: Either[_, _] => Some(c.asInstanceOf[Either[Throwable, T]]) + case _ => None } override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value" - case _: Either[_, _] ⇒ true - case _ ⇒ false + case _: Either[_, _] => true + case _ => false } @inline @@ -134,15 +134,15 @@ object Promise { protected final def getState: AnyRef = updater.get(this) def tryComplete(value: Either[Throwable, T]): Boolean = { - val callbacks: List[Either[Throwable, T] ⇒ Unit] = { + val callbacks: List[Either[Throwable, T] => Unit] = { try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] ⇒ Unit] = { + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = { getState match { - case raw: List[_] ⇒ - val cur = raw.asInstanceOf[List[Either[Throwable, T] ⇒ Unit]] + case raw: List[_] => + val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]] if (updateState(cur, v)) cur else tryComplete(v) - case _ ⇒ null + case _ => null } } tryComplete(resolveEither(value)) @@ -152,26 +152,26 @@ object Promise { } callbacks match { - case null ⇒ false - case cs if cs.isEmpty ⇒ true - case cs ⇒ Future.dispatchFuture(executor, () ⇒ cs.foreach(f ⇒ notifyCompleted(f, value))); true + case null => false + case cs if cs.isEmpty => true + case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, value))); true } } - def onComplete[U](func: Either[Throwable, T] ⇒ U): this.type = { + def onComplete[U](func: Either[Throwable, T] => U): this.type = { @tailrec //Returns whether the future has already been completed or not def tryAddCallback(): Either[Throwable, T] = { val cur = getState cur match { - case r: Either[_, _] ⇒ r.asInstanceOf[Either[Throwable, T]] - case listeners: List[_] ⇒ if (updateState(listeners, func :: listeners)) null else tryAddCallback() + case r: Either[_, _] => r.asInstanceOf[Either[Throwable, T]] + case listeners: List[_] => if (updateState(listeners, func :: listeners)) null else tryAddCallback() } } tryAddCallback() match { - case null ⇒ this - case completed ⇒ - Future.dispatchFuture(executor, () ⇒ notifyCompleted(func, completed)) + case null => this + case completed => + Future.dispatchFuture(executor, () => notifyCompleted(func, completed)) this } } -- cgit v1.2.3 From af8416bb7ff4d6816f416cd0671e26e2cdc653d7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Apr 2012 18:11:38 +0200 Subject: Adding NonFatal matching as to not catch anything that should terminate the JVM, also adding EX.reportFailure where it makes sense --- src/library/scala/concurrent/Future.scala | 11 ++++++----- src/library/scala/concurrent/impl/Future.scala | 12 ++++-------- src/library/scala/concurrent/impl/Promise.scala | 2 +- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index f331263d39..40acea91c9 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -18,6 +18,7 @@ import java.{ lang => jl } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } import scala.concurrent.util.Duration +import scala.concurrent.impl.NonFatal import scala.Option import scala.annotation.tailrec @@ -203,7 +204,7 @@ trait Future[+T] extends Awaitable[T] { case Right(v) => try p success f(v) catch { - case t => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -229,7 +230,7 @@ trait Future[+T] extends Awaitable[T] { case Right(v) => p success v } } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -262,7 +263,7 @@ trait Future[+T] extends Awaitable[T] { if (pred(v)) p success v else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -336,7 +337,7 @@ trait Future[+T] extends Awaitable[T] { onComplete { case Left(t) if pf isDefinedAt t => try { p success pf(t) } - catch { case t: Throwable => p complete resolver(t) } + catch { case NonFatal(t) => p complete resolver(t) } case otherwise => p complete otherwise } @@ -364,7 +365,7 @@ trait Future[+T] extends Awaitable[T] { try { p completeWith pf(t) } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } case otherwise => p complete otherwise } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index 72ffa6a014..ca13981163 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -77,7 +77,9 @@ object Future { try { Right(body) } catch { - case e => scala.concurrent.resolver(e) + case NonFatal(e) => + executor.reportFailure(e) + scala.concurrent.resolver(e) } } } @@ -115,13 +117,7 @@ object Future { _taskStack set taskStack while (taskStack.nonEmpty) { val next = taskStack.pop() - try { - next.apply() - } catch { - case e => - // TODO catching all and continue isn't good for OOME - executor.reportFailure(e) - } + try next() catch { case NonFatal(e) => executor reportFailure e } } } finally { _taskStack.remove() diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 140cfa93a0..1388c8b357 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -180,7 +180,7 @@ object Promise { try { func(result) } catch { - case /*NonFatal(*/e/*)*/ => executor.reportFailure(e) + case NonFatal(e) => executor reportFailure e } } } -- cgit v1.2.3 From 31771bdc6246c0b5c58ec8ded4d7bb411eb15fff Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Apr 2012 18:23:16 +0200 Subject: Adding FIXME about putting mixed beans in the EC-basket --- src/library/scala/concurrent/impl/Future.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index ca13981163..1cc9e95463 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -109,7 +109,7 @@ object Future { private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit = _taskStack.get match { - case stack if (stack ne null) && !force => stack push task + case stack if (stack ne null) && !force => stack push task // FIXME we can't mix tasks aimed for different ExecutionContexts see: https://github.com/akka/akka/blob/v2.0.1/akka-actor/src/main/scala/akka/dispatch/Future.scala#L373 case _ => executor.execute(new Runnable { def run() { try { -- cgit v1.2.3