summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Klang <viktor.klang@gmail.com>2014-12-20 19:51:43 +0100
committerJason Zaugg <jzaugg@gmail.com>2015-02-04 14:04:05 +1000
commitbf20737faa2da5b45ad1ef5e6a43dff307c99788 (patch)
tree235f68bfa6b187f2e7d9ad12a84610bdec41c8f4
parentad0ddd4603e6ec134460491333444d505d376883 (diff)
downloadscala-bf20737faa2da5b45ad1ef5e6a43dff307c99788.tar.gz
scala-bf20737faa2da5b45ad1ef5e6a43dff307c99788.tar.bz2
scala-bf20737faa2da5b45ad1ef5e6a43dff307c99788.zip
SI-8689 Avoid internal error in Promise after sequence of completions
Calling `completeWith` when the `DefaultPromise` is already completed, leads to callbacks not being properly executed. This happened because `Future.InternalCallbackExecutor` extends `BatchingExecutor`[1] which assumes `unbatchedExecute` to be async, when in this case it is sync, and if there is an exception thrown by executing the batch, it creates a new batch with the remaining items from the current batch and submits that to `unbatchedExecute` and then rethrows, but if you have a sync `unbatchedExecute`, it will fail since it is not reentrant, as witnessed by the failed `require` as reported in this issue. This commit avoids problem by delegating `completeWith` to `tryComplete`, which has the effect of using `onComplete` + `tryComplete` i.s.o. `complete`, which means that when it fails (because of a benign race condition between completers) it won't throw an exception. It has been tested by the minimized reproducer. [1] Actually, in the 2.10.x branch where this patch is starting out, "The BatchingExecutor trait had to be inlined into InternalCallbackExecutor for binary compatibility.". This comment will be more literally correct in the context of 2.11.x and beyond
-rw-r--r--src/library/scala/concurrent/Promise.scala7
-rw-r--r--test/files/jvm/future-spec/PromiseTests.scala77
-rw-r--r--test/files/jvm/t8689.check1
-rw-r--r--test/files/jvm/t8689.scala13
4 files changed, 84 insertions, 14 deletions
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 8355a73a1f..02253d4bd9 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -66,11 +66,8 @@ trait Promise[T] {
*
* @return This promise
*/
- final def completeWith(other: Future[T]): this.type = {
- other onComplete { this complete _ }
- this
- }
-
+ final def completeWith(other: Future[T]): this.type = tryCompleteWith(other)
+
/** Attempts to complete this promise with the specified future, once that future is completed.
*
* @return This promise
diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala
index 48f94666ba..3b20a96502 100644
--- a/test/files/jvm/future-spec/PromiseTests.scala
+++ b/test/files/jvm/future-spec/PromiseTests.scala
@@ -43,21 +43,80 @@ object PromiseTests extends MinimalScalaTest {
Await.result(failure fallbackTo otherFailure, defaultTimeout)
}.getMessage mustBe ("br0ken")
}
-
+
+ "be completable with a completed Promise" in {
+ {
+ val p = Promise[String]()
+ p.tryCompleteWith(Promise[String]().success("foo").future)
+ Await.result(p.future, defaultTimeout) mustBe ("foo")
+ }
+ {
+ val p = Promise[String]()
+ p.completeWith(Promise[String]().success("foo").future)
+ Await.result(p.future, defaultTimeout) mustBe ("foo")
+ }
+ {
+ val p = Promise[String]()
+ p.tryCompleteWith(Promise[String]().failure(new RuntimeException("br0ken")).future)
+ intercept[RuntimeException] {
+ Await.result(p.future, defaultTimeout)
+ }.getMessage mustBe ("br0ken")
+ }
+ {
+ val p = Promise[String]()
+ p.tryCompleteWith(Promise[String]().failure(new RuntimeException("br0ken")).future)
+ intercept[RuntimeException] {
+ Await.result(p.future, defaultTimeout)
+ }.getMessage mustBe ("br0ken")
+ }
+ }
}
"A successful Promise" should {
- val result = "test value"
- val promise = Promise[String]().complete(Success(result))
- promise.isCompleted mustBe (true)
- futureWithResult(_(promise.future, result))
+ "be completed" in {
+ val result = "test value"
+ val promise = Promise[String]().complete(Success(result))
+ promise.isCompleted mustBe (true)
+ futureWithResult(_(promise.future, result))
+ }
+
+ "not be completable with a completed Promise" in {
+ {
+ val p = Promise.successful("bar")
+ p.tryCompleteWith(Promise[String]().success("foo").future)
+ Await.result(p.future, defaultTimeout) mustBe ("bar")
+ }
+ {
+ val p = Promise.successful("bar")
+ p.completeWith(Promise[String]().success("foo").future)
+ Await.result(p.future, defaultTimeout) mustBe ("bar")
+ }
+ }
}
"A failed Promise" should {
- val message = "Expected Exception"
- val promise = Promise[String]().complete(Failure(new RuntimeException(message)))
- promise.isCompleted mustBe (true)
- futureWithException[RuntimeException](_(promise.future, message))
+ "be completed" in {
+ val message = "Expected Exception"
+ val promise = Promise[String]().complete(Failure(new RuntimeException(message)))
+ promise.isCompleted mustBe (true)
+ futureWithException[RuntimeException](_(promise.future, message))
+ }
+ "not be completable with a completed Promise" in {
+ {
+ val p = Promise[String]().failure(new RuntimeException("unbr0ken"))
+ p.tryCompleteWith(Promise[String].failure(new Exception("br0ken")).future)
+ intercept[RuntimeException] {
+ Await.result(p.future, defaultTimeout)
+ }.getMessage mustBe ("unbr0ken")
+ }
+ {
+ val p = Promise[String]().failure(new RuntimeException("unbr0ken"))
+ p.completeWith(Promise[String]().failure(new Exception("br0ken")).future)
+ intercept[RuntimeException] {
+ Await.result(p.future, defaultTimeout)
+ }.getMessage mustBe ("unbr0ken")
+ }
+ }
}
"An interrupted Promise" should {
diff --git a/test/files/jvm/t8689.check b/test/files/jvm/t8689.check
new file mode 100644
index 0000000000..2e9ba477f8
--- /dev/null
+++ b/test/files/jvm/t8689.check
@@ -0,0 +1 @@
+success
diff --git a/test/files/jvm/t8689.scala b/test/files/jvm/t8689.scala
new file mode 100644
index 0000000000..ef43a1df63
--- /dev/null
+++ b/test/files/jvm/t8689.scala
@@ -0,0 +1,13 @@
+object Test {
+ def main(args: Array[String]): Unit = {
+ import scala.concurrent._
+ import ExecutionContext.Implicits.global
+ val source1 = Promise[Int]()
+ val source2 = Promise[Int]()
+ source2.completeWith(source1.future).future.onComplete {
+ case _ => print("success")
+ }
+ source2.tryFailure(new TimeoutException)
+ source1.success(123)
+ }
+} \ No newline at end of file