aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-12-13 14:14:25 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-12-13 14:14:25 -0800
commitc68fb426d4ac05414fb402aa1f30f4c98df103ad (patch)
tree0d0cf5136bdc62085870e35d831b3abc5ab27ca4 /sql/core/src/test
parentaebf44e50b6b04b848829adbbe08b0f74f31eb32 (diff)
downloadspark-c68fb426d4ac05414fb402aa1f30f4c98df103ad.tar.gz
spark-c68fb426d4ac05414fb402aa1f30f4c98df103ad.tar.bz2
spark-c68fb426d4ac05414fb402aa1f30f4c98df103ad.zip
[SPARK-18834][SS] Expose event time stats through StreamingQueryProgress
## What changes were proposed in this pull request? - Changed `StreamingQueryProgress.watermark` to `StreamingQueryProgress.queryTimestamps` which is a `Map[String, String]` containing the following keys: "eventTime.max", "eventTime.min", "eventTime.avg", "processingTime", "watermark". All of them are UTC-formatted strings. - Renamed `StreamingQuery.timestamp` to `StreamingQueryProgress.triggerTimestamp` to differentiate it from `queryTimestamps`. It holds the timestamp of when the trigger was started. ## How was this patch tested? Updated tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16258 from tdas/SPARK-18834.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala16
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala49
4 files changed, 57 insertions, 13 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index f75f5b537e..7c6745ac82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -185,9 +185,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryProgressEvent serialization") {
def testSerialization(event: QueryProgressEvent): Unit = {
+ import scala.collection.JavaConverters._
val json = JsonProtocol.sparkEventToJson(event)
val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality
+ assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
+ assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
}
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 193c943f83..c970743a31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -44,7 +44,12 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "durationMs" : {
| "total" : 0
| },
- | "currentWatermark" : 3,
+ | "eventTime" : {
+ | "avg" : "2016-12-05T20:54:20.827Z",
+ | "max" : "2016-12-05T20:54:20.827Z",
+ | "min" : "2016-12-05T20:54:20.827Z",
+ | "watermark" : "2016-12-05T20:54:20.827Z"
+ | },
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1
@@ -76,7 +81,6 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "durationMs" : {
| "total" : 0
| },
- | "currentWatermark" : 3,
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1
@@ -134,7 +138,11 @@ object StreamingQueryStatusAndProgressSuite {
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
- currentWatermark = 3L,
+ eventTime = Map(
+ "max" -> "2016-12-05T20:54:20.827Z",
+ "min" -> "2016-12-05T20:54:20.827Z",
+ "avg" -> "2016-12-05T20:54:20.827Z",
+ "watermark" -> "2016-12-05T20:54:20.827Z").asJava,
stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
sources = Array(
new SourceProgress(
@@ -156,7 +164,7 @@ object StreamingQueryStatusAndProgressSuite {
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
- currentWatermark = 3L,
+ eventTime = Map.empty[String, String].asJava, // empty maps should be handled correctly
stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
sources = Array(
new SourceProgress(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index c66d6b1f8d..afd788ce3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.streaming
+import scala.collection.JavaConverters._
+
import org.apache.commons.lang3.RandomStringUtils
import org.scalactic.TolerantNumerics
import org.scalatest.concurrent.Eventually._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
index 12f3c3e5ff..f1cc19c6e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.streaming
+import java.{util => ju}
+import java.text.SimpleDateFormat
+
import org.scalatest.BeforeAndAfter
import org.apache.spark.internal.Logging
@@ -50,8 +53,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
}
- test("watermark metric") {
-
+ test("event time and watermark metrics") {
val inputData = MemoryStream[Int]
val windowedAggregation = inputData.toDF()
@@ -61,21 +63,43 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+ def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
+ body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
+ true
+ }
+
testStream(windowedAggregation)(
AddData(inputData, 15),
CheckAnswer(),
- AssertOnQuery { query =>
- query.lastProgress.currentWatermark === 5000
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(15))
+ assert(e.get("min") === formatTimestamp(15))
+ assert(e.get("avg") === formatTimestamp(15))
+ assert(e.get("watermark") === formatTimestamp(0))
},
- AddData(inputData, 15),
+ AddData(inputData, 10, 12, 14),
CheckAnswer(),
- AssertOnQuery { query =>
- query.lastProgress.currentWatermark === 5000
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(14))
+ assert(e.get("min") === formatTimestamp(10))
+ assert(e.get("avg") === formatTimestamp(12))
+ assert(e.get("watermark") === formatTimestamp(5))
},
AddData(inputData, 25),
CheckAnswer(),
- AssertOnQuery { query =>
- query.lastProgress.currentWatermark === 15000
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(25))
+ assert(e.get("min") === formatTimestamp(25))
+ assert(e.get("avg") === formatTimestamp(25))
+ assert(e.get("watermark") === formatTimestamp(5))
+ },
+ AddData(inputData, 25),
+ CheckAnswer((10, 3)),
+ assertEventStats { e =>
+ assert(e.get("max") === formatTimestamp(25))
+ assert(e.get("min") === formatTimestamp(25))
+ assert(e.get("avg") === formatTimestamp(25))
+ assert(e.get("watermark") === formatTimestamp(15))
}
)
}
@@ -206,4 +230,11 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
CheckAnswer((10, 1))
)
}
+
+ private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+ timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
+
+ private def formatTimestamp(sec: Long): String = {
+ timestampFormat.format(new ju.Date(sec * 1000))
+ }
}