From 48c677ceb3177d93e700b399c00af6b8bb6419e4 Mon Sep 17 00:00:00 2001 From: Rich Dougherty Date: Tue, 2 Jul 2013 21:13:12 +1200 Subject: SI-7336 - Link flatMapped promises to avoid memory leaks --- bincompat-forward.whitelist.conf | 8 + src/library/scala/concurrent/Future.scala | 9 +- src/library/scala/concurrent/impl/Promise.scala | 212 +++++++++++-- test/files/run/t7336.scala | 31 ++ .../scala/concurrent/impl/DefaultPromiseTest.scala | 344 +++++++++++++++++++++ 5 files changed, 580 insertions(+), 24 deletions(-) create mode 100644 test/files/run/t7336.scala create mode 100644 test/junit/scala/concurrent/impl/DefaultPromiseTest.scala diff --git a/bincompat-forward.whitelist.conf b/bincompat-forward.whitelist.conf index 284e0809a8..9562edb9c6 100644 --- a/bincompat-forward.whitelist.conf +++ b/bincompat-forward.whitelist.conf @@ -472,6 +472,14 @@ filter { matchName="scala.concurrent.impl.Promise$CompletionLatch" problemName=MissingClassProblem }, + { + matchName="scala.concurrent.impl.Promise#DefaultPromise.linkRootOf" + problemName=MissingMethodProblem + }, + { + matchName="scala.concurrent.impl.Promise#DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback" + problemName=MissingMethodProblem + }, { matchName="scala.reflect.internal.Definitions#DefinitionsClass.getClassMethods" problemName=MissingMethodProblem diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index bc3a241ce7..caf91fbb0f 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -247,10 +247,15 @@ trait Future[+T] extends Awaitable[T] { * $forComprehensionExamples */ def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = { - val p = Promise[S]() + import impl.Promise.DefaultPromise + val p = new DefaultPromise[S]() onComplete { case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] - case Success(v) => try f(v) onComplete p.complete catch { case NonFatal(t) => p failure t } + case Success(v) => try f(v) match { + // If possible, link DefaultPromises to avoid space leaks + case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p) + case fut => fut onComplete p.complete + } catch { case NonFatal(t) => p failure t } } p.future } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index ed4039e2a5..c9b2a15f2f 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -55,8 +55,10 @@ private[concurrent] object Promise { case e: Error => Failure(new ExecutionException("Boxed Error", e)) case t => Failure(t) } - - /* + + /** + * Latch used to implement waiting on a DefaultPromise's result. + * * Inspired by: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at @@ -73,10 +75,122 @@ private[concurrent] object Promise { /** Default promise implementation. + * + * A DefaultPromise has three possible states. It can be: + * + * 1. Incomplete, with an associated list of callbacks waiting on completion. + * 2. Complete, with a result. + * 3. Linked to another DefaultPromise. + * + * If a DefaultPromise is linked it another DefaultPromise then it will + * delegate all its operations to that other promise. This means that two + * DefaultPromises that are linked will appear, to external callers, to have + * exactly the same state and behaviour. E.g. they will both appear to be + * either complete or incomplete, and with the same values. + * + * A DefaultPromise stores its state entirely in the AnyRef cell exposed by + * AbstractPromise. The type of object stored in the cell fully describes the + * current state of the promise. + * + * 1. List[CallbackRunnable] - The promise is incomplete and has zero or more callbacks + * to call when it is eventually completed. + * 2. Try[T] - The promise is complete and now contains its value. + * 3. DefaultPromise[T] - The promise is linked to another promise. + * + * The ability to link DefaultPromises is needed to prevent memory leaks when + * using Future.flatMap. The previous implementation of Future.flatMap used + * onComplete handlers to propagate the ultimate value of a flatMap operation + * to its promise. Recursive calls to flatMap built a chain of onComplete + * handlers and promises. Unfortunately none of the handlers or promises in + * the chain could be collected until the handlers had been called and + * detached, which only happened when the final flatMap future was completed. + * (In some situations, such as infinite streams, this would never actually + * happen.) Because of the fact that the promise implementation internally + * created references between promises, and these references were invisible to + * user code, it was easy for user code to accidentally build large chains of + * promises and thereby leak memory. + * + * The problem of leaks is solved by automatically breaking these chains of + * promises, so that promises don't refer to each other in a long chain. This + * allows each promise to be individually collected. The idea is to "flatten" + * the chain of promises, so that instead of each promise pointing to its + * neighbour, they instead point directly the promise at the root of the + * chain. This means that only the root promise is referenced, and all the + * other promises are available for garbage collection as soon as they're no + * longer referenced by user code. + * + * To make the chains flattenable, the concept of linking promises together + * needed to become an explicit feature of the DefaultPromise implementation, + * so that the implementation to navigate and rewire links as needed. The idea + * of linking promises is based on the [[Twitter promise implementation + * https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Promise.scala]]. + * + * In practice, flattening the chain cannot always be done perfectly. When a + * promise is added to the end of the chain, it scans the chain and links + * directly to the root promise. This prevents the chain from growing forwards + * But the root promise for a chain can change, causing the chain to grow + * backwards, and leaving all previously-linked promise pointing at a promise + * which is no longer the root promise. + * + * To mitigate the problem of the root promise changing, whenever a promise's + * methods are called, and it needs a reference to its root promise it calls + * the `compressedRoot()` method. This method re-scans the promise chain to + * get the root promise, and also compresses its links so that it links + * directly to whatever the current root promise is. This ensures that the + * chain is flattened whenever `compressedRoot()` is called. And since + * `compressedRoot()` is called at every possible opportunity (when getting a + * promise's value, when adding an onComplete handler, etc), this will happen + * frequently. Unfortunately, even this eager relinking doesn't absolutely + * guarantee that the chain will be flattened and that leaks cannot occur. + * However eager relinking does greatly reduce the chance that leaks will + * occur. + * + * Future.flatMap links DefaultPromises together by calling the `linkRootOf` + * method. This is the only externally visible interface to linked + * DefaultPromises, and `linkedRootOf` is currently only designed to be called + * by Future.flatMap. */ class DefaultPromise[T] extends AbstractPromise with Promise[T] { self => - updateState(null, Nil) // Start at "No callbacks" + updateState(null, Nil) // The promise is incomplete and has no callbacks + + /** Get the root promise for this promise, compressing the link chain to that + * promise if necessary. + * + * For promises that are not linked, the result of calling + * `compressedRoot()` will the promise itself. However for linked promises, + * this method will traverse each link until it locates the root promise at + * the base of the link chain. + * + * As a side effect of calling this method, the link from this promise back + * to the root promise will be updated ("compressed") to point directly to + * the root promise. This allows intermediate promises in the link chain to + * be garbage collected. Also, subsequent calls to this method should be + * faster as the link chain will be shorter. + */ + @tailrec + private def compressedRoot(): DefaultPromise[T] = { + getState match { + case linked: DefaultPromise[_] => + val target = linked.asInstanceOf[DefaultPromise[T]].root + if (linked eq target) target else if (updateState(linked, target)) target else compressedRoot() + case _ => this + } + } + + /** Get the promise at the root of the chain of linked promises. Used by `compressedRoot()`. + * The `compressedRoot()` method should be called instead of this method, as it is important + * to compress the link chain whenever possible. + */ + @tailrec + private def root: DefaultPromise[T] = { + getState match { + case linked: DefaultPromise[_] => linked.asInstanceOf[DefaultPromise[T]].root + case _ => this + } + } + /** Try waiting for this promise to be completed. + */ protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) { import Duration.Undefined import scala.concurrent.Future.InternalCallbackExecutor @@ -108,42 +222,96 @@ private[concurrent] object Promise { def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get.get // ready throws TimeoutException if timeout so value.get is safe here - def value: Option[Try[T]] = getState match { + def value: Option[Try[T]] = value0 + + @tailrec + private def value0: Option[Try[T]] = getState match { case c: Try[_] => Some(c.asInstanceOf[Try[T]]) + case _: DefaultPromise[_] => compressedRoot().value0 case _ => None } - override def isCompleted: Boolean = getState.isInstanceOf[Try[_]] + override def isCompleted: Boolean = isCompleted0 + + @tailrec + private def isCompleted0: Boolean = getState match { + case _: Try[_] => true + case _: DefaultPromise[_] => compressedRoot().isCompleted0 + case _ => false + } def tryComplete(value: Try[T]): Boolean = { val resolved = resolveTry(value) - @tailrec - def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = { - getState match { - case raw: List[_] => - val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] - if (updateState(cur, v)) cur else tryComplete(v) - case _ => null - } - } - tryComplete(resolved) match { + tryCompleteAndGetListeners(resolved) match { case null => false case rs if rs.isEmpty => true case rs => rs.foreach(r => r.executeWithValue(resolved)); true } } + /** Called by `tryComplete` to store the resolved value and get the list of + * listeners, or `null` if it is already completed. + */ + @tailrec + private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = { + getState match { + case raw: List[_] => + val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] + if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v) + case _: DefaultPromise[_] => + compressedRoot().tryCompleteAndGetListeners(v) + case _ => null + } + } + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { val preparedEC = executor.prepare val runnable = new CallbackRunnable[T](preparedEC, func) + dispatchOrAddCallback(runnable) + } + + /** Tries to add the callback, if already completed, it dispatches the callback to be executed. + * Used by `onComplete()` to add callbacks to a promise and by `link()` to transfer callbacks + * to the root promise when linking two promises togehter. + */ + @tailrec + private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = { + getState match { + case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) + case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable) + case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable) + } + } + + /** Link this promise to the root of another promise using `link()`. Should only be + * be called by Future.flatMap. + */ + protected[concurrent] final def linkRootOf(target: DefaultPromise[T]): Unit = link(target.compressedRoot()) - @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed - def dispatchOrAddCallback(): Unit = - getState match { - case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) - case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback() - } - dispatchOrAddCallback() + /** Link this promise to another promise so that both promises share the same + * externally-visible state. Depending on the current state of this promise, this + * may involve different things. For example, any onComplete listeners will need + * to be transferred. + * + * If this promise is already completed, then the same effect as linking - + * sharing the same completed value - is achieved by simply sending this + * promise's result to the target promise. + */ + @tailrec + private def link(target: DefaultPromise[T]): Unit = if (this ne target) { + getState match { + case r: Try[_] => + if (!target.tryComplete(r.asInstanceOf[Try[T]])) { + // Currently linking is done from Future.flatMap, which should ensure only + // one promise can be completed. Therefore this situation is unexpected. + throw new IllegalStateException("Cannot link completed promises together") + } + case _: DefaultPromise[_] => + compressedRoot().link(target) + case listeners: List[_] => if (updateState(listeners, target)) { + if (!listeners.isEmpty) listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_)) + } else link(target) + } } } diff --git a/test/files/run/t7336.scala b/test/files/run/t7336.scala new file mode 100644 index 0000000000..ace83f2c1f --- /dev/null +++ b/test/files/run/t7336.scala @@ -0,0 +1,31 @@ +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.concurrent.duration.Duration + +/** This test uses recursive calls to Future.flatMap to create arrays whose + * combined size is slightly greater than the JVM heap size. A previous + * implementation of Future.flatMap would retain references to each array, + * resulting in a speedy OutOfMemoryError. Now, each array should be freed soon + * after it is created and the test should complete without problems. + */ +object Test { + def main(args: Array[String]) { + def loop(i: Int, arraySize: Int): Future[Unit] = { + val array = new Array[Byte](arraySize) + Future.successful(i).flatMap { i => + if (i == 0) { + Future.successful(()) + } else { + array.size // Force closure to refer to array + loop(i - 1, arraySize) + } + + } + } + + val arraySize = 1000000 + val tooManyArrays = (Runtime.getRuntime().totalMemory() / arraySize).toInt + 1 + Await.ready(loop(tooManyArrays, arraySize), Duration.Inf) + } +} \ No newline at end of file diff --git a/test/junit/scala/concurrent/impl/DefaultPromiseTest.scala b/test/junit/scala/concurrent/impl/DefaultPromiseTest.scala new file mode 100644 index 0000000000..f3a75e24d0 --- /dev/null +++ b/test/junit/scala/concurrent/impl/DefaultPromiseTest.scala @@ -0,0 +1,344 @@ +package scala.concurrent.impl + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.CountDownLatch +import org.junit.Assert._ +import org.junit.{ After, Before, Test } +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import scala.annotation.tailrec +import scala.concurrent.ExecutionContext +import scala.concurrent.impl.Promise.DefaultPromise +import scala.util.{ Failure, Success, Try } +import scala.util.control.NonFatal + +/** Tests for the private class DefaultPromise */ +@RunWith(classOf[JUnit4]) +class DefaultPromiseTest { + + // Many tests in this class use a helper class, Tester, to track the state of + // promises and to ensure they behave correctly, particularly the complex behaviour + // of linking. + + type Result = Int + type PromiseId = Int + type HandlerId = Int + type ChainId = Int + + /** The state of a set of set of linked promises. */ + case class Chain( + promises: Set[PromiseId], + state: Either[Set[HandlerId],Try[Result]] + ) + + /** A helper class that provides methods for creating, linking, completing and + * adding handlers to promises. With each operation it verifies that handlers + * are called, any expected exceptions are thrown, and that all promises have + * the expected value. + * + * The links between promises are not tracked precisely. Instead, linked promises + * are placed in the same Chain object. Each link in the same chain will share + * the same value. + */ + class Tester { + var promises = Map.empty[PromiseId, DefaultPromise[Result]] + var chains = Map.empty[ChainId, Chain] + + private var counter = 0 + private def freshId(): Int = { + val id = counter + counter += 1 + id + } + + /** Handlers report their activity on this queue */ + private val handlerQueue = new ConcurrentLinkedQueue[(Try[Result], HandlerId)]() + + /** Get the chain for a given promise */ + private def promiseChain(p: PromiseId): Option[(ChainId, Chain)] = { + val found: Iterable[(ChainId, Chain)] = for ((cid, c) <- chains; p0 <- c.promises; if (p0 == p)) yield ((cid, c)) + found.toList match { + case Nil => None + case x::Nil => Some(x) + case _ => throw new IllegalStateException(s"Promise $p found in more than one chain") + } + } + + /** Passed to `checkEffect` to indicate the expected effect of an operation */ + sealed trait Effect + case object NoEffect extends Effect + case class HandlersFired(result: Try[Result], handlers: Set[HandlerId]) extends Effect + case object MaybeIllegalThrown extends Effect + case object IllegalThrown extends Effect + + /** Runs an operation while verifying that the operation has the expected effect */ + private def checkEffect(expected: Effect)(f: => Any) { + assert(handlerQueue.isEmpty()) // Should have been cleared by last usage + val result = Try(f) + + var fireCounts = Map.empty[(Try[Result], HandlerId), Int] + while (!handlerQueue.isEmpty()) { + val key = handlerQueue.poll() + val newCount = fireCounts.getOrElse(key, 0) + 1 + fireCounts = fireCounts.updated(key, newCount) + } + + def assertIllegalResult = result match { + case Failure(e: IllegalStateException) => () + case _ => fail(s"Expected IllegalStateException: $result") + } + + expected match { + case NoEffect => + assertTrue(s"Shouldn't throw exception: $result", result.isSuccess) + assertEquals(Map.empty[(Try[Result], HandlerId), Int], fireCounts) + case HandlersFired(firingResult, handlers) => + assert(result.isSuccess) + val expectedCounts = handlers.foldLeft(Map.empty[(Try[Result], HandlerId), Int]) { + case (map, hid) => map.updated((firingResult, hid), 1) + } + assertEquals(expectedCounts, fireCounts) + case MaybeIllegalThrown => + if (result.isFailure) assertIllegalResult + assertEquals(Map.empty, fireCounts) + case IllegalThrown => + assertIllegalResult + assertEquals(Map.empty, fireCounts) + } + } + + /** Check each promise has the expected value. */ + private def assertPromiseValues() { + for ((cid, chain) <- chains; p <- chain.promises) { + chain.state match { + case Right(result) => assertEquals(Some(result), promises(p).value) + case Left(_) => () + } + } + } + + /** Create a promise, returning a handle. */ + def newPromise(): PromiseId = { + val pid = freshId() + val cid = freshId() + promises = promises.updated(pid, new DefaultPromise[Result]()) + chains = chains.updated(cid, Chain(Set(pid), Left(Set.empty))) + assertPromiseValues() + pid + } + + /** Complete a promise */ + def complete(p: PromiseId) { + val r = Success(freshId()) + val (cid, chain) = promiseChain(p).get + val (completionEffect, newState) = chain.state match { + case Left(handlers) => (HandlersFired(r, handlers), Right(r)) + case Right(completion) => (IllegalThrown, chain.state) + } + checkEffect(completionEffect) { promises(p).complete(r) } + chains = chains.updated(cid, chain.copy(state = newState)) + assertPromiseValues() + } + + /** Attempt to link two promises together */ + def link(a: PromiseId, b: PromiseId): (ChainId, ChainId) = { + val promiseA = promises(a) + val promiseB = promises(b) + val (cidA, chainA) = promiseChain(a).get + val (cidB, chainB) = promiseChain(b).get + + // Examine the state of each promise's chain to work out + // the effect of linking the promises, and to work out + // if the two chains should be merged. + + sealed trait MergeOp + case object NoMerge extends MergeOp + case class Merge(state: Either[Set[HandlerId],Try[Result]]) extends MergeOp + + val (linkEffect, mergeOp) = (chainA.state, chainB.state) match { + case (Left(handlers1), Left(handlers2)) => + (NoEffect, Merge(Left(handlers1 ++ handlers2))) + case (Left(handlers), Right(result)) => + (HandlersFired(result, handlers), Merge(Right(result))) + case (Right(result), Left(handlers)) => + (HandlersFired(result, handlers), Merge(Right(result))) + case (Right(_), Right(_)) if (cidA == cidB) => + (MaybeIllegalThrown, NoMerge) // Won't be thrown if happen to link a promise to itself + case (Right(_), Right(_)) => + (IllegalThrown, NoMerge) + } + + // Perform the linking and merge the chains, if appropriate + + checkEffect(linkEffect) { promiseA.linkRootOf(promiseB) } + + val (newCidA, newCidB) = mergeOp match { + case NoMerge => (cidA, cidB) + case Merge(newState) => { + chains = chains - cidA + chains = chains - cidB + val newCid = freshId() + chains = chains.updated(newCid, Chain(chainA.promises ++ chainB.promises, newState)) + (newCid, newCid) + } + } + assertPromiseValues() + (newCidA, newCidB) + } + + /** Attach an onComplete handler. When called, the handler will + * place an entry into `handlerQueue` with the handler's identity. + * This allows verification of handler calling semantics. + */ + def attachHandler(p: PromiseId): HandlerId = { + val hid = freshId() + val promise = promises(p) + val (cid, chain) = promiseChain(p).get + val (attachEffect, newState) = chain.state match { + case Left(handlers) => + (NoEffect, Left(handlers + hid)) + case Right(result) => + (HandlersFired(result, Set(hid)), Right(result)) + } + implicit val ec = new ExecutionContext { + def execute(r: Runnable) { r.run() } + def reportFailure(t: Throwable) { t.printStackTrace() } + } + + checkEffect(attachEffect) { promise.onComplete(result => handlerQueue.add((result, hid))) } + chains = chains.updated(cid, chain.copy(state = newState)) + assertPromiseValues() + hid + } + } + + // Some methods and objects that build a list of promise + // actions to test and then execute them + + type PromiseKey = Int + + sealed trait Action + case class Complete(p: PromiseKey) extends Action + case class Link(a: PromiseKey, b: PromiseKey) extends Action + case class AttachHandler(p: PromiseKey) extends Action + + /** Tests a sequence of actions on a Tester. Creates promises as needed. */ + private def testActions(actions: Seq[Action]) { + val t = new Tester() + var pMap = Map.empty[PromiseKey, PromiseId] + def byKey(key: PromiseKey): PromiseId = { + if (!pMap.contains(key)) { + pMap = pMap.updated(key, t.newPromise()) + } + pMap(key) + } + + actions foreach { action => + action match { + case Complete(p) => t.complete(byKey(p)) + case Link(a, b) => t.link(byKey(a), byKey(b)) + case AttachHandler(p) => t.attachHandler(byKey(p)) + } + } + } + + /** Tests all permutations of actions for `count` promises */ + private def testPermutations(count: Int) { + val ps = (0 until count).toList + val pPairs = for (a <- ps; b <- ps) yield (a, b) + + var allActions = ps.map(Complete(_)) ++ pPairs.map { case (a, b) => Link(a, b) } ++ ps.map(AttachHandler(_)) + for ((permutation, i) <- allActions.permutations.zipWithIndex) { + testActions(permutation) + } + } + + /** Test all permutations of actions with a single promise */ + @Test + def testPermutations1 { + testPermutations(1) + } + + /** Test all permutations of actions with two promises - about 40 thousand */ + @Test + def testPermutations2 { + testPermutations(2) + } + + /** Link promises in different orders, using the same link structure as is + * used in Future.flatMap */ + @Test + def simulateFlatMapLinking { + val random = new scala.util.Random(1) + for (_ <- 0 until 10) { + val t = new Tester() + val flatMapCount = 100 + + sealed trait FlatMapEvent + case class Link(a: PromiseId, b: PromiseId) extends FlatMapEvent + case class Complete(p: PromiseId) extends FlatMapEvent + + @tailrec + def flatMapEvents(count: Int, p1: PromiseId, acc: List[FlatMapEvent]): List[FlatMapEvent] = { + if (count == 0) { + Complete(p1)::acc + } else { + val p2 = t.newPromise() + flatMapEvents(count - 1, p2, Link(p2, p1)::acc) + } + } + + val events = flatMapEvents(flatMapCount, t.newPromise(), Nil) + assertEquals(flatMapCount + 1, t.chains.size) // All promises are unlinked + val shuffled = random.shuffle(events) + shuffled foreach { + case Link(a, b) => t.link(a, b) + case Complete(p) => t.complete(p) + } + // All promises should be linked together, no matter the order of their linking + assertEquals(1, t.chains.size) + } + } + + /** Link promises together on more than one thread, using the same link + * structure as is used in Future.flatMap */ + @Test + def testFlatMapLinking { + for (_ <- 0 until 100) { + val flatMapCount = 100 + val startLatch = new CountDownLatch(1) + val doneLatch = new CountDownLatch(flatMapCount + 1) + def execute(f: => Unit) { + val ec = ExecutionContext.global + ec.execute(new Runnable { + def run() { + try { + startLatch.await() + f + doneLatch.countDown() + } catch { + case NonFatal(e) => ec.reportFailure(e) + } + } + }) + } + @tailrec + def flatMapTimes(count: Int, p1: DefaultPromise[Int]) { + if (count == 0) { + execute { p1.success(1) } + } else { + val p2 = new DefaultPromise[Int]() + execute { p2.linkRootOf(p1) } + flatMapTimes(count - 1, p2) + } + } + + val p = new DefaultPromise[Int]() + flatMapTimes(flatMapCount, p) + startLatch.countDown() + doneLatch.await() + assertEquals(Some(Success(1)), p.value) + } + } + +} -- cgit v1.2.3