author: Shixiong Zhu <shixiong@databricks.com> (2016-06-23 16:04:16 -0700)
committer: Yin Huai <yhuai@databricks.com> (2016-06-23 16:04:16 -0700)
commit: 0e4bdebece892edb126fa443f67c846e44e7367e
tree: ebe2f44c51fde2c233f1109b05e42023c5423057
parent: 91b1ef28d134313d7b6faaffa1c390f3ca4455d0
[SPARK-15443][SQL] Fix 'explain' for streaming Dataset
## What changes were proposed in this pull request?
- Fix the `explain` command for streaming Dataset/DataFrame. E.g.,
```
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- 'MapElements <function1>, obj#6: java.lang.String
+- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, StringType).toString, StructField(value,StringType,true))), obj#5: org.apache.spark.sql.Row
+- Filter <function1>.apply
+- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
+- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
+- Filter <function1>.apply
+- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
+- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
+- Filter <function1>.apply
+- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
+- *DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
+- *Filter <function1>.apply
+- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
```
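For reference, output like the above can be produced by something along these lines; this is a minimal sketch, where the input directory and the lambdas are illustrative rather than taken from the patch:
```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("explain-demo").getOrCreate()
import spark.implicits._

val df = spark.readStream
  .format("text")
  .load("/tmp/stream")                       // hypothetical input directory
  .filter(row => row.getString(0).nonEmpty)  // planned as `Filter <function1>.apply`
  .map(row => row.getString(0))              // planned as `MapElements <function1>`

// Before this fix the planner had no physical counterpart for StreamingRelation,
// so explain could not render a plan; now it prints the four plans shown above.
df.explain(true)
```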
- Add `StreamingQuery.explain` to display the last execution plan. E.g.,
```
== Parsed Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
+- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
+- Filter <function1>.apply
+- Relation[value#12] text
== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
+- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
+- Filter <function1>.apply
+- Relation[value#12] text
== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
+- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
+- Filter <function1>.apply
+- Relation[value#12] text
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
+- *DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
+- *Filter <function1>.apply
+- *Scan text [value#12] Format: org.apache.spark.sql.execution.datasources.text.TextFileFormat1836ab91, InputPaths: file:/Users/zsx/stream/a.txt, file:/Users/zsx/stream/b.txt, file:/Users/zsx/stream/c.txt, PushedFilters: [], ReadSchema: struct<value:string>
```
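A sketch of exercising the new method (the memory sink and query name are illustrative; the "N/A" placeholder before the first batch comes from the patch itself):
```
val query = df.writeStream
  .queryName("demo")
  .format("memory")
  .start()

query.explain()                  // prints "N/A" until the first batch has executed
query.processAllAvailable()      // block until all currently available input is processed
query.explain(extended = true)   // all four plans of the last executed batch, as above
```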
## How was this patch tested?
The added unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #13815 from zsxwing/sdf-explain.
8 files changed, 142 insertions(+), 2 deletions(-)
```
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8e2f2ed4f8..b619d4edc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
-import org.apache.spark.sql.execution.streaming.MemoryPlan
+import org.apache.spark.sql.execution.streaming.{MemoryPlan, StreamingExecutionRelation, StreamingRelation, StreamingRelationExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQuery
@@ -307,6 +307,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
+  /**
+   * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`.
+   * It won't affect the execution, because `StreamingRelation` will be replaced with
+   * `StreamingExecutionRelation` in `StreamingQueryManager` and `StreamingExecutionRelation` will
+   * be replaced with the real relation using the `Source` in `StreamExecution`.
+   */
+  object StreamingRelationStrategy extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case s: StreamingRelation =>
+        StreamingRelationExec(s.sourceName, s.output) :: Nil
+      case s: StreamingExecutionRelation =>
+        StreamingRelationExec(s.toString, s.output) :: Nil
+      case _ => Nil
+    }
+  }
+
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def numPartitions: Int = self.numPartitions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 38bb6e412f..7eaad81a81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.streaming.IncrementalExecution
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
 
 /**
@@ -98,7 +100,14 @@ case class ExplainCommand(
 
   // Run through the optimizer to generate the physical plan.
   override def run(sparkSession: SparkSession): Seq[Row] = try {
-    val queryExecution = sparkSession.sessionState.executePlan(logicalPlan)
+    val queryExecution =
+      if (logicalPlan.isStreaming) {
+        // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
+        // output mode does not matter since there is no `Sink`.
+        new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0)
+      } else {
+        sparkSession.sessionState.executePlan(logicalPlan)
+      }
     val outputString =
       if (codegen) {
         codegenString(queryExecution.executedPlan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index bc0e443ca7..0ce00552bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -37,6 +37,7 @@ class IncrementalExecution private[sql](
 
   // TODO: make this always part of planning.
   val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +:
+    sparkSession.sessionState.planner.StreamingRelationStrategy +:
     sparkSession.sessionState.experimentalMethods.extraStrategies
 
   // Modified planner with stateful operations.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 1428b97149..f1af79e738 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -473,6 +474,25 @@ class StreamExecution(
     }
   }
 
+  /** Expose for tests */
+  def explainInternal(extended: Boolean): String = {
+    if (lastExecution == null) {
+      "N/A"
+    } else {
+      val explain = ExplainCommand(lastExecution.logical, extended = extended)
+      sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect()
+        .map(_.getString(0)).mkString("\n")
+    }
+  }
+
+  override def explain(extended: Boolean): Unit = {
+    // scalastyle:off println
+    println(explainInternal(extended))
+    // scalastyle:on println
+  }
+
+  override def explain(): Unit = explain(extended = false)
+
   override def toString: String = {
     s"Streaming Query - $name [state = $state]"
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 4d65d2f4f5..e8b00094ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
 
 object StreamingRelation {
@@ -50,6 +53,17 @@ case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
   override def toString: String = source.toString
 }
 
+/**
+ * A dummy physical plan for [[StreamingRelation]] to support
+ * [[org.apache.spark.sql.Dataset.explain]]
+ */
+case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) extends LeafExecNode {
+  override def toString: String = sourceName
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException("StreamingRelationExec cannot be executed")
+  }
+}
+
 object StreamingExecutionRelation {
   def apply(source: Source): StreamingExecutionRelation = {
     StreamingExecutionRelation(source, source.schema.toAttributes)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index dc81a5b180..19d1ecf740 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -107,6 +107,7 @@ trait StreamingQuery {
    * method may block forever. Additionally, this method is only guaranteed to block until data that
    * has been synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]]
    * prior to invocation. (i.e. `getOffset` must immediately reflect the addition).
+   * @since 2.0.0
    */
   def processAllAvailable(): Unit
 
@@ -116,4 +117,18 @@ trait StreamingQuery {
    * @since 2.0.0
    */
  def stop(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   * @since 2.0.0
+   */
+  def explain(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   *
+   * @param extended whether to do extended explain or not
+   * @since 2.0.0
+   */
+  def explain(extended: Boolean): Unit
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6971f93b23..0eade71d1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -592,6 +592,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       )
     }
   }
+
+  test("explain") {
+    withTempDirs { case (src, tmp) =>
+      src.mkdirs()
+
+      val df = spark.readStream.format("text").load(src.getCanonicalPath).map(_ + "-x")
+      // Test `explain` not throwing errors
+      df.explain()
+
+      val q = df.writeStream.queryName("file_explain").format("memory").start()
+        .asInstanceOf[StreamExecution]
+      try {
+        assert("N/A" === q.explainInternal(false))
+        assert("N/A" === q.explainInternal(true))
+
+        val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+        val finalFile = new File(src, tempFile.getName)
+        require(stringToFile(tempFile, "foo").renameTo(finalFile))
+
+        q.processAllAvailable()
+
+        val explainWithoutExtended = q.explainInternal(false)
+        // `extended = false` only displays the physical plan.
+        assert("Relation.*text".r.findAllMatchIn(explainWithoutExtended).size === 0)
+        assert("TextFileFormat".r.findAllMatchIn(explainWithoutExtended).size === 1)
+
+        val explainWithExtended = q.explainInternal(true)
+        // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+        // plan.
+        assert("Relation.*text".r.findAllMatchIn(explainWithExtended).size === 3)
+        assert("TextFileFormat".r.findAllMatchIn(explainWithExtended).size === 1)
+      } finally {
+        q.stop()
+      }
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b8e40e71bf..c4a894b681 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -242,6 +242,35 @@ class StreamSuite extends StreamTest {
     val o2 = OutputMode.Complete
     assert(o2 === InternalOutputModes.Complete)
   }
+
+  test("explain") {
+    val inputData = MemoryStream[String]
+    val df = inputData.toDS().map(_ + "foo")
+    // Test `explain` not throwing errors
+    df.explain()
+    val q = df.writeStream.queryName("memory_explain").format("memory").start()
+      .asInstanceOf[StreamExecution]
+    try {
+      assert("N/A" === q.explainInternal(false))
+      assert("N/A" === q.explainInternal(true))
+
+      inputData.addData("abc")
+      q.processAllAvailable()
+
+      val explainWithoutExtended = q.explainInternal(false)
+      // `extended = false` only displays the physical plan.
+      assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0)
+      assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+
+      val explainWithExtended = q.explainInternal(true)
+      // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+      // plan.
+      assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3)
+      assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1)
+    } finally {
+      q.stop()
+    }
+  }
 }
 
 /**
```