aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-05-03 11:16:55 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-05-03 11:16:55 -0700
commitb545d752195f6dcba4c512b8a1d5bf5b74279dc8 (patch)
treecb79123e24f401eb3a8425ddbe7bc3284c9b6fef
parent4ad492c40358d0104db508db98ce0971114b6817 (diff)
downloadspark-b545d752195f6dcba4c512b8a1d5bf5b74279dc8.tar.gz
spark-b545d752195f6dcba4c512b8a1d5bf5b74279dc8.tar.bz2
spark-b545d752195f6dcba4c512b8a1d5bf5b74279dc8.zip
[SPARK-14860][TESTS] Create a new Waiter in reset to bypass an issue of ScalaTest's Waiter.wait
## What changes were proposed in this pull request? This PR updates `QueryStatusCollector.reset` to create Waiter instead of calling `await(1 milliseconds)` to bypass an ScalaTest's issue that Waiter.await may block forever. ## How was this patch tested? I created a local stress test to call codes in `test("event ordering")` 100 times. It cannot pass without this patch. Author: Shixiong Zhu <shixiong@databricks.com> Closes #12623 from zsxwing/flaky-test.
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala12
1 files changed, 3 insertions, 9 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
index 3498fe83d0..2596231a12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.util
import java.util.concurrent.ConcurrentLinkedQueue
-import scala.util.control.NonFatal
-
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester._
import org.scalatest.concurrent.AsyncAssertions.Waiter
@@ -164,8 +162,8 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
}
class QueryStatusCollector extends ContinuousQueryListener {
-
- private val asyncTestWaiter = new Waiter // to catch errors in the async listener events
+ // to catch errors in the async listener events
+ @volatile private var asyncTestWaiter = new Waiter
@volatile var startStatus: QueryStatus = null
@volatile var terminationStatus: QueryStatus = null
@@ -175,11 +173,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
startStatus = null
terminationStatus = null
progressStatuses.clear()
-
- // To reset the waiter
- try asyncTestWaiter.await(timeout(1 milliseconds)) catch {
- case NonFatal(e) =>
- }
+ asyncTestWaiter = new Waiter
}
def checkAsyncErrors(): Unit = {