author    Shixiong Zhu <shixiong@databricks.com>    2016-12-02 12:42:47 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2016-12-02 12:42:47 -0800
commit    56a503df5ccbb233ad6569e22002cc989e676337 (patch)
tree      ea92a9bc3bdaa408262d2642d48fd24245af6677
parent    a985dd8e99d2663a3cb4745c675fa2057aa67155 (diff)
[SPARK-18670][SS] Limit the number of StreamingQueryListener.StreamProgressEvent when there is no data
## What changes were proposed in this pull request?

This PR adds a SQL conf `spark.sql.streaming.noDataProgressEventInterval` to control how long to wait before outputting the next QueryProgressEvent when there is no data.

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16108 from zsxwing/SPARK-18670.
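For context, a minimal usage sketch (not part of this patch): it assumes an existing SparkSession named `spark`, raises the idle progress interval, and registers a listener so the throttling introduced here is observable. The conf added below is marked internal, but it can still be set explicitly with a time string.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Sketch only: report at most one progress event every 30 seconds while a query is idle.
spark.conf.set("spark.sql.streaming.noDataProgressEventInterval", "30s")

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // With no incoming data, this callback now fires at most once per interval per query.
    println(s"progress for query ${event.progress.id}")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})
```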
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala   | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                      | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala | 44
3 files changed, 71 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6d0e269d34..8804c647a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -63,6 +63,9 @@ class StreamExecution(
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
+ private val noDataProgressEventInterval =
+ sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
*/
@@ -196,6 +199,9 @@ class StreamExecution(
// While active, repeatedly attempt to run batches.
SparkSession.setActiveSession(sparkSession)
+ // The timestamp we report an event that has no input data
+ var lastNoDataProgressEventTime = Long.MinValue
+
triggerExecutor.execute(() => {
startTrigger()
@@ -218,7 +224,17 @@ class StreamExecution(
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
- postEvent(new QueryProgressEvent(lastProgress))
+ if (dataAvailable) {
+ // Reset noDataEventTimestamp if we processed any data
+ lastNoDataProgressEventTime = Long.MinValue
+ postEvent(new QueryProgressEvent(lastProgress))
+ } else {
+ val now = triggerClock.getTimeMillis()
+ if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
+ lastNoDataProgressEventTime = now
+ postEvent(new QueryProgressEvent(lastProgress))
+ }
+ }
if (dataAvailable) {
// We'll increase currentBatchId after we complete processing current batch's data
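The hunk above throttles idle progress reporting with a simple timestamp check. Below is a self-contained sketch of the same pattern, under the assumption of wall-clock time instead of the query's `triggerClock` and a stubbed `emit` in place of `postEvent`; it is illustrative only, not the StreamExecution code itself.

```scala
// Sketch: emit on every trigger that processed data, but at most once per
// interval while the stream is idle (mirrors the logic added above).
object NoDataProgressThrottle {
  private val intervalMs = 10000L                  // default of the new conf
  private var lastNoDataEventTimeMs = Long.MinValue

  def maybeEmit(dataAvailable: Boolean)(emit: => Unit): Unit = {
    if (dataAvailable) {
      lastNoDataEventTimeMs = Long.MinValue        // reset so the next idle trigger still reports
      emit
    } else {
      val now = System.currentTimeMillis()
      if (now - intervalMs >= lastNoDataEventTimeMs) {
        lastNoDataEventTimeMs = now
        emit
      }
    }
  }
}
```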
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 200f0603e1..5b45df69e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -603,6 +603,13 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(10L)
+ val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
+ SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval")
+ .internal()
+ .doc("How long to wait between two progress events when there is no data")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(10000L)
+
val STREAMING_METRICS_ENABLED =
SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
.doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
@@ -684,6 +691,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
+ def streamingNoDataProgressEventInterval: Long =
+ getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)
+
def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 07a13a48a1..3086abf03c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.PrivateMethodTester._
import org.apache.spark.SparkException
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.util.JsonProtocol
@@ -46,6 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(spark.streams.active.isEmpty)
assert(addedListeners.isEmpty)
// Make sure we don't leak any events to the next test
+ spark.sparkContext.listenerBus.waitUntilEmpty(10000)
}
testQuietly("single listener, check trigger events are generated correctly") {
@@ -191,6 +193,48 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}
+ test("only one progress event per interval when no data") {
+ // This test will start a query but not push any data, and then check if we push too many events
+ withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100ms") {
+ @volatile var numProgressEvent = 0
+ val listener = new StreamingQueryListener {
+ override def onQueryStarted(event: QueryStartedEvent): Unit = {}
+ override def onQueryProgress(event: QueryProgressEvent): Unit = {
+ numProgressEvent += 1
+ }
+ override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+ }
+ spark.streams.addListener(listener)
+ try {
+ val input = new MemoryStream[Int](0, sqlContext) {
+ @volatile var numTriggers = 0
+ override def getOffset: Option[Offset] = {
+ numTriggers += 1
+ super.getOffset
+ }
+ }
+ val clock = new StreamManualClock()
+ val actions = mutable.ArrayBuffer[StreamAction]()
+ actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
+ for (_ <- 1 to 100) {
+ actions += AdvanceManualClock(10)
+ }
+ actions += AssertOnQuery { _ =>
+ eventually(timeout(streamingTimeout)) {
+ assert(input.numTriggers > 100) // at least 100 triggers have occurred
+ }
+ true
+ }
+ testStream(input.toDS)(actions: _*)
+ spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+ // 11 is the max value of the possible numbers of events.
+ assert(numProgressEvent > 1 && numProgressEvent <= 11)
+ } finally {
+ spark.streams.removeListener(listener)
+ }
+ }
+ }
+
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
// query-event-logs-version-2.0.0.txt has all types of events generated by
// Structured Streaming in Spark 2.0.0.