diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-21 10:44:20 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-12-21 10:44:20 -0800 |
commit | 607a1e63dbc9269b806a9f537e1d041029333cdd (patch) | |
tree | c698a1d7477a8cf9aeece1b1061de272a5567b77 /sql | |
parent | 1a64388973711b4e567f25fa33d752066a018b49 (diff) | |
download | spark-607a1e63dbc9269b806a9f537e1d041029333cdd.tar.gz spark-607a1e63dbc9269b806a9f537e1d041029333cdd.tar.bz2 spark-607a1e63dbc9269b806a9f537e1d041029333cdd.zip |
[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years
## What changes were proposed in this pull request?
Two changes:
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, do not show the watermark when there is no watermarking in the query
## How was this patch tested?
Updated existing unit tests and added new ones.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #16304 from tdas/SPARK-18834-1.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala | 7 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala | 7 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 2 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala) | 75 |
4 files changed, 73 insertions, 18 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index e8570d040d..5a9a99e111 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -84,6 +84,11 @@ case class EventTimeWatermarkExec( child: SparkPlan) extends SparkPlan { val eventTimeStats = new EventTimeStatsAccum() + val delayMs = { + val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31 + delay.milliseconds + delay.months * millisPerMonth + } + sparkContext.register(eventTimeStats) override protected def doExecute(): RDD[InternalRow] = { @@ -101,7 +106,7 @@ case class EventTimeWatermarkExec( if (a semanticEquals eventTime) { val updatedMetadata = new MetadataBuilder() .withMetadata(a.metadata) - .putLong(EventTimeWatermark.delayKey, delay.milliseconds) + .putLong(EventTimeWatermark.delayKey, delayMs) .build() a.withMetadata(updatedMetadata) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 2386f33f8a..c5e9eae607 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.streaming._ import 
org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent @@ -182,7 +182,10 @@ trait ProgressReporter extends Logging { /** Extracts statistics from the most recent query execution. */ private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { - val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty + val watermarkTimestamp = + if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + else Map.empty[String, String] if (!hasNewData) { return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8f97d9570e..e05200df50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -387,7 +387,7 @@ class StreamExecution( lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") - e.eventTimeStats.value.max - e.delay.milliseconds + e.eventTimeStats.value.max - e.delayMs }.headOption.foreach { newWatermarkMs => if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index f1cc19c6e2..bdfba9590b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -19,6 +19,7 @@ package 
org.apache.spark.sql.streaming import java.{util => ju} import java.text.SimpleDateFormat +import java.util.{Calendar, Date} import org.scalatest.BeforeAndAfter @@ -26,8 +27,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} +import org.apache.spark.sql.InternalOutputModes.Complete -class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { +class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging { import testImplicits._ @@ -52,24 +54,35 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { assert(e.getMessage contains "int") } - test("event time and watermark metrics") { - val inputData = MemoryStream[Int] + // No event time metrics when there is no watermarking + val inputData1 = MemoryStream[Int] + val aggWithoutWatermark = inputData1.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - val windowedAggregation = inputData.toDF() + testStream(aggWithoutWatermark, outputMode = Complete)( + AddData(inputData1, 15), + CheckAnswer((15, 1)), + assertEventStats { e => assert(e.isEmpty) }, + AddData(inputData1, 10, 12, 14), + CheckAnswer((10, 3), (15, 1)), + assertEventStats { e => assert(e.isEmpty) } + ) + + // All event time metrics where watermarking is set + val inputData2 = MemoryStream[Int] + val aggWithWatermark = inputData2.toDF() .withColumn("eventTime", $"value".cast("timestamp")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q 
=> - body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) - true - } - - testStream(windowedAggregation)( - AddData(inputData, 15), + testStream(aggWithWatermark)( + AddData(inputData2, 15), CheckAnswer(), assertEventStats { e => assert(e.get("max") === formatTimestamp(15)) @@ -77,7 +90,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { assert(e.get("avg") === formatTimestamp(15)) assert(e.get("watermark") === formatTimestamp(0)) }, - AddData(inputData, 10, 12, 14), + AddData(inputData2, 10, 12, 14), CheckAnswer(), assertEventStats { e => assert(e.get("max") === formatTimestamp(14)) @@ -85,7 +98,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { assert(e.get("avg") === formatTimestamp(12)) assert(e.get("watermark") === formatTimestamp(5)) }, - AddData(inputData, 25), + AddData(inputData2, 25), CheckAnswer(), assertEventStats { e => assert(e.get("max") === formatTimestamp(25)) @@ -93,7 +106,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { assert(e.get("avg") === formatTimestamp(25)) assert(e.get("watermark") === formatTimestamp(5)) }, - AddData(inputData, 25), + AddData(inputData2, 25), CheckAnswer((10, 3)), assertEventStats { e => assert(e.get("max") === formatTimestamp(25)) @@ -124,6 +137,33 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { ) } + test("delay in months and years handled correctly") { + val currentTimeMs = System.currentTimeMillis + val currentTime = new Date(currentTimeMs) + + val input = MemoryStream[Long] + val aggWithWatermark = input.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "2 years 5 months") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth } + + 
testStream(aggWithWatermark)( + AddData(input, currentTimeMs / 1000), + CheckAnswer(), + AddData(input, currentTimeMs / 1000), + CheckAnswer(), + assertEventStats { e => + assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000) + val watermarkTime = timestampFormat.parse(e.get("watermark")) + assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29) + } + ) + } + test("recovery") { val inputData = MemoryStream[Int] val df = inputData.toDF() @@ -231,6 +271,13 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { ) } + private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = { + AssertOnQuery { q => + body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) + true + } + } + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC")) |