summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHeather Miller <heather.miller@epfl.ch>2012-04-14 08:45:17 -0700
committerHeather Miller <heather.miller@epfl.ch>2012-04-14 08:45:17 -0700
commit0e8db083f63177a97f3d27024c016c6143573838 (patch)
treef405936e5e7e2310a4987e36dedf5f301d180a91
parent412885a92e4621af33bbb2c265627f5854e30ba1 (diff)
parent31771bdc6246c0b5c58ec8ded4d7bb411eb15fff (diff)
downloadscala-0e8db083f63177a97f3d27024c016c6143573838.tar.gz
scala-0e8db083f63177a97f3d27024c016c6143573838.tar.bz2
scala-0e8db083f63177a97f3d27024c016c6143573838.zip
Merge pull request #14 from viktorklang/wip-sip14-fixes
Wip sip14 fixes
-rw-r--r--src/library/scala/concurrent/Future.scala33
-rw-r--r--src/library/scala/concurrent/impl/AbstractPromise.java6
-rw-r--r--src/library/scala/concurrent/impl/Future.scala16
-rw-r--r--src/library/scala/concurrent/impl/NonFatal.scala36
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala127
-rw-r--r--src/library/scala/concurrent/impl/Unsafe.java32
6 files changed, 134 insertions, 116 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 1463dbcebf..40acea91c9 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -18,6 +18,7 @@ import java.{ lang => jl }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
import scala.concurrent.util.Duration
+import scala.concurrent.impl.NonFatal
import scala.Option
import scala.annotation.tailrec
@@ -203,7 +204,7 @@ trait Future[+T] extends Awaitable[T] {
case Right(v) =>
try p success f(v)
catch {
- case t => p complete resolver(t)
+ case NonFatal(t) => p complete resolver(t)
}
}
@@ -229,7 +230,7 @@ trait Future[+T] extends Awaitable[T] {
case Right(v) => p success v
}
} catch {
- case t: Throwable => p complete resolver(t)
+ case NonFatal(t) => p complete resolver(t)
}
}
@@ -262,7 +263,7 @@ trait Future[+T] extends Awaitable[T] {
if (pred(v)) p success v
else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
} catch {
- case t: Throwable => p complete resolver(t)
+ case NonFatal(t) => p complete resolver(t)
}
}
@@ -336,7 +337,7 @@ trait Future[+T] extends Awaitable[T] {
onComplete {
case Left(t) if pf isDefinedAt t =>
try { p success pf(t) }
- catch { case t: Throwable => p complete resolver(t) }
+ catch { case NonFatal(t) => p complete resolver(t) }
case otherwise => p complete otherwise
}
@@ -364,7 +365,7 @@ trait Future[+T] extends Awaitable[T] {
try {
p completeWith pf(t)
} catch {
- case t: Throwable => p complete resolver(t)
+ case NonFatal(t) => p complete resolver(t)
}
case otherwise => p complete otherwise
}
@@ -511,17 +512,16 @@ trait Future[+T] extends Awaitable[T] {
* Note: using this method yields nondeterministic dataflow programs.
*/
object Future {
-
- /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
- *
- * The result becomes available once the asynchronous computation is completed.
- *
- * @tparam T the type of the result
- * @param body the asychronous computation
- * @param execctx the execution context on which the future is run
- * @return the `Future` holding the result of the computation
- */
- def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body)
+ /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
+ *
+ * The result becomes available once the asynchronous computation is completed.
+ *
+ * @tparam T the type of the result
+ * @param body the asychronous computation
+ * @param execctx the execution context on which the future is run
+ * @return the `Future` holding the result of the computation
+ */
+ def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body)
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
@@ -614,4 +614,3 @@ object Future {
-
diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java
index 5280d67854..8aac5de042 100644
--- a/src/library/scala/concurrent/impl/AbstractPromise.java
+++ b/src/library/scala/concurrent/impl/AbstractPromise.java
@@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
abstract class AbstractPromise {
- private volatile Object _ref = null;
+ private volatile Object _ref;
protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
- AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
-}
+ AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+} \ No newline at end of file
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index a3c8ed3095..1cc9e95463 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -28,7 +28,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
/** Tests whether this Future has been completed.
*/
- final def isCompleted: Boolean = value.isDefined
+ def isCompleted: Boolean
/** The contained value of this Future. Before this Future is completed
* the value will be None. After completion the value will be Some(Right(t))
@@ -77,7 +77,9 @@ object Future {
try {
Right(body)
} catch {
- case e => scala.concurrent.resolver(e)
+ case NonFatal(e) =>
+ executor.reportFailure(e)
+ scala.concurrent.resolver(e)
}
}
}
@@ -107,7 +109,7 @@ object Future {
private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit =
_taskStack.get match {
- case stack if (stack ne null) && !force => stack push task
+ case stack if (stack ne null) && !force => stack push task // FIXME we can't mix tasks aimed for different ExecutionContexts see: https://github.com/akka/akka/blob/v2.0.1/akka-actor/src/main/scala/akka/dispatch/Future.scala#L373
case _ => executor.execute(new Runnable {
def run() {
try {
@@ -115,13 +117,7 @@ object Future {
_taskStack set taskStack
while (taskStack.nonEmpty) {
val next = taskStack.pop()
- try {
- next.apply()
- } catch {
- case e =>
- // TODO catching all and continue isn't good for OOME
- executor.reportFailure(e)
- }
+ try next() catch { case NonFatal(e) => executor reportFailure e }
}
} finally {
_taskStack.remove()
diff --git a/src/library/scala/concurrent/impl/NonFatal.scala b/src/library/scala/concurrent/impl/NonFatal.scala
new file mode 100644
index 0000000000..ac0cddaf1b
--- /dev/null
+++ b/src/library/scala/concurrent/impl/NonFatal.scala
@@ -0,0 +1,36 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.impl
+
+/**
+ * Extractor of non-fatal Throwables. Will not match fatal errors
+ * like VirtualMachineError (OutOfMemoryError)
+ * ThreadDeath, LinkageError and InterruptedException.
+ * StackOverflowError is matched, i.e. considered non-fatal.
+ *
+ * Usage to catch all harmless throwables:
+ * {{{
+ * try {
+ * // dangerous stuff
+ * } catch {
+ * case NonFatal(e) => log.error(e, "Something not that bad")
+ * }
+ * }}}
+ */
+object NonFatal {
+
+ def unapply(t: Throwable): Option[Throwable] = t match {
+ case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError
+ // VirtualMachineError includes OutOfMemoryError and other fatal errors
+ case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError ⇒ None
+ case e ⇒ Some(e)
+ }
+
+}
+
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 07b6d1f278..1388c8b357 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -74,37 +74,10 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
object Promise {
-
- def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue
-
- def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
-
- /** Represents the internal state.
- *
- * [adriaan] it's unsound to make FState covariant (tryComplete won't type check)
- */
- sealed trait FState[T] { def value: Option[Either[Throwable, T]] }
-
- case class Pending[T](listeners: List[Either[Throwable, T] => Any] = Nil) extends FState[T] {
- def value: Option[Either[Throwable, T]] = None
- }
-
- case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
- def result: T = value.get.right.get
- }
-
- case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
- def exception: Throwable = value.get.left.get
- }
-
- private val emptyPendingValue = Pending[Nothing](Nil)
-
/** Default promise implementation.
*/
- class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
- self =>
-
- updater.set(this, Promise.EmptyPending())
+ class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self =>
+ updater.set(this, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU
protected final def tryAwait(atMost: Duration): Boolean = {
@tailrec
@@ -115,7 +88,7 @@ object Promise {
val start = System.nanoTime()
try {
synchronized {
- while (value.isEmpty) wait(ms, ns)
+ while (!isCompleted) wait(ms, ns)
}
} catch {
case e: InterruptedException =>
@@ -123,93 +96,91 @@ object Promise {
awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
} else
- value.isDefined
+ isCompleted
}
-
- blocking(Future.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost)
+ //FIXME do not do this if there'll be no waiting
+ blocking(Future.body2awaitable(awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)), atMost)
}
+ @throws(classOf[TimeoutException])
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
- if (value.isDefined || tryAwait(atMost)) this
+ if (isCompleted || tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
+ @throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
case Left(e) => throw e
case Right(r) => r
}
- def value: Option[Either[Throwable, T]] = getState.value
+ def value: Option[Either[Throwable, T]] = getState match {
+ case c: Either[_, _] => Some(c.asInstanceOf[Either[Throwable, T]])
+ case _ => None
+ }
+
+ override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value"
+ case _: Either[_, _] => true
+ case _ => false
+ }
@inline
- private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]]
+ private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, AnyRef]]
@inline
- protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState)
+ protected final def updateState(oldState: AnyRef, newState: AnyRef): Boolean = updater.compareAndSet(this, oldState, newState)
@inline
- protected final def getState: FState[T] = updater.get(this)
+ protected final def getState: AnyRef = updater.get(this)
def tryComplete(value: Either[Throwable, T]): Boolean = {
- val callbacks: List[Either[Throwable, T] => Any] = {
+ val callbacks: List[Either[Throwable, T] => Unit] = {
try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Any] = {
+ def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = {
getState match {
- case cur @ Pending(listeners) =>
- val newState =
- if (v.isLeft) Failure(Some(v.asInstanceOf[Left[Throwable, T]]))
- else Success(Some(v.asInstanceOf[Right[Throwable, T]]))
-
- if (updateState(cur, newState)) listeners
- else tryComplete(v)
+ case raw: List[_] =>
+ val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]]
+ if (updateState(cur, v)) cur else tryComplete(v)
case _ => null
}
}
tryComplete(resolveEither(value))
} finally {
- synchronized { notifyAll() } // notify any blockers from `tryAwait`
+ synchronized { notifyAll() } //Notify any evil blockers
}
}
callbacks match {
case null => false
case cs if cs.isEmpty => true
- case cs =>
- Future.dispatchFuture(executor, {
- () => cs.foreach(f => notifyCompleted(f, value))
- })
- true
+ case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, value))); true
}
}
def onComplete[U](func: Either[Throwable, T] => U): this.type = {
- @tailrec // Returns whether the future has already been completed or not
- def tryAddCallback(): Boolean = {
+ @tailrec //Returns whether the future has already been completed or not
+ def tryAddCallback(): Either[Throwable, T] = {
val cur = getState
cur match {
- case _: Success[_] | _: Failure[_] => true
- case p: Pending[_] =>
- val pt = p.asInstanceOf[Pending[T]]
- if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
+ case r: Either[_, _] => r.asInstanceOf[Either[Throwable, T]]
+ case listeners: List[_] => if (updateState(listeners, func :: listeners)) null else tryAddCallback()
}
}
- if (tryAddCallback()) {
- val result = value.get
- Future.dispatchFuture(executor, {
- () => notifyCompleted(func, result)
- })
+ tryAddCallback() match {
+ case null => this
+ case completed =>
+ Future.dispatchFuture(executor, () => notifyCompleted(func, completed))
+ this
}
-
- this
}
private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {
try {
func(result)
} catch {
- case e => executor.reportFailure(e)
+ case NonFatal(e) => executor reportFailure e
}
}
}
@@ -222,13 +193,13 @@ object Promise {
val value = Some(resolveEither(suppliedValue))
+ override def isCompleted(): Boolean = true
+
def tryComplete(value: Either[Throwable, T]): Boolean = false
def onComplete[U](func: Either[Throwable, T] => U): this.type = {
val completedAs = value.get
- Future.dispatchFuture(executor, {
- () => func(completedAs)
- })
+ Future.dispatchFuture(executor, () => func(completedAs))
this
}
@@ -241,19 +212,3 @@ object Promise {
}
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/library/scala/concurrent/impl/Unsafe.java b/src/library/scala/concurrent/impl/Unsafe.java
new file mode 100644
index 0000000000..3c695c3905
--- /dev/null
+++ b/src/library/scala/concurrent/impl/Unsafe.java
@@ -0,0 +1,32 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+package scala.concurrent.impl;
+
+import java.lang.reflect.Field;
+
+public final class Unsafe {
+ public final static sun.misc.Unsafe instance;
+ static {
+ try {
+ sun.misc.Unsafe found = null;
+ for(Field field : sun.misc.Unsafe.class.getDeclaredFields()) {
+ if (field.getType() == sun.misc.Unsafe.class) {
+ field.setAccessible(true);
+ found = (sun.misc.Unsafe) field.get(null);
+ break;
+ }
+ }
+ if (found == null) throw new IllegalStateException("Can't find instance of sun.misc.Unsafe");
+ else instance = found;
+ } catch(Throwable t) {
+ throw new ExceptionInInitializerError(t);
+ }
+ }
+}