aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala25
1 files changed, 25 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 4596aa1d34..eb09b9ffcf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -133,6 +133,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
+ test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
+ val df = MemoryStream[Int].toDS().as[Long]
+ val listeners = (1 to 5).map(_ => new EventCollector)
+ try {
+ listeners.foreach(listener => spark.streams.addListener(listener))
+ testStream(df, OutputMode.Append)(
+ StartStream(),
+ StopStream,
+ AssertOnQuery { query =>
+ eventually(Timeout(streamingTimeout)) {
+ listeners.foreach(listener => assert(listener.terminationEvent !== null))
+ listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
+ listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
+ listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
+ }
+ listeners.foreach(listener => listener.checkAsyncErrors())
+ listeners.foreach(listener => listener.reset())
+ true
+ }
+ )
+ } finally {
+ listeners.foreach(spark.streams.removeListener)
+ }
+ }
+
test("adding and removing listener") {
def isListenerActive(listener: EventCollector): Boolean = {
listener.reset()