From bf20737faa2da5b45ad1ef5e6a43dff307c99788 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 20 Dec 2014 19:51:43 +0100 Subject: SI-8689 Avoid internal error in Promise after sequence of completions Calling `completeWith` when the `DefaultPromise` is already completed, leads to callbacks not being properly executed. This happened because `Future.InternalCallbackExecutor` extends `BatchingExecutor`[1] which assumes `unbatchedExecute` to be async, when in this case it is sync, and if there is an exception thrown by executing the batch, it creates a new batch with the remaining items from the current batch and submits that to `unbatchedExecute` and then rethrows, but if you have a sync `unbatchedExecute`, it will fail since it is not reentrant, as witnessed by the failed `require` as reported in this issue. This commit avoids problem by delegating `completeWith` to `tryComplete`, which has the effect of using `onComplete` + `tryComplete` i.s.o. `complete`, which means that when it fails (because of a benign race condition between completers) it won't throw an exception. It has been tested by the minimized reproducer. [1] Actually, in the 2.10.x branch where this patch is starting out, "The BatchingExecutor trait had to be inlined into InternalCallbackExecutor for binary compatibility.". This comment will be more literally correct in the context of 2.11.x and beyond --- src/library/scala/concurrent/Promise.scala | 7 +-- test/files/jvm/future-spec/PromiseTests.scala | 77 +++++++++++++++++++++++---- test/files/jvm/t8689.check | 1 + test/files/jvm/t8689.scala | 13 +++++ 4 files changed, 84 insertions(+), 14 deletions(-) create mode 100644 test/files/jvm/t8689.check create mode 100644 test/files/jvm/t8689.scala diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index 8355a73a1f..02253d4bd9 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -66,11 +66,8 @@ trait Promise[T] { * * @return This promise */ - final def completeWith(other: Future[T]): this.type = { - other onComplete { this complete _ } - this - } - + final def completeWith(other: Future[T]): this.type = tryCompleteWith(other) + /** Attempts to complete this promise with the specified future, once that future is completed. * * @return This promise diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala index 48f94666ba..3b20a96502 100644 --- a/test/files/jvm/future-spec/PromiseTests.scala +++ b/test/files/jvm/future-spec/PromiseTests.scala @@ -43,21 +43,80 @@ object PromiseTests extends MinimalScalaTest { Await.result(failure fallbackTo otherFailure, defaultTimeout) }.getMessage mustBe ("br0ken") } - + + "be completable with a completed Promise" in { + { + val p = Promise[String]() + p.tryCompleteWith(Promise[String]().success("foo").future) + Await.result(p.future, defaultTimeout) mustBe ("foo") + } + { + val p = Promise[String]() + p.completeWith(Promise[String]().success("foo").future) + Await.result(p.future, defaultTimeout) mustBe ("foo") + } + { + val p = Promise[String]() + p.tryCompleteWith(Promise[String]().failure(new RuntimeException("br0ken")).future) + intercept[RuntimeException] { + Await.result(p.future, defaultTimeout) + }.getMessage mustBe ("br0ken") + } + { + val p = Promise[String]() + p.tryCompleteWith(Promise[String]().failure(new RuntimeException("br0ken")).future) + intercept[RuntimeException] { + Await.result(p.future, defaultTimeout) + }.getMessage mustBe ("br0ken") + } + } } "A successful Promise" should { - val result = "test value" - val promise = Promise[String]().complete(Success(result)) - promise.isCompleted mustBe (true) - futureWithResult(_(promise.future, result)) + "be completed" in { + val result = "test value" + val promise = Promise[String]().complete(Success(result)) + promise.isCompleted mustBe (true) + futureWithResult(_(promise.future, result)) + } + + "not be completable with a completed Promise" in { + { + val p = Promise.successful("bar") + p.tryCompleteWith(Promise[String]().success("foo").future) + Await.result(p.future, defaultTimeout) mustBe ("bar") + } + { + val p = Promise.successful("bar") + p.completeWith(Promise[String]().success("foo").future) + Await.result(p.future, defaultTimeout) mustBe ("bar") + } + } } "A failed Promise" should { - val message = "Expected Exception" - val promise = Promise[String]().complete(Failure(new RuntimeException(message))) - promise.isCompleted mustBe (true) - futureWithException[RuntimeException](_(promise.future, message)) + "be completed" in { + val message = "Expected Exception" + val promise = Promise[String]().complete(Failure(new RuntimeException(message))) + promise.isCompleted mustBe (true) + futureWithException[RuntimeException](_(promise.future, message)) + } + "not be completable with a completed Promise" in { + { + val p = Promise[String]().failure(new RuntimeException("unbr0ken")) + p.tryCompleteWith(Promise[String].failure(new Exception("br0ken")).future) + intercept[RuntimeException] { + Await.result(p.future, defaultTimeout) + }.getMessage mustBe ("unbr0ken") + } + { + val p = Promise[String]().failure(new RuntimeException("unbr0ken")) + p.completeWith(Promise[String]().failure(new Exception("br0ken")).future) + intercept[RuntimeException] { + Await.result(p.future, defaultTimeout) + }.getMessage mustBe ("unbr0ken") + } + } } "An interrupted Promise" should { diff --git a/test/files/jvm/t8689.check b/test/files/jvm/t8689.check new file mode 100644 index 0000000000..2e9ba477f8 --- /dev/null +++ b/test/files/jvm/t8689.check @@ -0,0 +1 @@ +success diff --git a/test/files/jvm/t8689.scala b/test/files/jvm/t8689.scala new file mode 100644 index 0000000000..ef43a1df63 --- /dev/null +++ b/test/files/jvm/t8689.scala @@ -0,0 +1,13 @@ +object Test { + def main(args: Array[String]): Unit = { + import scala.concurrent._ + import ExecutionContext.Implicits.global + val source1 = Promise[Int]() + val source2 = Promise[Int]() + source2.completeWith(source1.future).future.onComplete { + case _ => print("success") + } + source2.tryFailure(new TimeoutException) + source1.success(123) + } +} \ No newline at end of file -- cgit v1.2.3