about summary refs log tree commit diff
path: root/sql/core/src/main
diff options
context:
space:
mode:
author: Tathagata Das <tathagata.das1565@gmail.com> 2016-12-21 10:44:20 -0800
committer: Shixiong Zhu <shixiong@databricks.com> 2016-12-21 10:44:20 -0800
commit 607a1e63dbc9269b806a9f537e1d041029333cdd (patch)
tree c698a1d7477a8cf9aeece1b1061de272a5567b77 /sql/core/src/main
parent 1a64388973711b4e567f25fa33d752066a018b49 (diff)
download spark-607a1e63dbc9269b806a9f537e1d041029333cdd.tar.gz
spark-607a1e63dbc9269b806a9f537e1d041029333cdd.tar.bz2
spark-607a1e63dbc9269b806a9f537e1d041029333cdd.zip
[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years
## What changes were proposed in this pull request?

Two changes:
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, do not show watermark when there is no watermarking in the query

## How was this patch tested?

Updated and new unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16304 from tdas/SPARK-18834-1.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala | 7
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala | 7
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 2
3 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index e8570d040d..5a9a99e111 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,6 +84,11 @@ case class EventTimeWatermarkExec(
child: SparkPlan) extends SparkPlan {
val eventTimeStats = new EventTimeStatsAccum()
+ val delayMs = {
+ val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+ delay.milliseconds + delay.months * millisPerMonth
+ }
+
sparkContext.register(eventTimeStats)
override protected def doExecute(): RDD[InternalRow] = {
@@ -101,7 +106,7 @@ case class EventTimeWatermarkExec(
if (a semanticEquals eventTime) {
val updatedMetadata = new MetadataBuilder()
.withMetadata(a.metadata)
- .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+ .putLong(EventTimeWatermark.delayKey, delayMs)
.build()
a.withMetadata(updatedMetadata)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2386f33f8a..c5e9eae607 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -182,7 +182,10 @@ trait ProgressReporter extends Logging {
/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
- val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+ val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
+ val watermarkTimestamp =
+ if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+ else Map.empty[String, String]
if (!hasNewData) {
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8f97d9570e..e05200df50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -387,7 +387,7 @@ class StreamExecution(
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
- e.eventTimeStats.value.max - e.delay.milliseconds
+ e.eventTimeStats.value.max - e.delayMs
}.headOption.foreach { newWatermarkMs =>
if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")