author     Shixiong Zhu <shixiong@databricks.com>    2016-06-23 16:04:16 -0700
committer  Yin Huai <yhuai@databricks.com>           2016-06-23 16:04:16 -0700
commit     0e4bdebece892edb126fa443f67c846e44e7367e (patch)
tree       ebe2f44c51fde2c233f1109b05e42023c5423057 /sql
parent     91b1ef28d134313d7b6faaffa1c390f3ca4455d0 (diff)
[SPARK-15443][SQL] Fix 'explain' for streaming Dataset
## What changes were proposed in this pull request?

- Fix the `explain` command for streaming Dataset/DataFrame. E.g.,
```
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- 'MapElements <function1>, obj#6: java.lang.String
   +- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, StringType).toString, StructField(value,StringType,true))), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]

== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
   +- *DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- *Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
```
- Add `StreamingQuery.explain` to display the last execution plan. E.g.,
```
== Parsed Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text

== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text

== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
   +- *DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- *Filter <function1>.apply
         +- *Scan text [value#12] Format: org.apache.spark.sql.execution.datasources.text.TextFileFormat1836ab91, InputPaths: file:/Users/zsx/stream/a.txt, file:/Users/zsx/stream/b.txt, file:/Users/zsx/stream/c.txt, PushedFilters: [], ReadSchema: struct<value:string>
```

## How was this patch tested?

The added unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13815 from zsxwing/sdf-explain.
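For orientation, a minimal usage sketch of the first change (not part of the patch; the input path and the mapping function are illustrative). The plan of such a Dataset contains a `StreamingRelation` leaf, which the new `StreamingRelationStrategy` below renders as `StreamingRelationExec` so that `explain` can print it:
```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("explain-sketch").getOrCreate()
import spark.implicits._

// A streaming Dataset built from spark.readStream; it has no Sink yet, so only
// planning/explain is exercised here, never execution.
val streamingDF = spark.readStream.format("text").load("/tmp/stream-input")
val mapped = streamingDF.as[String].map(_ + "-x")

mapped.explain()      // physical plan only
mapped.explain(true)  // parsed/analyzed/optimized logical plans plus the physical plan
```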
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala | 15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 36
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 29
8 files changed, 142 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8e2f2ed4f8..b619d4edc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
-import org.apache.spark.sql.execution.streaming.MemoryPlan
+import org.apache.spark.sql.execution.streaming.{MemoryPlan, StreamingExecutionRelation, StreamingRelation, StreamingRelationExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery
@@ -307,6 +307,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
+ /**
+ * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`.
+ * It won't affect the execution, because `StreamingRelation` will be replaced with
+ * `StreamingExecutionRelation` in `StreamingQueryManager` and `StreamingExecutionRelation` will
+ * be replaced with the real relation using the `Source` in `StreamExecution`.
+ */
+ object StreamingRelationStrategy extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case s: StreamingRelation =>
+ StreamingRelationExec(s.sourceName, s.output) :: Nil
+ case s: StreamingExecutionRelation =>
+ StreamingRelationExec(s.toString, s.output) :: Nil
+ case _ => Nil
+ }
+ }
+
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def numPartitions: Int = self.numPartitions
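Aside: the `Strategy` contract used above (match a logical node, return physical nodes, or `Nil` to fall through) is the same one exposed to user code through `ExperimentalMethods`. A minimal sketch under that assumption; `NoopStrategy` is a made-up name for illustration, not part of this patch:
```
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Returning Nil means "this strategy does not apply", so the planner falls through to
// the next strategy in its list -- the same role as the `case _ => Nil` branch above.
object NoopStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.experimental.extraStrategies = Seq(NoopStrategy)
```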
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 38bb6e412f..7eaad81a81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.streaming.IncrementalExecution
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
/**
@@ -98,7 +100,14 @@ case class ExplainCommand(
// Run through the optimizer to generate the physical plan.
override def run(sparkSession: SparkSession): Seq[Row] = try {
- val queryExecution = sparkSession.sessionState.executePlan(logicalPlan)
+ val queryExecution =
+ if (logicalPlan.isStreaming) {
+ // This is used only when explaining a `Dataset/DataFrame` created by `spark.readStream`, so the
+ // output mode does not matter since there is no `Sink`.
+ new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0)
+ } else {
+ sparkSession.sessionState.executePlan(logicalPlan)
+ }
val outputString =
if (codegen) {
codegenString(queryExecution.executedPlan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index bc0e443ca7..0ce00552bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -37,6 +37,7 @@ class IncrementalExecution private[sql](
// TODO: make this always part of planning.
val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +:
+ sparkSession.sessionState.planner.StreamingRelationStrategy +:
sparkSession.sessionState.experimentalMethods.extraStrategies
// Modified planner with stateful operations.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 1428b97149..f1af79e738 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -473,6 +474,25 @@ class StreamExecution(
}
}
+ /** Expose for tests */
+ def explainInternal(extended: Boolean): String = {
+ if (lastExecution == null) {
+ "N/A"
+ } else {
+ val explain = ExplainCommand(lastExecution.logical, extended = extended)
+ sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect()
+ .map(_.getString(0)).mkString("\n")
+ }
+ }
+
+ override def explain(extended: Boolean): Unit = {
+ // scalastyle:off println
+ println(explainInternal(extended))
+ // scalastyle:on println
+ }
+
+ override def explain(): Unit = explain(extended = false)
+
override def toString: String = {
s"Streaming Query - $name [state = $state]"
}
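Note that the public `explain` above prints rather than returns, while `explainInternal` returns the text but is exposed only for tests. A small hypothetical helper (not part of the patch) that captures the printed output through `scala.Console`, should a String be needed from the public API:
```
import java.io.ByteArrayOutputStream
import org.apache.spark.sql.streaming.StreamingQuery

// Capture what query.explain(...) writes to the console and return it as a String.
def captureExplain(query: StreamingQuery, extended: Boolean = false): String = {
  val out = new ByteArrayOutputStream()
  Console.withOut(out) { query.explain(extended) }
  out.toString("UTF-8")
}
```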
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 4d65d2f4f5..e8b00094ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -17,8 +17,11 @@
package org.apache.spark.sql.execution.streaming
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
object StreamingRelation {
@@ -50,6 +53,17 @@ case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) ex
override def toString: String = source.toString
}
+/**
+ * A dummy physical plan for [[StreamingRelation]] to support
+ * [[org.apache.spark.sql.Dataset.explain]]
+ */
+case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) extends LeafExecNode {
+ override def toString: String = sourceName
+ override protected def doExecute(): RDD[InternalRow] = {
+ throw new UnsupportedOperationException("StreamingRelationExec cannot be executed")
+ }
+}
+
object StreamingExecutionRelation {
def apply(source: Source): StreamingExecutionRelation = {
StreamingExecutionRelation(source, source.schema.toAttributes)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index dc81a5b180..19d1ecf740 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -107,6 +107,7 @@ trait StreamingQuery {
* method may block forever. Additionally, this method is only guaranteed to block until data that
* has been synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]]
* prior to invocation. (i.e. `getOffset` must immediately reflect the addition).
+ * @since 2.0.0
*/
def processAllAvailable(): Unit
@@ -116,4 +117,18 @@ trait StreamingQuery {
* @since 2.0.0
*/
def stop(): Unit
+
+ /**
+ * Prints the physical plan to the console for debugging purposes.
+ * @since 2.0.0
+ */
+ def explain(): Unit
+
+ /**
+ * Prints the physical plan to the console for debugging purposes.
+ *
+ * @param extended whether to do extended explain or not
+ * @since 2.0.0
+ */
+ def explain(extended: Boolean): Unit
}
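A short usage sketch of the two methods added to the trait (illustrative; assumes a `spark` session and a text directory with some input files). Both print "N/A" until at least one batch has executed:
```
val query = spark.readStream.format("text").load("/tmp/stream-input")
  .writeStream.format("memory").queryName("q").start()

query.explain()              // "N/A" before the first batch, then the last physical plan
query.processAllAvailable()  // let the query process whatever input is currently available
query.explain(true)          // extended: logical plans plus the physical plan of the last batch
query.stop()
```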
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6971f93b23..0eade71d1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -592,6 +592,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
)
}
}
+
+ test("explain") {
+ withTempDirs { case (src, tmp) =>
+ src.mkdirs()
+
+ val df = spark.readStream.format("text").load(src.getCanonicalPath).map(_ + "-x")
+ // Test `explain` not throwing errors
+ df.explain()
+
+ val q = df.writeStream.queryName("file_explain").format("memory").start()
+ .asInstanceOf[StreamExecution]
+ try {
+ assert("N/A" === q.explainInternal(false))
+ assert("N/A" === q.explainInternal(true))
+
+ val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+ val finalFile = new File(src, tempFile.getName)
+ require(stringToFile(tempFile, "foo").renameTo(finalFile))
+
+ q.processAllAvailable()
+
+ val explainWithoutExtended = q.explainInternal(false)
+ // `extended = false` only displays the physical plan.
+ assert("Relation.*text".r.findAllMatchIn(explainWithoutExtended).size === 0)
+ assert("TextFileFormat".r.findAllMatchIn(explainWithoutExtended).size === 1)
+
+ val explainWithExtended = q.explainInternal(true)
+ // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+ // plan.
+ assert("Relation.*text".r.findAllMatchIn(explainWithExtended).size === 3)
+ assert("TextFileFormat".r.findAllMatchIn(explainWithExtended).size === 1)
+ } finally {
+ q.stop()
+ }
+ }
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b8e40e71bf..c4a894b681 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -242,6 +242,35 @@ class StreamSuite extends StreamTest {
val o2 = OutputMode.Complete
assert(o2 === InternalOutputModes.Complete)
}
+
+ test("explain") {
+ val inputData = MemoryStream[String]
+ val df = inputData.toDS().map(_ + "foo")
+ // Test `explain` not throwing errors
+ df.explain()
+ val q = df.writeStream.queryName("memory_explain").format("memory").start()
+ .asInstanceOf[StreamExecution]
+ try {
+ assert("N/A" === q.explainInternal(false))
+ assert("N/A" === q.explainInternal(true))
+
+ inputData.addData("abc")
+ q.processAllAvailable()
+
+ val explainWithoutExtended = q.explainInternal(false)
+ // `extended = false` only displays the physical plan.
+ assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0)
+ assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+
+ val explainWithExtended = q.explainInternal(true)
+ // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+ // plan.
+ assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3)
+ assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1)
+ } finally {
+ q.stop()
+ }
+ }
}
/**