about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-10-21 13:07:29 -0700
committerYin Huai <yhuai@databricks.com>2016-10-21 13:07:29 -0700
commit7a531e3054f8d4820216ed379433559f57f571b8 (patch)
tree8113aa776cc65b3f18dbb5ce3bb63eb3e61aa1f5 /sql
parente371040a0150e4ed748a7c25465965840b61ca63 (diff)
downloadspark-7a531e3054f8d4820216ed379433559f57f571b8.tar.gz
spark-7a531e3054f8d4820216ed379433559f57f571b8.tar.bz2
spark-7a531e3054f8d4820216ed379433559f57f571b8.zip
[SPARK-17926][SQL][STREAMING] Added json for statuses
## What changes were proposed in this pull request? StreamingQueryStatus exposed through StreamingQueryListener often needs to be recorded (similar to SparkListener events). This PR adds `.json` and `.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`. ## How was this patch tested? New unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15476 from tdas/SPARK-17926.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala | 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala | 55
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala | 105
4 files changed, 182 insertions(+), 19 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
index c9911665f7..ab19602207 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -17,6 +17,11 @@
package org.apache.spark.sql.streaming
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
@@ -34,8 +39,19 @@ class SinkStatus private(
val description: String,
val offsetDesc: String) {
+ /** The compact JSON representation of this status. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this status. */
+ def prettyJson: String = pretty(render(jsonValue))
+
override def toString: String =
- "SinkStatus:" + indent(prettyString)
+ "Status of sink " + indent(prettyString).trim
+
+ private[sql] def jsonValue: JValue = {
+ ("description" -> JString(description)) ~
+ ("offsetDesc" -> JString(offsetDesc))
+ }
private[sql] def prettyString: String = {
s"""$description
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
index 6ace4833be..cfdf11370e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
@@ -21,8 +21,14 @@ import java.{util => ju}
import scala.collection.JavaConverters._
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
+import org.apache.spark.util.JsonProtocol
/**
* :: Experimental ::
@@ -47,8 +53,22 @@ class SourceStatus private(
val processingRate: Double,
val triggerDetails: ju.Map[String, String]) {
+ /** The compact JSON representation of this status. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this status. */
+ def prettyJson: String = pretty(render(jsonValue))
+
override def toString: String =
- "SourceStatus:" + indent(prettyString)
+ "Status of source " + indent(prettyString).trim
+
+ private[sql] def jsonValue: JValue = {
+ ("description" -> JString(description)) ~
+ ("offsetDesc" -> JString(offsetDesc)) ~
+ ("inputRate" -> JDouble(inputRate)) ~
+ ("processingRate" -> JDouble(processingRate)) ~
+ ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
+ }
private[sql] def prettyString: String = {
val triggerDetailsLines =
@@ -59,7 +79,6 @@ class SourceStatus private(
|Processing rate: $processingRate rows/sec
|Trigger details:
|""".stripMargin + indent(triggerDetailsLines)
-
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 4768992873..a50b0d96c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -21,8 +21,14 @@ import java.{util => ju}
import scala.collection.JavaConverters._
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset}
+import org.apache.spark.util.JsonProtocol
/**
* :: Experimental ::
@@ -59,29 +65,46 @@ class StreamingQueryStatus private(
import StreamingQueryStatus._
+ /** The compact JSON representation of this status. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this status. */
+ def prettyJson: String = pretty(render(jsonValue))
+
override def toString: String = {
val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) =>
- s"Source ${i + 1}:" + indent(s.prettyString)
+ s"Source ${i + 1} - " + indent(s.prettyString).trim
}
- val sinkStatusLines = sinkStatus.prettyString
+ val sinkStatusLines = sinkStatus.prettyString.trim
val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => s"$k: $v" }.toSeq.sorted
val numSources = sourceStatuses.length
val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" else "" }
- val allLines = s"""
- |Query name: $name
- |Query id: $id
- |Status timestamp: $timestamp
- |Input rate: $inputRate rows/sec
- |Processing rate $processingRate rows/sec
- |Latency: ${latency.getOrElse("-")} ms
- |Trigger details:
- |${indent(triggerDetailsLines)}
- |Source statuses [$numSourcesString]:
- |${indent(sourceStatusLines)}
- |Sink status: ${indent(sinkStatusLines)}""".stripMargin
-
- s"StreamingQueryStatus:${indent(allLines)}"
+ val allLines =
+ s"""|Query id: $id
+ |Status timestamp: $timestamp
+ |Input rate: $inputRate rows/sec
+ |Processing rate $processingRate rows/sec
+ |Latency: ${latency.getOrElse("-")} ms
+ |Trigger details:
+ |${indent(triggerDetailsLines)}
+ |Source statuses [$numSourcesString]:
+ |${indent(sourceStatusLines)}
+ |Sink status - ${indent(sinkStatusLines).trim}""".stripMargin
+
+ s"Status of query '$name'\n${indent(allLines)}"
+ }
+
+ private[sql] def jsonValue: JValue = {
+ ("name" -> JString(name)) ~
+ ("id" -> JInt(id)) ~
+ ("timestamp" -> JInt(timestamp)) ~
+ ("inputRate" -> JDouble(inputRate)) ~
+ ("processingRate" -> JDouble(processingRate)) ~
+ ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
+ ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
+ ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
+ ("sinkStatus" -> sinkStatus.jsonValue)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
new file mode 100644
index 0000000000..1a98cf2ba7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkFunSuite
+
+class StreamingQueryStatusSuite extends SparkFunSuite {
+ test("toString") {
+ assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString ===
+ """
+ |Status of source MySource1
+ | Available offset: #0
+ | Input rate: 15.5 rows/sec
+ | Processing rate: 23.5 rows/sec
+ | Trigger details:
+ | numRows.input.source: 100
+ | latency.getOffset.source: 10
+ | latency.getBatch.source: 20
+ """.stripMargin.trim, "SourceStatus.toString does not match")
+
+ assert(StreamingQueryStatus.testStatus.sinkStatus.toString ===
+ """
+ |Status of sink MySink
+ | Committed offsets: [#1, -]
+ """.stripMargin.trim, "SinkStatus.toString does not match")
+
+ assert(StreamingQueryStatus.testStatus.toString ===
+ """
+ |Status of query 'query'
+ | Query id: 1
+ | Status timestamp: 123
+ | Input rate: 15.5 rows/sec
+ | Processing rate 23.5 rows/sec
+ | Latency: 345.0 ms
+ | Trigger details:
+ | isDataPresentInTrigger: true
+ | isTriggerActive: true
+ | latency.getBatch.total: 20
+ | latency.getOffset.total: 10
+ | numRows.input.total: 100
+ | triggerId: 5
+ | Source statuses [1 source]:
+ | Source 1 - MySource1
+ | Available offset: #0
+ | Input rate: 15.5 rows/sec
+ | Processing rate: 23.5 rows/sec
+ | Trigger details:
+ | numRows.input.source: 100
+ | latency.getOffset.source: 10
+ | latency.getBatch.source: 20
+ | Sink status - MySink
+ | Committed offsets: [#1, -]
+ """.stripMargin.trim, "StreamingQueryStatus.toString does not match")
+
+ }
+
+ test("json") {
+ assert(StreamingQueryStatus.testStatus.json ===
+ """
+ |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"#0","inputRate":15.5,
+ |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
+ |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
+ |"sinkStatus":{"description":"MySink","offsetDesc":"[#1, -]"}}
+ """.stripMargin.replace("\n", "").trim)
+ }
+
+ test("prettyJson") {
+ assert(
+ StreamingQueryStatus.testStatus.prettyJson ===
+ """
+ |{
+ | "sourceStatuses" : [ {
+ | "description" : "MySource1",
+ | "offsetDesc" : "#0",
+ | "inputRate" : 15.5,
+ | "processingRate" : 23.5,
+ | "triggerDetails" : {
+ | "numRows.input.source" : "100",
+ | "latency.getOffset.source" : "10",
+ | "latency.getBatch.source" : "20"
+ | }
+ | } ],
+ | "sinkStatus" : {
+ | "description" : "MySink",
+ | "offsetDesc" : "[#1, -]"
+ | }
+ |}
+ """.stripMargin.trim)
+ }
+}