author      Sean Zhong <clockfly@gmail.com>    2016-05-10 21:50:53 -0700
committer   Reynold Xin <rxin@databricks.com>  2016-05-10 21:50:53 -0700
commit      61e0bdcff2ed57b22541fb3c03146d6eec2bb70f (patch)
tree        621f9ab55934b3efbaaf579118f47273cc3d9500 /sql
parent      d9ca9fd3e582f9d29f8887c095637c93a8b93651 (diff)
download    spark-61e0bdcff2ed57b22541fb3c03146d6eec2bb70f.tar.gz
            spark-61e0bdcff2ed57b22541fb3c03146d6eec2bb70f.tar.bz2
            spark-61e0bdcff2ed57b22541fb3c03146d6eec2bb70f.zip
[SPARK-14476][SQL] Improve the physical plan visualization by adding meta info like table name and file path for data source.
## What changes were proposed in this pull request?

Improve the physical plan visualization by adding meta info, such as the table name and file path, for data sources. The meta info fields InputPaths and TableName are newly added.

Example:

```
scala> spark.range(10).write.saveAsTable("tt")
scala> spark.sql("select * from tt").explain()
== Physical Plan ==
WholeStageCodegen
:  +- BatchedScan HadoopFiles[id#13L] Format: ParquetFormat, InputPaths: file:/home/xzhong10/spark-linux/assembly/spark-warehouse/tt, PushedFilters: [], ReadSchema: struct<id:bigint>, TableName: default.tt
```

## How was this patch tested?

Manual tests.

Changes for UI:

Before:

![ui_before_change](https://cloud.githubusercontent.com/assets/2595532/15064559/3d423e3c-1388-11e6-8099-7803ef496c4d.jpg)

After:

![fix_long_string](https://cloud.githubusercontent.com/assets/2595532/15133566/8ad09e26-1696-11e6-939c-99b908249b9d.jpg)
![for_load](https://cloud.githubusercontent.com/assets/2595532/15157224/3ba95c98-171d-11e6-885a-de0ee8dec27c.jpg)

Author: Sean Zhong <clockfly@gmail.com>

Closes #12947 from clockfly/spark-14476.
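Besides the new metadata keys, the patch also caps each metadata value printed in the plan string at 100 characters via Commons Lang's `StringUtils.abbreviate` (see the `ExistingRDD.scala` hunks below), so very long `InputPaths` values no longer flood `explain()` output. The following is a minimal stand-alone sketch of that truncation behavior, assuming commons-lang 2.x on the classpath (as in Spark at the time); the object name and path are made up for illustration and are not part of the patch:

```scala
// Stand-alone sketch (not part of the patch) of the 100-character truncation that
// DataSourceScanExec.simpleString applies to metadata values. The object name and
// the path below are hypothetical, chosen only for illustration.
import org.apache.commons.lang.StringUtils

object MetadataAbbreviationDemo {
  def main(args: Array[String]): Unit = {
    // A hypothetical, overly long InputPaths value (well over 100 characters).
    val inputPaths = "file:/spark-warehouse/" + ("very_long_partition_dir/" * 10)
    // Values longer than 100 characters are shortened to 100 and end with "...".
    println("InputPaths: " + StringUtils.abbreviate(inputPaths, 100))
    // Short values such as a table name pass through unchanged.
    println("TableName: " + StringUtils.abbreviate("default.tt", 100))
  }
}
```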
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css       5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                     37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala  19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala 10
5 files changed, 53 insertions(+), 27 deletions(-)
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
index 303f8ebb88..594e747a8d 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
@@ -41,3 +41,8 @@
stroke: #444;
stroke-width: 1.5px;
}
+
+/* Breaks the long string like file path when showing tooltips */
+.tooltip-inner {
+ word-wrap:break-word;
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d6516f26a7..85af4faf4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,9 +17,11 @@
package org.apache.spark.sql.execution
+import org.apache.commons.lang.StringUtils
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -127,8 +129,11 @@ private[sql] case class RDDScanExec(
private[sql] trait DataSourceScanExec extends LeafExecNode {
val rdd: RDD[InternalRow]
val relation: BaseRelation
+ val metastoreTableIdentifier: Option[TableIdentifier]
- override val nodeName: String = relation.toString
+ override val nodeName: String = {
+ s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
+ }
// Ignore rdd when checking results
override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -143,7 +148,8 @@ private[sql] case class RowDataSourceScanExec(
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
- override val metadata: Map[String, String] = Map.empty)
+ override val metadata: Map[String, String],
+ override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with CodegenSupport {
private[sql] override lazy val metrics =
@@ -174,8 +180,11 @@ private[sql] case class RowDataSourceScanExec(
}
override def simpleString: String = {
- val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
- s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+ val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+ key + ": " + StringUtils.abbreviate(value, 100)
+ }
+
+ s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -212,7 +221,8 @@ private[sql] case class BatchedDataSourceScanExec(
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
- override val metadata: Map[String, String] = Map.empty)
+ override val metadata: Map[String, String],
+ override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with CodegenSupport {
private[sql] override lazy val metrics =
@@ -224,9 +234,11 @@ private[sql] case class BatchedDataSourceScanExec(
}
override def simpleString: String = {
- val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+ val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+ key + ": " + StringUtils.abbreviate(value, 100)
+ }
val metadataStr = metadataEntries.mkString(" ", ", ", "")
- s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
+ s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr"
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -325,7 +337,8 @@ private[sql] object DataSourceScanExec {
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation,
- metadata: Map[String, String] = Map.empty): DataSourceScanExec = {
+ metadata: Map[String, String] = Map.empty,
+ metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = {
val outputPartitioning = {
val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
@@ -351,9 +364,11 @@ private[sql] object DataSourceScanExec {
relation match {
case r: HadoopFsRelation
if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) =>
- BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+ BatchedDataSourceScanExec(
+ output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
case _ =>
- RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+ RowDataSourceScanExec(
+ output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bebd74b4b..bc249f4ed5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -192,11 +192,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
}
- relation.relation match {
- case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.location.paths.mkString(", ")
- case _ =>
- }
-
pairs.toMap
}
@@ -217,7 +212,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.DataSourceScanExec.create(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, metadata)
+ relation.relation, metadata, relation.metastoreTableIdentifier)
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -227,7 +222,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.DataSourceScanExec.create(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, metadata)
+ relation.relation, metadata, relation.metastoreTableIdentifier)
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 8a93c6ff9a..350508c1d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -27,7 +27,9 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.SparkPlan
/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -54,7 +56,8 @@ import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
*/
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
+ case PhysicalOperation(projects, filters,
+ l @ LogicalRelation(files: HadoopFsRelation, _, table)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
@@ -192,6 +195,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
partitions
}
+ val meta = Map(
+ "Format" -> files.fileFormat.toString,
+ "ReadSchema" -> prunedDataSchema.simpleString,
+ PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
+ INPUT_PATHS -> files.location.paths.mkString(", "))
+
val scan =
DataSourceScanExec.create(
readDataColumns ++ partitionColumns,
@@ -200,10 +209,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
readFile,
plannedPartitions),
files,
- Map(
- "Format" -> files.fileFormat.toString,
- "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
- "ReadSchema" -> prunedDataSchema.simpleString))
+ meta,
+ table)
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index c87e672961..b516297115 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, Inter
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -157,8 +157,12 @@ case class HadoopFsRelation(
def refresh(): Unit = location.refresh()
- override def toString: String =
- s"HadoopFiles"
+ override def toString: String = {
+ fileFormat match {
+ case source: DataSourceRegister => source.shortName()
+ case _ => "HadoopFiles"
+ }
+ }
/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =