aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-02-21 15:32:49 -0800
committerReynold Xin <rxin@databricks.com>2016-02-21 15:32:49 -0800
commit76bd98d914ecdf6ff0d73d9d8d221fe3403f8627 (patch)
tree7a3ac51974024113dae378365066af21935609ef /sql
parent03e62aa3f6e16a271262c786be3d1542af79d3e4 (diff)
downloadspark-76bd98d914ecdf6ff0d73d9d8d221fe3403f8627.tar.gz
spark-76bd98d914ecdf6ff0d73d9d8d221fe3403f8627.tar.bz2
spark-76bd98d914ecdf6ff0d73d9d8d221fe3403f8627.zip
[SPARK-13405][STREAMING][TESTS] Make sure no messages leak to the next test
## What changes were proposed in this pull request? Fixed the test failure `org.apache.spark.sql.util.ContinuousQueryListenerSuite.event ordering`: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.6/202/testReport/junit/org.apache.spark.sql.util/ContinuousQueryListenerSuite/event_ordering/ ``` org.scalatest.exceptions.TestFailedException: Assert failed: : null equaled null onQueryTerminated called before onQueryStarted org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500) org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555) org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466) org.apache.spark.sql.util.ContinuousQueryListenerSuite$QueryStatusCollector$$anonfun$onQueryTerminated$1.apply$mcV$sp(ContinuousQueryListenerSuite.scala:204) org.scalatest.concurrent.AsyncAssertions$Waiter.apply(AsyncAssertions.scala:349) org.apache.spark.sql.util.ContinuousQueryListenerSuite$QueryStatusCollector.onQueryTerminated(ContinuousQueryListenerSuite.scala:203) org.apache.spark.sql.execution.streaming.ContinuousQueryListenerBus.doPostEvent(ContinuousQueryListenerBus.scala:67) org.apache.spark.sql.execution.streaming.ContinuousQueryListenerBus.doPostEvent(ContinuousQueryListenerBus.scala:32) org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) org.apache.spark.sql.execution.streaming.ContinuousQueryListenerBus.postToAll(ContinuousQueryListenerBus.scala:32) ``` In the previous codes, when the test `adding and removing listener` finishes, there may be still some QueryTerminated events in the listener bus queue. Then when `event ordering` starts to run, it may see these events and throw the above exception. This PR just added `waitUntilEmpty` in `after` to make sure all events be consumed after each test. ## How was the this patch tested? Jenkins tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #11275 from zsxwing/SPARK-13405.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala2
1 files changed, 2 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
index d6cc6ad86b..52783281ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
@@ -41,6 +41,8 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
sqlContext.streams.active.foreach(_.stop())
assert(sqlContext.streams.active.isEmpty)
assert(addedListeners.isEmpty)
+ // Make sure we don't leak any events to the next test
+ sqlContext.sparkContext.listenerBus.waitUntilEmpty(10000)
}
test("single listener") {