aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-12-21 10:44:20 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-12-21 10:44:20 -0800
commit607a1e63dbc9269b806a9f537e1d041029333cdd (patch)
treec698a1d7477a8cf9aeece1b1061de272a5567b77 /sql
parent1a64388973711b4e567f25fa33d752066a018b49 (diff)
downloadspark-607a1e63dbc9269b806a9f537e1d041029333cdd.tar.gz
spark-607a1e63dbc9269b806a9f537e1d041029333cdd.tar.bz2
spark-607a1e63dbc9269b806a9f537e1d041029333cdd.zip
[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years
## What changes were proposed in this pull request? Two changes - Fix how delays specified in months and years are translated to milliseconds - Following up on #16258, do not show the watermark when there is no watermarking in the query ## How was this patch tested? Updated and new unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16304 from tdas/SPARK-18834-1.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala)75
4 files changed, 73 insertions, 18 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index e8570d040d..5a9a99e111 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,6 +84,11 @@ case class EventTimeWatermarkExec(
child: SparkPlan) extends SparkPlan {
val eventTimeStats = new EventTimeStatsAccum()
+ val delayMs = {
+ val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+ delay.milliseconds + delay.months * millisPerMonth
+ }
+
sparkContext.register(eventTimeStats)
override protected def doExecute(): RDD[InternalRow] = {
@@ -101,7 +106,7 @@ case class EventTimeWatermarkExec(
if (a semanticEquals eventTime) {
val updatedMetadata = new MetadataBuilder()
.withMetadata(a.metadata)
- .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+ .putLong(EventTimeWatermark.delayKey, delayMs)
.build()
a.withMetadata(updatedMetadata)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2386f33f8a..c5e9eae607 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -182,7 +182,10 @@ trait ProgressReporter extends Logging {
/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
- val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+ val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
+ val watermarkTimestamp =
+ if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+ else Map.empty[String, String]
if (!hasNewData) {
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8f97d9570e..e05200df50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -387,7 +387,7 @@ class StreamExecution(
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
- e.eventTimeStats.value.max - e.delay.milliseconds
+ e.eventTimeStats.value.max - e.delayMs
}.headOption.foreach { newWatermarkMs =>
if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index f1cc19c6e2..bdfba9590b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
import java.{util => ju}
import java.text.SimpleDateFormat
+import java.util.{Calendar, Date}
import org.scalatest.BeforeAndAfter
@@ -26,8 +27,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.InternalOutputModes.Complete
-class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
+class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
import testImplicits._
@@ -52,24 +54,35 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
assert(e.getMessage contains "int")
}
-
test("event time and watermark metrics") {
- val inputData = MemoryStream[Int]
+ // No event time metrics when there is no watermarking
+ val inputData1 = MemoryStream[Int]
+ val aggWithoutWatermark = inputData1.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
- val windowedAggregation = inputData.toDF()
+ testStream(aggWithoutWatermark, outputMode = Complete)(
+ AddData(inputData1, 15),
+ CheckAnswer((15, 1)),
+ assertEventStats { e => assert(e.isEmpty) },
+ AddData(inputData1, 10, 12, 14),
+ CheckAnswer((10, 3), (15, 1)),
+ assertEventStats { e => assert(e.isEmpty) }
+ )
+
+ // All event time metrics where watermarking is set
+ val inputData2 = MemoryStream[Int]
+ val aggWithWatermark = inputData2.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
- def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
- body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
- true
- }
-
- testStream(windowedAggregation)(
- AddData(inputData, 15),
+ testStream(aggWithWatermark)(
+ AddData(inputData2, 15),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(15))
@@ -77,7 +90,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
assert(e.get("avg") === formatTimestamp(15))
assert(e.get("watermark") === formatTimestamp(0))
},
- AddData(inputData, 10, 12, 14),
+ AddData(inputData2, 10, 12, 14),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(14))
@@ -85,7 +98,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
assert(e.get("avg") === formatTimestamp(12))
assert(e.get("watermark") === formatTimestamp(5))
},
- AddData(inputData, 25),
+ AddData(inputData2, 25),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(25))
@@ -93,7 +106,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
assert(e.get("avg") === formatTimestamp(25))
assert(e.get("watermark") === formatTimestamp(5))
},
- AddData(inputData, 25),
+ AddData(inputData2, 25),
CheckAnswer((10, 3)),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(25))
@@ -124,6 +137,33 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
)
}
+ test("delay in months and years handled correctly") {
+ val currentTimeMs = System.currentTimeMillis
+ val currentTime = new Date(currentTimeMs)
+
+ val input = MemoryStream[Long]
+ val aggWithWatermark = input.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "2 years 5 months")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth }
+
+ testStream(aggWithWatermark)(
+ AddData(input, currentTimeMs / 1000),
+ CheckAnswer(),
+ AddData(input, currentTimeMs / 1000),
+ CheckAnswer(),
+ assertEventStats { e =>
+ assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000)
+ val watermarkTime = timestampFormat.parse(e.get("watermark"))
+ assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29)
+ }
+ )
+ }
+
test("recovery") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
@@ -231,6 +271,13 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
)
}
+ private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
+ AssertOnQuery { q =>
+ body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
+ true
+ }
+ }
+
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))