aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorEyal Zituny <eyal.zituny@equalum.io>2017-02-26 15:57:32 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-02-26 15:57:32 -0800
commit9f8e392159ba65decddf62eb3cd85b6821db01b4 (patch)
tree88bcc5a12adcf6248c6f43f04e0abe7c924e1f94 /sql/core/src
parent68f2142cfd2ca632a4afb0cc29cc358edbb21f8d (diff)
downloadspark-9f8e392159ba65decddf62eb3cd85b6821db01b4.tar.gz
spark-9f8e392159ba65decddf62eb3cd85b6821db01b4.tar.bz2
spark-9f8e392159ba65decddf62eb3cd85b6821db01b4.zip
[SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle QueryTerminatedEvent if more than one listener exists
## What changes were proposed in this pull request? Currently, if multiple streaming query listeners exist, when a QueryTerminatedEvent is triggered only one of the listeners is invoked, while the rest of the listeners ignore the event. This happens because the streaming query listener bus holds a set of running query ids, and when a termination event is triggered, the terminated query id is removed from the set as soon as the first listener has handled the event. In this PR, the query id is removed from the set only after all the listeners have handled the event. ## How was this patch tested? A test with multiple listeners has been added to StreamingQueryListenerSuite. Author: Eyal Zituny <eyal.zituny@equalum.io> Closes #16991 from eyalzit/master.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala25
2 files changed, 38 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index a2153d27e9..4207013c3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -75,6 +75,19 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
}
}
+ /**
+ * Override the parent `postToAll` to remove the query id from `activeQueryRunIds` after all
+ * the listeners process `QueryTerminatedEvent`. (SPARK-19594)
+ */
+ override def postToAll(event: Event): Unit = {
+ super.postToAll(event)
+ event match {
+ case t: QueryTerminatedEvent =>
+ activeQueryRunIds.synchronized { activeQueryRunIds -= t.runId }
+ case _ =>
+ }
+ }
+
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: StreamingQueryListener.Event =>
@@ -112,7 +125,6 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
case queryTerminated: QueryTerminatedEvent =>
if (shouldReport(queryTerminated.runId)) {
listener.onQueryTerminated(queryTerminated)
- activeQueryRunIds.synchronized { activeQueryRunIds -= queryTerminated.runId }
}
case _ =>
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 4596aa1d34..eb09b9ffcf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -133,6 +133,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
+ test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
+ val df = MemoryStream[Int].toDS().as[Long]
+ val listeners = (1 to 5).map(_ => new EventCollector)
+ try {
+ listeners.foreach(listener => spark.streams.addListener(listener))
+ testStream(df, OutputMode.Append)(
+ StartStream(),
+ StopStream,
+ AssertOnQuery { query =>
+ eventually(Timeout(streamingTimeout)) {
+ listeners.foreach(listener => assert(listener.terminationEvent !== null))
+ listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
+ listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
+ listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
+ }
+ listeners.foreach(listener => listener.checkAsyncErrors())
+ listeners.foreach(listener => listener.reset())
+ true
+ }
+ )
+ } finally {
+ listeners.foreach(spark.streams.removeListener)
+ }
+ }
+
test("adding and removing listener") {
def isListenerActive(listener: EventCollector): Boolean = {
listener.reset()