aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala7
2 files changed, 15 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index fc2190d39d..22e4c6380f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
def post(event: StreamingQueryListener.Event) {
event match {
case s: QueryStartedEvent =>
+ sparkListenerBus.post(s)
+ // post to local listeners to trigger callbacks
postToAll(s)
case _ =>
sparkListenerBus.post(event)
@@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: StreamingQueryListener.Event =>
- postToAll(e)
+ // SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus
+ // synchronously and the ones attached to LiveListenerBus asynchronously. Therefore,
+ // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus
+ // thread
+ if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) {
+ postToAll(e)
+ }
case _ =>
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 464c443beb..31b7fe0b04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
// A StreamingQueryListener that gets the query status after the first completed trigger
val listener = new StreamingQueryListener {
@volatile var firstStatus: StreamingQueryStatus = null
- override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
+ @volatile var queryStartedEvent = 0
+ override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+ queryStartedEvent += 1
+ }
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
if (firstStatus == null) firstStatus = queryProgress.queryStatus
}
@@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
q.processAllAvailable()
eventually(timeout(streamingTimeout)) {
assert(listener.firstStatus != null)
+ // test if QueryStartedEvent callback is called for only once
+ assert(listener.queryStartedEvent === 1)
}
listener.firstStatus
} finally {