summaryrefslogtreecommitdiff
path: root/src/library/scala
diff options
context:
space:
mode:
authorGrzegorz Kossakowski <grzegorz.kossakowski@gmail.com>2013-07-29 17:27:03 -0700
committerGrzegorz Kossakowski <grzegorz.kossakowski@gmail.com>2013-07-29 17:27:03 -0700
commita72f79abf5845d25c847f7d7cde6d28a9d15612e (patch)
treee2c8669276a0ae7ddf8014136d3bedc2ac753dc9 /src/library/scala
parent20cd9474f0a22950c905badb81fb6eeebdf00b34 (diff)
parent1e5bfdb117c1cab659456949549765084081d534 (diff)
downloadscala-a72f79abf5845d25c847f7d7cde6d28a9d15612e.tar.gz
scala-a72f79abf5845d25c847f7d7cde6d28a9d15612e.tar.bz2
scala-a72f79abf5845d25c847f7d7cde6d28a9d15612e.zip
Merge remote-tracking branch 'scala/2.10.x' into merge-2.10.x
Conflicts: bincompat-backward.whitelist.conf bincompat-forward.whitelist.conf src/compiler/scala/reflect/reify/phases/Reshape.scala src/compiler/scala/tools/nsc/symtab/classfile/ClassfileParser.scala src/compiler/scala/tools/nsc/transform/Mixin.scala src/compiler/scala/tools/nsc/typechecker/RefChecks.scala src/compiler/scala/tools/nsc/typechecker/Typers.scala src/library/scala/concurrent/impl/Promise.scala src/reflect/scala/reflect/internal/StdAttachments.scala test/files/neg/macro-override-macro-overrides-abstract-method-b.check test/files/run/t7569.check
Diffstat (limited to 'src/library/scala')
-rw-r--r--src/library/scala/concurrent/Future.scala9
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala210
-rw-r--r--src/library/scala/util/Properties.scala35
3 files changed, 221 insertions, 33 deletions
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index b072cd653b..411b89701b 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -247,10 +247,15 @@ trait Future[+T] extends Awaitable[T] {
* $forComprehensionExamples
*/
def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
- val p = Promise[S]()
+ import impl.Promise.DefaultPromise
+ val p = new DefaultPromise[S]()
onComplete {
case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
- case Success(v) => try f(v) onComplete p.complete catch { case NonFatal(t) => p failure t }
+ case Success(v) => try f(v) match {
+ // If possible, link DefaultPromises to avoid space leaks
+ case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p)
+ case fut => fut onComplete p.complete
+ } catch { case NonFatal(t) => p failure t }
}
p.future
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index ffaea8de96..35511856ee 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -56,7 +56,9 @@ private[concurrent] object Promise {
case t => Failure(t)
}
- /*
+ /**
+ * Latch used to implement waiting on a DefaultPromise's result.
+ *
* Inspired by: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
@@ -73,10 +75,122 @@ private[concurrent] object Promise {
/** Default promise implementation.
+ *
+ * A DefaultPromise has three possible states. It can be:
+ *
+ * 1. Incomplete, with an associated list of callbacks waiting on completion.
+ * 2. Complete, with a result.
+ * 3. Linked to another DefaultPromise.
+ *
+ * If a DefaultPromise is linked it another DefaultPromise then it will
+ * delegate all its operations to that other promise. This means that two
+ * DefaultPromises that are linked will appear, to external callers, to have
+ * exactly the same state and behaviour. E.g. they will both appear to be
+ * either complete or incomplete, and with the same values.
+ *
+ * A DefaultPromise stores its state entirely in the AnyRef cell exposed by
+ * AbstractPromise. The type of object stored in the cell fully describes the
+ * current state of the promise.
+ *
+ * 1. List[CallbackRunnable] - The promise is incomplete and has zero or more callbacks
+ * to call when it is eventually completed.
+ * 2. Try[T] - The promise is complete and now contains its value.
+ * 3. DefaultPromise[T] - The promise is linked to another promise.
+ *
+ * The ability to link DefaultPromises is needed to prevent memory leaks when
+ * using Future.flatMap. The previous implementation of Future.flatMap used
+ * onComplete handlers to propagate the ultimate value of a flatMap operation
+ * to its promise. Recursive calls to flatMap built a chain of onComplete
+ * handlers and promises. Unfortunately none of the handlers or promises in
+ * the chain could be collected until the handlers had been called and
+ * detached, which only happened when the final flatMap future was completed.
+ * (In some situations, such as infinite streams, this would never actually
+ * happen.) Because of the fact that the promise implementation internally
+ * created references between promises, and these references were invisible to
+ * user code, it was easy for user code to accidentally build large chains of
+ * promises and thereby leak memory.
+ *
+ * The problem of leaks is solved by automatically breaking these chains of
+ * promises, so that promises don't refer to each other in a long chain. This
+ * allows each promise to be individually collected. The idea is to "flatten"
+ * the chain of promises, so that instead of each promise pointing to its
+ * neighbour, they instead point directly the promise at the root of the
+ * chain. This means that only the root promise is referenced, and all the
+ * other promises are available for garbage collection as soon as they're no
+ * longer referenced by user code.
+ *
+ * To make the chains flattenable, the concept of linking promises together
+ * needed to become an explicit feature of the DefaultPromise implementation,
+ * so that the implementation to navigate and rewire links as needed. The idea
+ * of linking promises is based on the [[Twitter promise implementation
+ * https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Promise.scala]].
+ *
+ * In practice, flattening the chain cannot always be done perfectly. When a
+ * promise is added to the end of the chain, it scans the chain and links
+ * directly to the root promise. This prevents the chain from growing forwards
+ * But the root promise for a chain can change, causing the chain to grow
+ * backwards, and leaving all previously-linked promise pointing at a promise
+ * which is no longer the root promise.
+ *
+ * To mitigate the problem of the root promise changing, whenever a promise's
+ * methods are called, and it needs a reference to its root promise it calls
+ * the `compressedRoot()` method. This method re-scans the promise chain to
+ * get the root promise, and also compresses its links so that it links
+ * directly to whatever the current root promise is. This ensures that the
+ * chain is flattened whenever `compressedRoot()` is called. And since
+ * `compressedRoot()` is called at every possible opportunity (when getting a
+ * promise's value, when adding an onComplete handler, etc), this will happen
+ * frequently. Unfortunately, even this eager relinking doesn't absolutely
+ * guarantee that the chain will be flattened and that leaks cannot occur.
+ * However eager relinking does greatly reduce the chance that leaks will
+ * occur.
+ *
+ * Future.flatMap links DefaultPromises together by calling the `linkRootOf`
+ * method. This is the only externally visible interface to linked
+ * DefaultPromises, and `linkedRootOf` is currently only designed to be called
+ * by Future.flatMap.
*/
class DefaultPromise[T] extends AbstractPromise with Promise[T] { self =>
- updateState(null, Nil) // Start at "No callbacks"
+ updateState(null, Nil) // The promise is incomplete and has no callbacks
+
+ /** Get the root promise for this promise, compressing the link chain to that
+ * promise if necessary.
+ *
+ * For promises that are not linked, the result of calling
+ * `compressedRoot()` will the promise itself. However for linked promises,
+ * this method will traverse each link until it locates the root promise at
+ * the base of the link chain.
+ *
+ * As a side effect of calling this method, the link from this promise back
+ * to the root promise will be updated ("compressed") to point directly to
+ * the root promise. This allows intermediate promises in the link chain to
+ * be garbage collected. Also, subsequent calls to this method should be
+ * faster as the link chain will be shorter.
+ */
+ @tailrec
+ private def compressedRoot(): DefaultPromise[T] = {
+ getState match {
+ case linked: DefaultPromise[_] =>
+ val target = linked.asInstanceOf[DefaultPromise[T]].root
+ if (linked eq target) target else if (updateState(linked, target)) target else compressedRoot()
+ case _ => this
+ }
+ }
+ /** Get the promise at the root of the chain of linked promises. Used by `compressedRoot()`.
+ * The `compressedRoot()` method should be called instead of this method, as it is important
+ * to compress the link chain whenever possible.
+ */
+ @tailrec
+ private def root: DefaultPromise[T] = {
+ getState match {
+ case linked: DefaultPromise[_] => linked.asInstanceOf[DefaultPromise[T]].root
+ case _ => this
+ }
+ }
+
+ /** Try waiting for this promise to be completed.
+ */
protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) {
import Duration.Undefined
import scala.concurrent.Future.InternalCallbackExecutor
@@ -108,42 +222,96 @@ private[concurrent] object Promise {
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get.get // ready throws TimeoutException if timeout so value.get is safe here
- def value: Option[Try[T]] = getState match {
+ def value: Option[Try[T]] = value0
+
+ @tailrec
+ private def value0: Option[Try[T]] = getState match {
case c: Try[_] => Some(c.asInstanceOf[Try[T]])
+ case _: DefaultPromise[_] => compressedRoot().value0
case _ => None
}
- override def isCompleted: Boolean = getState.isInstanceOf[Try[_]]
+ override def isCompleted: Boolean = isCompleted0
+
+ @tailrec
+ private def isCompleted0: Boolean = getState match {
+ case _: Try[_] => true
+ case _: DefaultPromise[_] => compressedRoot().isCompleted0
+ case _ => false
+ }
def tryComplete(value: Try[T]): Boolean = {
val resolved = resolveTry(value)
- @tailrec
- def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = {
- getState match {
- case raw: List[_] =>
- val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
- if (updateState(cur, v)) cur else tryComplete(v)
- case _ => null
- }
- }
- tryComplete(resolved) match {
+ tryCompleteAndGetListeners(resolved) match {
case null => false
case rs if rs.isEmpty => true
case rs => rs.foreach(r => r.executeWithValue(resolved)); true
}
}
+ /** Called by `tryComplete` to store the resolved value and get the list of
+ * listeners, or `null` if it is already completed.
+ */
+ @tailrec
+ private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = {
+ getState match {
+ case raw: List[_] =>
+ val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
+ if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v)
+ case _: DefaultPromise[_] =>
+ compressedRoot().tryCompleteAndGetListeners(v)
+ case _ => null
+ }
+ }
+
def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
val preparedEC = executor.prepare()
val runnable = new CallbackRunnable[T](preparedEC, func)
+ dispatchOrAddCallback(runnable)
+ }
+
+ /** Tries to add the callback, if already completed, it dispatches the callback to be executed.
+ * Used by `onComplete()` to add callbacks to a promise and by `link()` to transfer callbacks
+ * to the root promise when linking two promises togehter.
+ */
+ @tailrec
+ private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = {
+ getState match {
+ case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
+ case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable)
+ case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable)
+ }
+ }
- @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
- def dispatchOrAddCallback(): Unit =
- getState match {
- case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
- case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
- }
- dispatchOrAddCallback()
+ /** Link this promise to the root of another promise using `link()`. Should only be
+ * be called by Future.flatMap.
+ */
+ protected[concurrent] final def linkRootOf(target: DefaultPromise[T]): Unit = link(target.compressedRoot())
+
+ /** Link this promise to another promise so that both promises share the same
+ * externally-visible state. Depending on the current state of this promise, this
+ * may involve different things. For example, any onComplete listeners will need
+ * to be transferred.
+ *
+ * If this promise is already completed, then the same effect as linking -
+ * sharing the same completed value - is achieved by simply sending this
+ * promise's result to the target promise.
+ */
+ @tailrec
+ private def link(target: DefaultPromise[T]): Unit = if (this ne target) {
+ getState match {
+ case r: Try[_] =>
+ if (!target.tryComplete(r.asInstanceOf[Try[T]])) {
+ // Currently linking is done from Future.flatMap, which should ensure only
+ // one promise can be completed. Therefore this situation is unexpected.
+ throw new IllegalStateException("Cannot link completed promises together")
+ }
+ case _: DefaultPromise[_] =>
+ compressedRoot().link(target)
+ case listeners: List[_] => if (updateState(listeners, target)) {
+ if (!listeners.isEmpty) listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_))
+ } else link(target)
+ }
}
}
diff --git a/src/library/scala/util/Properties.scala b/src/library/scala/util/Properties.scala
index fef9cef246..02c461f3c6 100644
--- a/src/library/scala/util/Properties.scala
+++ b/src/library/scala/util/Properties.scala
@@ -131,6 +131,10 @@ private[scala] trait PropertiesTrait {
def javaVmName = propOrEmpty("java.vm.name")
def javaVmVendor = propOrEmpty("java.vm.vendor")
def javaVmVersion = propOrEmpty("java.vm.version")
+ // this property must remain less-well-known until 2.11
+ private def javaSpecVersion = propOrEmpty("java.specification.version")
+ //private def javaSpecVendor = propOrEmpty("java.specification.vendor")
+ //private def javaSpecName = propOrEmpty("java.specification.name")
def osName = propOrEmpty("os.name")
def scalaHome = propOrEmpty("scala.home")
def tmpDir = propOrEmpty("java.io.tmpdir")
@@ -158,18 +162,29 @@ private[scala] trait PropertiesTrait {
def scalaCmd = if (isWin) "scala.bat" else "scala"
def scalacCmd = if (isWin) "scalac.bat" else "scalac"
- /** Can the java version be determined to be at least as high as the argument?
- * Hard to properly future proof this but at the rate 1.7 is going we can leave
- * the issue for our cyborg grandchildren to solve.
+ /** Compares the given specification version to the specification version of the platform.
+ *
+ * @param version a specification version of the form "major.minor"
+ * @return `true` iff the specification version of the current runtime
+ * is equal to or higher than the version denoted by the given string.
+ * @throws NumberFormatException if the given string is not a version string
+ *
+ * @example {{{
+ * // In this example, the runtime's Java specification is assumed to be at version 1.7.
+ * isJavaAtLeast("1.6") // true
+ * isJavaAtLeast("1.7") // true
+ * isJavaAtLeast("1.8") // false
+ * }}
*/
- def isJavaAtLeast(version: String) = {
- val okVersions = version match {
- case "1.5" => List("1.5", "1.6", "1.7")
- case "1.6" => List("1.6", "1.7")
- case "1.7" => List("1.7")
- case _ => Nil
+ def isJavaAtLeast(version: String): Boolean = {
+ def parts(x: String) = {
+ val i = x.indexOf('.')
+ if (i < 0) throw new NumberFormatException("Not a version: " + x)
+ (x.substring(0, i), x.substring(i+1, x.length))
}
- okVersions exists (javaVersion startsWith _)
+ val (v, _v) = parts(version)
+ val (s, _s) = parts(javaSpecVersion)
+ s.toInt >= v.toInt && _s.toInt >= _v.toInt
}
// provide a main method so version info can be obtained by running this