author     Shixiong Zhu <shixiong@databricks.com>        2016-12-05 18:51:07 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2016-12-05 18:51:07 -0800
commit     4af142f55771affa5fc7f2abbbf5e47766194e6e (patch)
tree       234dea0f43142dcfc11e82e444da7ed352b8db2c
parent     508de38c9928d160cf70e8e7d69ddb1dca5c1a64 (diff)
[SPARK-18722][SS] Move no data rate limit from StreamExecution to ProgressReporter
## What changes were proposed in this pull request?

Move the no-data rate limit from StreamExecution to ProgressReporter, so that `recentProgresses` and listener events stay consistent with each other.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16155 from zsxwing/SPARK-18722.
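The heart of the change is a small rate limiter: a trigger that processed data always reports progress, while consecutive no-data triggers report at most once per configured interval. Below is a minimal standalone sketch of that rule; the object name, method name, and the 10-second interval are illustrative stand-ins, and in Spark the interval is read via `streamingNoDataProgressEventInterval` from the session conf and timestamps come from the trigger clock.

```scala
// Minimal sketch of the no-data rate limit this patch centralizes in
// ProgressReporter. Names and the interval are assumptions, not Spark's API.
object NoDataRateLimitSketch {
  private val noDataProgressEventInterval: Long = 10000L // assumed 10s

  // Long.MinValue means "no no-data event reported yet", so the first
  // no-data trigger always reports. The check subtracts the interval from
  // `now` rather than adding it to the last timestamp, which would overflow
  // Long.MinValue.
  private var lastNoDataProgressEventTime = Long.MinValue

  /** Decides whether this trigger should publish a progress update. */
  def shouldEmitProgress(hasNewData: Boolean, now: Long): Boolean = {
    if (hasNewData) {
      // Any processed data resets the limiter; progress is always reported.
      lastNoDataProgressEventTime = Long.MinValue
      true
    } else if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
      lastNoDataProgressEventTime = now
      true
    } else {
      false // too soon since the last no-data event; stay quiet
    }
  }
}
```

Before this patch, only listener events were throttled this way (in StreamExecution) while every progress object still landed in the buffer behind `recentProgresses`; applying the rule in one place makes both views see the same stream of updates.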
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala | 33
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 20
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala | 4
3 files changed, 33 insertions(+), 24 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 7d0d086746..d95f55267e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.util.Clock
/**
@@ -56,6 +57,7 @@ trait ProgressReporter extends Logging {
protected def offsetSeqMetadata: OffsetSeqMetadata
protected def currentBatchId: Long
protected def sparkSession: SparkSession
+ protected def postEvent(event: StreamingQueryListener.Event): Unit
// Local timestamps and counters.
private var currentTriggerStartTimestamp = -1L
@@ -70,6 +72,12 @@ trait ProgressReporter extends Logging {
/** Holds the most recent query progress updates. Accesses must lock on the queue itself. */
private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+ private val noDataProgressEventInterval =
+ sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
+ // The timestamp we report an event that has no input data
+ private var lastNoDataProgressEventTime = Long.MinValue
+
@volatile
protected var currentStatus: StreamingQueryStatus = {
new StreamingQueryStatus(
@@ -100,6 +108,17 @@ trait ProgressReporter extends Logging {
currentDurationsMs.clear()
}
+ private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+ progressBuffer.synchronized {
+ progressBuffer += newProgress
+ while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
+ progressBuffer.dequeue()
+ }
+ }
+ postEvent(new QueryProgressEvent(newProgress))
+ logInfo(s"Streaming query made progress: $newProgress")
+ }
+
/** Finalizes the query progress and adds it to list of recent status updates. */
protected def finishTrigger(hasNewData: Boolean): Unit = {
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
@@ -145,14 +164,18 @@ trait ProgressReporter extends Logging {
sources = sourceProgress.toArray,
sink = sinkProgress)
- progressBuffer.synchronized {
- progressBuffer += newProgress
- while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
- progressBuffer.dequeue()
+ if (hasNewData) {
+ // Reset noDataEventTimestamp if we processed any data
+ lastNoDataProgressEventTime = Long.MinValue
+ updateProgress(newProgress)
+ } else {
+ val now = triggerClock.getTimeMillis()
+ if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
+ lastNoDataProgressEventTime = now
+ updateProgress(newProgress)
}
}
- logInfo(s"Streaming query made progress: $newProgress")
currentStatus = currentStatus.copy(isTriggerActive = false)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 083cce8eb5..39be222d05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,9 +58,6 @@ class StreamExecution(
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
- private val noDataProgressEventInterval =
- sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
-
/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
*/
@@ -217,9 +214,6 @@ class StreamExecution(
// While active, repeatedly attempt to run batches.
SparkSession.setActiveSession(sparkSession)
- // The timestamp we report an event that has no input data
- var lastNoDataProgressEventTime = Long.MinValue
-
triggerExecutor.execute(() => {
startTrigger()
@@ -243,18 +237,6 @@ class StreamExecution(
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
- // Reset noDataEventTimestamp if we processed any data
- lastNoDataProgressEventTime = Long.MinValue
- postEvent(new QueryProgressEvent(lastProgress))
- } else {
- val now = triggerClock.getTimeMillis()
- if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
- lastNoDataProgressEventTime = now
- postEvent(new QueryProgressEvent(lastProgress))
- }
- }
-
- if (dataAvailable) {
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
} else {
@@ -504,7 +486,7 @@ class StreamExecution(
}
}
- private def postEvent(event: StreamingQueryListener.Event) {
+ override protected def postEvent(event: StreamingQueryListener.Event): Unit = {
sparkSession.streams.postListenerEvent(event)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index a38c05eed5..1cd503c6de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -237,6 +237,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
true
}
+ // `recentProgresses` should not receive too many no data events
+ actions += AssertOnQuery { q =>
+ q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
+ }
testStream(input.toDS)(actions: _*)
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
// 11 is the max value of the possible numbers of events.