diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-21 10:44:20 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-12-21 10:44:20 -0800 |
commit | 607a1e63dbc9269b806a9f537e1d041029333cdd (patch) | |
tree | c698a1d7477a8cf9aeece1b1061de272a5567b77 /sql/core/src/main | |
parent | 1a64388973711b4e567f25fa33d752066a018b49 (diff) | |
download | spark-607a1e63dbc9269b806a9f537e1d041029333cdd.tar.gz spark-607a1e63dbc9269b806a9f537e1d041029333cdd.tar.bz2 spark-607a1e63dbc9269b806a9f537e1d041029333cdd.zip |
[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years
## What changes were proposed in this pull request?
Two changes:
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, do not show the watermark when there is no watermarking in the query
## How was this patch tested?
Updated and new unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #16304 from tdas/SPARK-18834-1.
Diffstat (limited to 'sql/core/src/main')
3 files changed, 12 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index e8570d040d..5a9a99e111 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -84,6 +84,11 @@ case class EventTimeWatermarkExec( child: SparkPlan) extends SparkPlan { val eventTimeStats = new EventTimeStatsAccum() + val delayMs = { + val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 + delay.milliseconds + delay.months * millisPerMonth + } + sparkContext.register(eventTimeStats) override protected def doExecute(): RDD[InternalRow] = { @@ -101,7 +106,7 @@ case class EventTimeWatermarkExec( if (a semanticEquals eventTime) { val updatedMetadata = new MetadataBuilder() .withMetadata(a.metadata) - .putLong(EventTimeWatermark.delayKey, delay.milliseconds) + .putLong(EventTimeWatermark.delayKey, delayMs) .build() a.withMetadata(updatedMetadata) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 2386f33f8a..c5e9eae607 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.streaming._ import 
org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent @@ -182,7 +182,10 @@ trait ProgressReporter extends Logging { /** Extracts statistics from the most recent query execution. */ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { - val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty + val watermarkTimestamp = + if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + else Map.empty[String, String] if (!hasNewData) { return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8f97d9570e..e05200df50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -387,7 +387,7 @@ class StreamExecution( lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") - e.eventTimeStats.value.max - e.delay.milliseconds + e.eventTimeStats.value.max - e.delayMs }.headOption.foreach { newWatermarkMs => if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") |