aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala25
2 files changed, 38 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index a2153d27e9..4207013c3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -75,6 +75,19 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
}
}
+ /**
+ * Override the parent `postToAll` to remove the query id from `activeQueryRunIds` after all
+ * the listeners process `QueryTerminatedEvent`. (SPARK-19594)
+ */
+ override def postToAll(event: Event): Unit = {
+ super.postToAll(event)
+ event match {
+ case t: QueryTerminatedEvent =>
+ activeQueryRunIds.synchronized { activeQueryRunIds -= t.runId }
+ case _ =>
+ }
+ }
+
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: StreamingQueryListener.Event =>
@@ -112,7 +125,6 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
case queryTerminated: QueryTerminatedEvent =>
if (shouldReport(queryTerminated.runId)) {
listener.onQueryTerminated(queryTerminated)
- activeQueryRunIds.synchronized { activeQueryRunIds -= queryTerminated.runId }
}
case _ =>
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 4596aa1d34..eb09b9ffcf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -133,6 +133,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
+ test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
+ val df = MemoryStream[Int].toDS().as[Long]
+ val listeners = (1 to 5).map(_ => new EventCollector)
+ try {
+ listeners.foreach(listener => spark.streams.addListener(listener))
+ testStream(df, OutputMode.Append)(
+ StartStream(),
+ StopStream,
+ AssertOnQuery { query =>
+ eventually(Timeout(streamingTimeout)) {
+ listeners.foreach(listener => assert(listener.terminationEvent !== null))
+ listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
+ listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
+ listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
+ }
+ listeners.foreach(listener => listener.checkAsyncErrors())
+ listeners.foreach(listener => listener.reset())
+ true
+ }
+ )
+ } finally {
+ listeners.foreach(spark.streams.removeListener)
+ }
+ }
+
test("adding and removing listener") {
def isListenerActive(listener: EventCollector): Boolean = {
listener.reset()