From 48c677ceb3177d93e700b399c00af6b8bb6419e4 Mon Sep 17 00:00:00 2001 From: Rich Dougherty Date: Tue, 2 Jul 2013 21:13:12 +1200 Subject: SI-7336 - Link flatMapped promises to avoid memory leaks --- src/library/scala/concurrent/impl/Promise.scala | 212 +++++++++++++++++++++--- 1 file changed, 190 insertions(+), 22 deletions(-) (limited to 'src/library/scala/concurrent/impl') diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index ed4039e2a5..c9b2a15f2f 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -55,8 +55,10 @@ private[concurrent] object Promise { case e: Error => Failure(new ExecutionException("Boxed Error", e)) case t => Failure(t) } - - /* + + /** + * Latch used to implement waiting on a DefaultPromise's result. + * * Inspired by: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at @@ -73,10 +75,122 @@ private[concurrent] object Promise { /** Default promise implementation. + * + * A DefaultPromise has three possible states. It can be: + * + * 1. Incomplete, with an associated list of callbacks waiting on completion. + * 2. Complete, with a result. + * 3. Linked to another DefaultPromise. + * + * If a DefaultPromise is linked to another DefaultPromise then it will + * delegate all its operations to that other promise. This means that two + * DefaultPromises that are linked will appear, to external callers, to have + * exactly the same state and behaviour. E.g. they will both appear to be + * either complete or incomplete, and with the same values. + * + * A DefaultPromise stores its state entirely in the AnyRef cell exposed by + * AbstractPromise.
The type of object stored in the cell fully describes the + * current state of the promise. + * + * 1. List[CallbackRunnable] - The promise is incomplete and has zero or more callbacks + * to call when it is eventually completed. + * 2. Try[T] - The promise is complete and now contains its value. + * 3. DefaultPromise[T] - The promise is linked to another promise. + * + * The ability to link DefaultPromises is needed to prevent memory leaks when + * using Future.flatMap. The previous implementation of Future.flatMap used + * onComplete handlers to propagate the ultimate value of a flatMap operation + * to its promise. Recursive calls to flatMap built a chain of onComplete + * handlers and promises. Unfortunately none of the handlers or promises in + * the chain could be collected until the handlers had been called and + * detached, which only happened when the final flatMap future was completed. + * (In some situations, such as infinite streams, this would never actually + * happen.) Because of the fact that the promise implementation internally + * created references between promises, and these references were invisible to + * user code, it was easy for user code to accidentally build large chains of + * promises and thereby leak memory. + * + * The problem of leaks is solved by automatically breaking these chains of + * promises, so that promises don't refer to each other in a long chain. This + * allows each promise to be individually collected. The idea is to "flatten" + * the chain of promises, so that instead of each promise pointing to its + * neighbour, they instead point directly to the promise at the root of the + * chain. This means that only the root promise is referenced, and all the + * other promises are available for garbage collection as soon as they're no + * longer referenced by user code.
+ * + * To make the chains flattenable, the concept of linking promises together + * needed to become an explicit feature of the DefaultPromise implementation, + * so that the implementation is able to navigate and rewire links as needed. The idea + * of linking promises is based on the + * [[https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Promise.scala Twitter promise implementation]]. + * + * In practice, flattening the chain cannot always be done perfectly. When a + * promise is added to the end of the chain, it scans the chain and links + * directly to the root promise. This prevents the chain from growing forwards. + * But the root promise for a chain can change, causing the chain to grow + * backwards, and leaving all previously-linked promises pointing at a promise + * which is no longer the root promise. + * + * To mitigate the problem of the root promise changing, whenever a promise's + * methods are called, and it needs a reference to its root promise it calls + * the `compressedRoot()` method. This method re-scans the promise chain to + * get the root promise, and also compresses its links so that it links + * directly to whatever the current root promise is. This ensures that the + * chain is flattened whenever `compressedRoot()` is called. And since + * `compressedRoot()` is called at every possible opportunity (when getting a + * promise's value, when adding an onComplete handler, etc), this will happen + * frequently. Unfortunately, even this eager relinking doesn't absolutely + * guarantee that the chain will be flattened and that leaks cannot occur. + * However eager relinking does greatly reduce the chance that leaks will + * occur. + * + * Future.flatMap links DefaultPromises together by calling the `linkRootOf` + * method. This is the only externally visible interface to linked + * DefaultPromises, and `linkRootOf` is currently only designed to be called + * by Future.flatMap.
*/ class DefaultPromise[T] extends AbstractPromise with Promise[T] { self => - updateState(null, Nil) // Start at "No callbacks" + updateState(null, Nil) // The promise is incomplete and has no callbacks + + /** Get the root promise for this promise, compressing the link chain to that + * promise if necessary. + * + * For promises that are not linked, the result of calling + * `compressedRoot()` will be the promise itself. However for linked promises, + * this method will traverse each link until it locates the root promise at + * the base of the link chain. + * + * As a side effect of calling this method, the link from this promise back + * to the root promise will be updated ("compressed") to point directly to + * the root promise. This allows intermediate promises in the link chain to + * be garbage collected. Also, subsequent calls to this method should be + * faster as the link chain will be shorter. + */ + @tailrec + private def compressedRoot(): DefaultPromise[T] = { + getState match { + case linked: DefaultPromise[_] => + val target = linked.asInstanceOf[DefaultPromise[T]].root + if (linked eq target) target else if (updateState(linked, target)) target else compressedRoot() + case _ => this + } + } + + /** Get the promise at the root of the chain of linked promises. Used by `compressedRoot()`. + * The `compressedRoot()` method should be called instead of this method, as it is important + * to compress the link chain whenever possible. + */ + @tailrec + private def root: DefaultPromise[T] = { + getState match { + case linked: DefaultPromise[_] => linked.asInstanceOf[DefaultPromise[T]].root + case _ => this + } + } + /** Try waiting for this promise to be completed.
+ */ protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) { import Duration.Undefined import scala.concurrent.Future.InternalCallbackExecutor @@ -108,42 +222,96 @@ private[concurrent] object Promise { def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get.get // ready throws TimeoutException if timeout so value.get is safe here - def value: Option[Try[T]] = getState match { + def value: Option[Try[T]] = value0 + + @tailrec + private def value0: Option[Try[T]] = getState match { case c: Try[_] => Some(c.asInstanceOf[Try[T]]) + case _: DefaultPromise[_] => compressedRoot().value0 case _ => None } - override def isCompleted: Boolean = getState.isInstanceOf[Try[_]] + override def isCompleted: Boolean = isCompleted0 + + @tailrec + private def isCompleted0: Boolean = getState match { + case _: Try[_] => true + case _: DefaultPromise[_] => compressedRoot().isCompleted0 + case _ => false + } def tryComplete(value: Try[T]): Boolean = { val resolved = resolveTry(value) - @tailrec - def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = { - getState match { - case raw: List[_] => - val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] - if (updateState(cur, v)) cur else tryComplete(v) - case _ => null - } - } - tryComplete(resolved) match { + tryCompleteAndGetListeners(resolved) match { case null => false case rs if rs.isEmpty => true case rs => rs.foreach(r => r.executeWithValue(resolved)); true } } + /** Called by `tryComplete` to store the resolved value and get the list of + * listeners, or `null` if it is already completed. 
+ */ + @tailrec + private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = { + getState match { + case raw: List[_] => + val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] + if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v) + case _: DefaultPromise[_] => + compressedRoot().tryCompleteAndGetListeners(v) + case _ => null + } + } + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { val preparedEC = executor.prepare val runnable = new CallbackRunnable[T](preparedEC, func) + dispatchOrAddCallback(runnable) + } + + /** Tries to add the callback; if already completed, it dispatches the callback to be executed. + * Used by `onComplete()` to add callbacks to a promise and by `link()` to transfer callbacks + * to the root promise when linking two promises together. + */ + @tailrec + private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = { + getState match { + case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) + case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable) + case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable) + } + } + + /** Link this promise to the root of another promise using `link()`. Should only + * be called by Future.flatMap. + */ + protected[concurrent] final def linkRootOf(target: DefaultPromise[T]): Unit = link(target.compressedRoot()) - @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed - def dispatchOrAddCallback(): Unit = - getState match { - case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) - case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback() - } - dispatchOrAddCallback() + /** Link this promise to another promise so that both promises share the same + * externally-visible state.
Depending on the current state of this promise, this + * may involve different things. For example, any onComplete listeners will need + * to be transferred. + * + * If this promise is already completed, then the same effect as linking - + * sharing the same completed value - is achieved by simply sending this + * promise's result to the target promise. + */ + @tailrec + private def link(target: DefaultPromise[T]): Unit = if (this ne target) { + getState match { + case r: Try[_] => + if (!target.tryComplete(r.asInstanceOf[Try[T]])) { + // Currently linking is done from Future.flatMap, which should ensure only + // one promise can be completed. Therefore this situation is unexpected. + throw new IllegalStateException("Cannot link completed promises together") + } + case _: DefaultPromise[_] => + compressedRoot().link(target) + case listeners: List[_] => if (updateState(listeners, target)) { + if (!listeners.isEmpty) listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_)) + } else link(target) + } } } -- cgit v1.2.3