diff options
Diffstat (limited to 'src/library')
-rw-r--r-- | src/library/scala/collection/SeqExtractors.scala | 3 | ||||
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 62 | ||||
-rw-r--r-- | src/library/scala/concurrent/ManagedBlocker.scala | 1 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/AbstractPromise.java | 6 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 4 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Future.scala | 37 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/NonFatal.scala | 37 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 127 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Unsafe.java | 32 | ||||
-rw-r--r-- | src/library/scala/concurrent/util/Duration.scala | 8 | ||||
-rw-r--r-- | src/library/scala/concurrent/util/duration/package.scala | 3 |
11 files changed, 173 insertions, 147 deletions
diff --git a/src/library/scala/collection/SeqExtractors.scala b/src/library/scala/collection/SeqExtractors.scala index cb3cb27f18..cbb09a0a90 100644 --- a/src/library/scala/collection/SeqExtractors.scala +++ b/src/library/scala/collection/SeqExtractors.scala @@ -19,3 +19,6 @@ object :+ { if(t.isEmpty) None else Some(t.init -> t.last) } + +// Dummy to fool ant +private abstract class SeqExtractors
\ No newline at end of file diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 0d76c23c25..9aaf05dbd6 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -18,6 +18,7 @@ import java.{ lang => jl } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } import scala.concurrent.util.Duration +import scala.concurrent.impl.NonFatal import scala.Option import scala.annotation.tailrec @@ -117,7 +118,7 @@ trait Future[+T] extends Awaitable[T] { case Right(v) => // do nothing } - /** When this future is completed, either through an exception, a timeout, or a value, + /** When this future is completed, either through an exception, or a value, * apply the provided function. * * If the future has already been completed, @@ -204,7 +205,7 @@ trait Future[+T] extends Awaitable[T] { case Right(v) => try p success f(v) catch { - case t => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -230,7 +231,7 @@ trait Future[+T] extends Awaitable[T] { case Right(v) => p success v } } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -242,7 +243,7 @@ trait Future[+T] extends Awaitable[T] { * If the current future contains a value which satisfies the predicate, the new future will also hold that value. * Otherwise, the resulting future will fail with a `NoSuchElementException`. * - * If the current future fails or times out, the resulting future also fails or times out, respectively. + * If the current future fails, then the resulting future also fails. * * Example: * {{{ @@ -263,7 +264,7 @@ trait Future[+T] extends Awaitable[T] { if (pred(v)) p success v else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -282,12 +283,12 @@ trait Future[+T] extends Awaitable[T] { // def withFilter(q: S => Boolean): FutureWithFilter[S] = new FutureWithFilter[S](self, x => p(x) && q(x)) // } - /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value. + /** Creates a new future by mapping the value of the current future, if the given partial function is defined at that value. * * If the current future contains a value for which the partial function is defined, the new future will also hold that value. * Otherwise, the resulting future will fail with a `NoSuchElementException`. * - * If the current future fails or times out, the resulting future also fails or times out, respectively. + * If the current future fails, then the resulting future also fails. * * Example: * {{{ @@ -312,7 +313,7 @@ trait Future[+T] extends Awaitable[T] { if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -337,7 +338,7 @@ trait Future[+T] extends Awaitable[T] { onComplete { case Left(t) if pf isDefinedAt t => try { p success pf(t) } - catch { case t: Throwable => p complete resolver(t) } + catch { case NonFatal(t) => p complete resolver(t) } case otherwise => p complete otherwise } @@ -365,7 +366,7 @@ trait Future[+T] extends Awaitable[T] { try { p completeWith pf(t) } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } case otherwise => p complete otherwise } @@ -425,13 +426,30 @@ trait Future[+T] extends Awaitable[T] { * that conforms to `S`'s erased type or a `ClassCastException` otherwise. */ def mapTo[S](implicit m: Manifest[S]): Future[S] = { + import java.{ lang => jl } + val toBoxed = Map[Class[_], Class[_]]( + classOf[Boolean] -> classOf[jl.Boolean], + classOf[Byte] -> classOf[jl.Byte], + classOf[Char] -> classOf[jl.Character], + classOf[Short] -> classOf[jl.Short], + classOf[Int] -> classOf[jl.Integer], + classOf[Long] -> classOf[jl.Long], + classOf[Float] -> classOf[jl.Float], + classOf[Double] -> classOf[jl.Double], + classOf[Unit] -> classOf[scala.runtime.BoxedUnit] + ) + + def boxedType(c: Class[_]): Class[_] = { + if (c.isPrimitive) toBoxed(c) else c + } + val p = newPromise[S] onComplete { case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]] case Right(t) => p complete (try { - Right(impl.Future.boxedType(m.erasure).cast(t).asInstanceOf[S]) + Right(boxedType(m.erasure).cast(t).asInstanceOf[S]) } catch { case e: ClassCastException => Left(e) }) @@ -512,17 +530,16 @@ trait Future[+T] extends Awaitable[T] { * Note: using this method yields nondeterministic dataflow programs. */ object Future { - - /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. - * - * The result becomes available once the asynchronous computation is completed. - * - * @tparam T the type of the result - * @param body the asychronous computation - * @param execctx the execution context on which the future is run - * @return the `Future` holding the result of the computation - */ - def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body) + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + * + * The result becomes available once the asynchronous computation is completed. + * + * @tparam T the type of the result + * @param body the asychronous computation + * @param execctx the execution context on which the future is run + * @return the `Future` holding the result of the computation + */ + def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -615,4 +632,3 @@ object Future { - diff --git a/src/library/scala/concurrent/ManagedBlocker.scala b/src/library/scala/concurrent/ManagedBlocker.scala index 0b6d82e76f..9c6f4d51d6 100644 --- a/src/library/scala/concurrent/ManagedBlocker.scala +++ b/src/library/scala/concurrent/ManagedBlocker.scala @@ -12,7 +12,6 @@ package scala.concurrent * * @author Philipp Haller */ -@deprecated("Not used.", "2.10.0") trait ManagedBlocker { /** diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java index 5280d67854..8aac5de042 100644 --- a/src/library/scala/concurrent/impl/AbstractPromise.java +++ b/src/library/scala/concurrent/impl/AbstractPromise.java @@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; abstract class AbstractPromise { - private volatile Object _ref = null; + private volatile Object _ref; protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater = - AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); -} + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +}
\ No newline at end of file diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index c308a59297..ad98331241 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -93,7 +93,9 @@ private[scala] class ExecutionContextImpl(es: AnyRef) extends ExecutionContext w } def reportFailure(t: Throwable) = t match { - case e: Error => throw e // rethrow serious errors + // `Error`s are currently wrapped by `resolver`. + // Also, re-throwing `Error`s here causes an exception handling test to fail. + //case e: Error => throw e case t => t.printStackTrace() } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index a3c8ed3095..548524c9fe 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -28,7 +28,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa /** Tests whether this Future has been completed. */ - final def isCompleted: Boolean = value.isDefined + def isCompleted: Boolean /** The contained value of this Future. Before this Future is completed * the value will be None. After completion the value will be Some(Right(t)) @@ -42,20 +42,6 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa } object Future { - import java.{ lang => jl } - - private val toBoxed = Map[Class[_], Class[_]]( - classOf[Boolean] -> classOf[jl.Boolean], - classOf[Byte] -> classOf[jl.Byte], - classOf[Char] -> classOf[jl.Character], - classOf[Short] -> classOf[jl.Short], - classOf[Int] -> classOf[jl.Integer], - classOf[Long] -> classOf[jl.Long], - classOf[Float] -> classOf[jl.Float], - classOf[Double] -> classOf[jl.Double], - classOf[Unit] -> classOf[scala.runtime.BoxedUnit] - ) - /** Wraps a block of code into an awaitable object. */ private[concurrent] def body2awaitable[T](body: =>T) = new Awaitable[T] { def ready(atMost: Duration)(implicit permit: CanAwait) = { @@ -65,19 +51,20 @@ object Future { def result(atMost: Duration)(implicit permit: CanAwait) = body } - def boxedType(c: Class[_]): Class[_] = { - if (c.isPrimitive) toBoxed(c) else c - } - def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = { val promise = new Promise.DefaultPromise[T]() + + //TODO: use `dispatchFuture`? executor.execute(new Runnable { def run = { promise complete { try { Right(body) } catch { - case e => scala.concurrent.resolver(e) + case NonFatal(e) => + // Commenting out reporting for now, since it produces too much output in the tests + //executor.reportFailure(e) + scala.concurrent.resolver(e) } } } @@ -107,7 +94,7 @@ object Future { private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit = _taskStack.get match { - case stack if (stack ne null) && !force => stack push task + case stack if (stack ne null) && !force => stack push task // FIXME we can't mix tasks aimed for different ExecutionContexts see: https://github.com/akka/akka/blob/v2.0.1/akka-actor/src/main/scala/akka/dispatch/Future.scala#L373 case _ => executor.execute(new Runnable { def run() { try { @@ -115,13 +102,7 @@ object Future { _taskStack set taskStack while (taskStack.nonEmpty) { val next = taskStack.pop() - try { - next.apply() - } catch { - case e => - // TODO catching all and continue isn't good for OOME - executor.reportFailure(e) - } + try next() catch { case NonFatal(e) => executor reportFailure e } } } finally { _taskStack.remove() diff --git a/src/library/scala/concurrent/impl/NonFatal.scala b/src/library/scala/concurrent/impl/NonFatal.scala new file mode 100644 index 0000000000..bc509e664c --- /dev/null +++ b/src/library/scala/concurrent/impl/NonFatal.scala @@ -0,0 +1,37 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent +package impl + +/** + * Extractor of non-fatal Throwables. Will not match fatal errors + * like VirtualMachineError (OutOfMemoryError) + * ThreadDeath, LinkageError and InterruptedException. + * StackOverflowError is matched, i.e. considered non-fatal. + * + * Usage to catch all harmless throwables: + * {{{ + * try { + * // dangerous stuff + * } catch { + * case NonFatal(e) => log.error(e, "Something not that bad") + * } + * }}} + */ +private[concurrent] object NonFatal { + + def unapply(t: Throwable): Option[Throwable] = t match { + case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError + // VirtualMachineError includes OutOfMemoryError and other fatal errors + case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError ⇒ None + case e ⇒ Some(e) + } + +} + diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 07b6d1f278..ee1841aaff 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -74,37 +74,10 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu object Promise { - - def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue - - def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] - - /** Represents the internal state. - * - * [adriaan] it's unsound to make FState covariant (tryComplete won't type check) - */ - sealed trait FState[T] { def value: Option[Either[Throwable, T]] } - - case class Pending[T](listeners: List[Either[Throwable, T] => Any] = Nil) extends FState[T] { - def value: Option[Either[Throwable, T]] = None - } - - case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def result: T = value.get.right.get - } - - case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def exception: Throwable = value.get.left.get - } - - private val emptyPendingValue = Pending[Nothing](Nil) - /** Default promise implementation. */ - class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { - self => - - updater.set(this, Promise.EmptyPending()) + class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self => + updater.set(this, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU protected final def tryAwait(atMost: Duration): Boolean = { @tailrec @@ -115,7 +88,7 @@ object Promise { val start = System.nanoTime() try { synchronized { - while (value.isEmpty) wait(ms, ns) + while (!isCompleted) wait(ms, ns) } } catch { case e: InterruptedException => @@ -123,93 +96,91 @@ object Promise { awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) } else - value.isDefined + isCompleted } - - blocking(Future.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) + //FIXME do not do this if there'll be no waiting + blocking(Future.body2awaitable(awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)), atMost) } + @throws(classOf[TimeoutException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type = - if (value.isDefined || tryAwait(atMost)) this + if (isCompleted || tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { case Left(e) => throw e case Right(r) => r } - def value: Option[Either[Throwable, T]] = getState.value + def value: Option[Either[Throwable, T]] = getState match { + case c: Either[_, _] => Some(c.asInstanceOf[Either[Throwable, T]]) + case _ => None + } + + override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value" + case _: Either[_, _] => true + case _ => false + } @inline - private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, AnyRef]] @inline - protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) + protected final def updateState(oldState: AnyRef, newState: AnyRef): Boolean = updater.compareAndSet(this, oldState, newState) @inline - protected final def getState: FState[T] = updater.get(this) + protected final def getState: AnyRef = updater.get(this) def tryComplete(value: Either[Throwable, T]): Boolean = { - val callbacks: List[Either[Throwable, T] => Any] = { + val callbacks: List[Either[Throwable, T] => Unit] = { try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Any] = { + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = { getState match { - case cur @ Pending(listeners) => - val newState = - if (v.isLeft) Failure(Some(v.asInstanceOf[Left[Throwable, T]])) - else Success(Some(v.asInstanceOf[Right[Throwable, T]])) - - if (updateState(cur, newState)) listeners - else tryComplete(v) + case raw: List[_] => + val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]] + if (updateState(cur, v)) cur else tryComplete(v) case _ => null } } tryComplete(resolveEither(value)) } finally { - synchronized { notifyAll() } // notify any blockers from `tryAwait` + synchronized { notifyAll() } //Notify any evil blockers } } callbacks match { case null => false case cs if cs.isEmpty => true - case cs => - Future.dispatchFuture(executor, { - () => cs.foreach(f => notifyCompleted(f, value)) - }) - true + case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, value))); true } } def onComplete[U](func: Either[Throwable, T] => U): this.type = { - @tailrec // Returns whether the future has already been completed or not - def tryAddCallback(): Boolean = { + @tailrec //Returns the future's results if it has already been completed, or null otherwise. + def tryAddCallback(): Either[Throwable, T] = { val cur = getState cur match { - case _: Success[_] | _: Failure[_] => true - case p: Pending[_] => - val pt = p.asInstanceOf[Pending[T]] - if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + case r: Either[_, _] => r.asInstanceOf[Either[Throwable, T]] + case listeners: List[_] => if (updateState(listeners, func :: listeners)) null else tryAddCallback() } } - if (tryAddCallback()) { - val result = value.get - Future.dispatchFuture(executor, { - () => notifyCompleted(func, result) - }) + tryAddCallback() match { + case null => this + case completed => + Future.dispatchFuture(executor, () => notifyCompleted(func, completed)) + this } - - this } private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { try { func(result) } catch { - case e => executor.reportFailure(e) + case NonFatal(e) => executor reportFailure e } } } @@ -222,13 +193,13 @@ object Promise { val value = Some(resolveEither(suppliedValue)) + override def isCompleted(): Boolean = true + def tryComplete(value: Either[Throwable, T]): Boolean = false def onComplete[U](func: Either[Throwable, T] => U): this.type = { val completedAs = value.get - Future.dispatchFuture(executor, { - () => func(completedAs) - }) + Future.dispatchFuture(executor, () => func(completedAs)) this } @@ -241,19 +212,3 @@ object Promise { } } - - - - - - - - - - - - - - - - diff --git a/src/library/scala/concurrent/impl/Unsafe.java b/src/library/scala/concurrent/impl/Unsafe.java new file mode 100644 index 0000000000..21f7e638e5 --- /dev/null +++ b/src/library/scala/concurrent/impl/Unsafe.java @@ -0,0 +1,32 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + + +package scala.concurrent; + +import java.lang.reflect.Field; + +final class Unsafe { + public final static sun.misc.Unsafe instance; + static { + try { + sun.misc.Unsafe found = null; + for(Field field : sun.misc.Unsafe.class.getDeclaredFields()) { + if (field.getType() == sun.misc.Unsafe.class) { + field.setAccessible(true); + found = (sun.misc.Unsafe) field.get(null); + break; + } + } + if (found == null) throw new IllegalStateException("Can't find instance of sun.misc.Unsafe"); + else instance = found; + } catch(Throwable t) { + throw new ExceptionInInitializerError(t); + } + } +} diff --git a/src/library/scala/concurrent/util/Duration.scala b/src/library/scala/concurrent/util/Duration.scala index 15a546de10..c4e5fa491a 100644 --- a/src/library/scala/concurrent/util/Duration.scala +++ b/src/library/scala/concurrent/util/Duration.scala @@ -297,7 +297,7 @@ class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration { def toMinutes = unit.toMinutes(length) def toHours = unit.toHours(length) def toDays = unit.toDays(length) - def toUnit(u: TimeUnit) = long2double(toNanos) / NANOSECONDS.convert(1, u) + def toUnit(u: TimeUnit) = toNanos.toDouble / NANOSECONDS.convert(1, u) override def toString = this match { case Duration(1, DAYS) ⇒ "1 day" @@ -341,11 +341,11 @@ class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration { } } - def *(factor: Double) = fromNanos(long2double(toNanos) * factor) + def *(factor: Double) = fromNanos(toNanos.toDouble * factor) - def /(factor: Double) = fromNanos(long2double(toNanos) / factor) + def /(factor: Double) = fromNanos(toNanos.toDouble / factor) - def /(other: Duration) = if (other.finite_?) long2double(toNanos) / other.toNanos else 0 + def /(other: Duration) = if (other.finite_?) toNanos.toDouble / other.toNanos else 0 def unary_- = Duration(-length, unit) diff --git a/src/library/scala/concurrent/util/duration/package.scala b/src/library/scala/concurrent/util/duration/package.scala index 25625054ee..e3cf229c61 100644 --- a/src/library/scala/concurrent/util/duration/package.scala +++ b/src/library/scala/concurrent/util/duration/package.scala @@ -1,6 +1,7 @@ package scala.concurrent.util import java.util.concurrent.TimeUnit +import language.implicitConversions package object duration { @@ -27,4 +28,4 @@ package object duration { implicit def intMult(i: Int) = new IntMult(i) implicit def longMult(l: Long) = new LongMult(l) implicit def doubleMult(f: Double) = new DoubleMult(f) -}
\ No newline at end of file +} |