diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-13 14:14:25 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-13 14:14:25 -0800 |
commit | c68fb426d4ac05414fb402aa1f30f4c98df103ad (patch) | |
tree | 0d0cf5136bdc62085870e35d831b3abc5ab27ca4 /sql/core/src/test/scala | |
parent | aebf44e50b6b04b848829adbbe08b0f74f31eb32 (diff) | |
download | spark-c68fb426d4ac05414fb402aa1f30f4c98df103ad.tar.gz spark-c68fb426d4ac05414fb402aa1f30f4c98df103ad.tar.bz2 spark-c68fb426d4ac05414fb402aa1f30f4c98df103ad.zip |
[SPARK-18834][SS] Expose event time stats through StreamingQueryProgress
## What changes were proposed in this pull request?
- Changed `StreamingQueryProgress.watermark` to `StreamingQueryProgress.queryTimestamps`, which is a `Map[String, String]` containing the following keys: "eventTime.max", "eventTime.min", "eventTime.avg", "processingTime", "watermark". All of them are UTC-formatted strings.
- Renamed `StreamingQuery.timestamp` to `StreamingQueryProgress.triggerTimestamp` to differentiate it from `queryTimestamps`. It holds the timestamp of when the trigger was started.
## How was this patch tested?
Updated tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #16258 from tdas/SPARK-18834.
Diffstat (limited to 'sql/core/src/test/scala')
4 files changed, 57 insertions, 13 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index f75f5b537e..7c6745ac82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -185,9 +185,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("QueryProgressEvent serialization") { def testSerialization(event: QueryProgressEvent): Unit = { + import scala.collection.JavaConverters._ val json = JsonProtocol.sparkEventToJson(event) val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent] assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality + assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala) + assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala) } testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1)) testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 193c943f83..c970743a31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -44,7 +44,12 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "durationMs" : { | "total" : 0 | }, - | "currentWatermark" : 3, + | "eventTime" : { + | "avg" : "2016-12-05T20:54:20.827Z", + | "max" : "2016-12-05T20:54:20.827Z", + | "min" : "2016-12-05T20:54:20.827Z", + | 
"watermark" : "2016-12-05T20:54:20.827Z" + | }, | "stateOperators" : [ { | "numRowsTotal" : 0, | "numRowsUpdated" : 1 @@ -76,7 +81,6 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "durationMs" : { | "total" : 0 | }, - | "currentWatermark" : 3, | "stateOperators" : [ { | "numRowsTotal" : 0, | "numRowsUpdated" : 1 @@ -134,7 +138,11 @@ object StreamingQueryStatusAndProgressSuite { timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, - currentWatermark = 3L, + eventTime = Map( + "max" -> "2016-12-05T20:54:20.827Z", + "min" -> "2016-12-05T20:54:20.827Z", + "avg" -> "2016-12-05T20:54:20.827Z", + "watermark" -> "2016-12-05T20:54:20.827Z").asJava, stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), sources = Array( new SourceProgress( @@ -156,7 +164,7 @@ object StreamingQueryStatusAndProgressSuite { timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, - currentWatermark = 3L, + eventTime = Map.empty[String, String].asJava, // empty maps should be handled correctly stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), sources = Array( new SourceProgress( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index c66d6b1f8d..afd788ce3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.streaming +import scala.collection.JavaConverters._ + import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics import org.scalatest.concurrent.Eventually._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index 12f3c3e5ff..f1cc19c6e2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.streaming +import java.{util => ju} +import java.text.SimpleDateFormat + import org.scalatest.BeforeAndAfter import org.apache.spark.internal.Logging @@ -50,8 +53,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { } - test("watermark metric") { - + test("event time and watermark metrics") { val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() @@ -61,21 +63,43 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => + body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) + true + } + testStream(windowedAggregation)( AddData(inputData, 15), CheckAnswer(), - AssertOnQuery { query => - query.lastProgress.currentWatermark === 5000 + assertEventStats { e => + assert(e.get("max") === formatTimestamp(15)) + assert(e.get("min") === formatTimestamp(15)) + assert(e.get("avg") === formatTimestamp(15)) + assert(e.get("watermark") === formatTimestamp(0)) }, - AddData(inputData, 15), + AddData(inputData, 10, 12, 14), CheckAnswer(), - AssertOnQuery { query => - query.lastProgress.currentWatermark === 5000 + assertEventStats { e => + assert(e.get("max") === formatTimestamp(14)) + assert(e.get("min") === formatTimestamp(10)) + assert(e.get("avg") === formatTimestamp(12)) + assert(e.get("watermark") === formatTimestamp(5)) }, AddData(inputData, 25), CheckAnswer(), - AssertOnQuery { query => - query.lastProgress.currentWatermark === 15000 + assertEventStats { e => + 
assert(e.get("max") === formatTimestamp(25)) + assert(e.get("min") === formatTimestamp(25)) + assert(e.get("avg") === formatTimestamp(25)) + assert(e.get("watermark") === formatTimestamp(5)) + }, + AddData(inputData, 25), + CheckAnswer((10, 3)), + assertEventStats { e => + assert(e.get("max") === formatTimestamp(25)) + assert(e.get("min") === formatTimestamp(25)) + assert(e.get("avg") === formatTimestamp(25)) + assert(e.get("watermark") === formatTimestamp(15)) } ) } @@ -206,4 +230,11 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { CheckAnswer((10, 1)) ) } + + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC")) + + private def formatTimestamp(sec: Long): String = { + timestampFormat.format(new ju.Date(sec * 1000)) + } } |