aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
author Shixiong Zhu <shixiong@databricks.com> 2017-02-15 20:51:33 -0800
committer Shixiong Zhu <shixiong@databricks.com> 2017-02-15 20:51:33 -0800
commit fc02ef95cdfc226603b52dc579b7133631f7143d (patch)
tree 6f8fd69fc139d2b0795fd7aeef365bacff7d2f50 /sql/core/src/test
parent 08c1972a0661d42f300520cc6e5fb31023de093b (diff)
download spark-fc02ef95cdfc226603b52dc579b7133631f7143d.tar.gz
spark-fc02ef95cdfc226603b52dc579b7133631f7143d.tar.bz2
spark-fc02ef95cdfc226603b52dc579b7133631f7143d.zip
[SPARK-19603][SS] Fix StreamingQuery explain command
## What changes were proposed in this pull request? `StreamingQuery.explain` doesn't show the correct streaming physical plan right now because `ExplainCommand` receives a runtime batch plan and its `logicalPlan.isStreaming` is always false. This PR adds `streaming` parameter to `ExplainCommand` to allow `StreamExecution` to specify that it's a streaming plan. Examples of the explain outputs: - streaming DataFrame.explain() ``` == Physical Plan == *HashAggregate(keys=[value#518], functions=[count(1)]) +- StateStoreSave [value#518], OperatorStateId(<unknown>,0,0), Append, 0 +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- StateStoreRestore [value#518], OperatorStateId(<unknown>,0,0) +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- Exchange hashpartitioning(value#518, 5) +- *HashAggregate(keys=[value#518], functions=[partial_count(1)]) +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- *MapElements <function1>, obj#517: java.lang.String +- *DeserializeToObject value#513.toString, obj#516: java.lang.String +- StreamingRelation MemoryStream[value#513], [value#513] ``` - StreamingQuery.explain(extended = false) ``` == Physical Plan == *HashAggregate(keys=[value#518], functions=[count(1)]) +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0 +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- StateStoreRestore [value#518], OperatorStateId(...,0,0) +- *HashAggregate(keys=[value#518], functions=[merge_count(1)]) +- Exchange hashpartitioning(value#518, 5) +- *HashAggregate(keys=[value#518], functions=[partial_count(1)]) +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- *MapElements <function1>, obj#517: java.lang.String +- *DeserializeToObject value#543.toString, obj#516: 
java.lang.String +- LocalTableScan [value#543] ``` - StreamingQuery.explain(extended = true) ``` == Parsed Logical Plan == Aggregate [value#518], [value#518, count(1) AS count(1)#524L] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String +- LocalRelation [value#543] == Analyzed Logical Plan == value: string, count(1): bigint Aggregate [value#518], [value#518, count(1) AS count(1)#524L] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String +- LocalRelation [value#543] == Optimized Logical Plan == Aggregate [value#518], [value#518, count(1) AS count(1)#524L] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String +- DeserializeToObject value#543.toString, obj#516: java.lang.String +- LocalRelation [value#543] == Physical Plan == *HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L]) +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0 +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L]) +- StateStoreRestore [value#518], OperatorStateId(...,0,0) +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L]) +- 
Exchange hashpartitioning(value#518, 5) +- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L]) +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518] +- *MapElements <function1>, obj#517: java.lang.String +- *DeserializeToObject value#543.toString, obj#516: java.lang.String +- LocalTableScan [value#543] ``` ## How was this patch tested? The updated unit test. Author: Shixiong Zhu <shixiong@databricks.com> Closes #16934 from zsxwing/SPARK-19603.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 28
1 files changed, 24 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f31dc8add4..0296a2ade3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -22,7 +22,9 @@ import scala.util.control.ControlThrowable
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -277,10 +279,24 @@ class StreamSuite extends StreamTest {
test("explain") {
val inputData = MemoryStream[String]
- val df = inputData.toDS().map(_ + "foo")
- // Test `explain` not throwing errors
- df.explain()
- val q = df.writeStream.queryName("memory_explain").format("memory").start()
+ val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))
+
+ // Test `df.explain`
+ val explain = ExplainCommand(df.queryExecution.logical, extended = false)
+ val explainString =
+ spark.sessionState
+ .executePlan(explain)
+ .executedPlan
+ .executeCollect()
+ .map(_.getString(0))
+ .mkString("\n")
+ assert(explainString.contains("StateStoreRestore"))
+ assert(explainString.contains("StreamingRelation"))
+ assert(!explainString.contains("LocalTableScan"))
+
+ // Test `StreamingQuery.explain`
+ val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
+ .start()
.asInstanceOf[StreamingQueryWrapper]
.streamingQuery
try {
@@ -294,12 +310,16 @@ class StreamSuite extends StreamTest {
// `extended = false` only displays the physical plan.
assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0)
assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+ // Use "StateStoreRestore" to verify that it does output a streaming physical plan
+ assert(explainWithoutExtended.contains("StateStoreRestore"))
val explainWithExtended = q.explainInternal(true)
// `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
// plan.
assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3)
assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1)
+ // Use "StateStoreRestore" to verify that it does output a streaming physical plan
+ assert(explainWithExtended.contains("StateStoreRestore"))
} finally {
q.stop()
}