diff options
author | Sean Zhong <clockfly@gmail.com> | 2016-05-10 21:50:53 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-05-10 21:50:53 -0700 |
commit | 61e0bdcff2ed57b22541fb3c03146d6eec2bb70f (patch) | |
tree | 621f9ab55934b3efbaaf579118f47273cc3d9500 /sql | |
parent | d9ca9fd3e582f9d29f8887c095637c93a8b93651 (diff) | |
download | spark-61e0bdcff2ed57b22541fb3c03146d6eec2bb70f.tar.gz spark-61e0bdcff2ed57b22541fb3c03146d6eec2bb70f.tar.bz2 spark-61e0bdcff2ed57b22541fb3c03146d6eec2bb70f.zip |
[SPARK-14476][SQL] Improve the physical plan visualization by adding meta info like table name and file path for data source.
## What changes were proposed in this pull request?
Improve the physical plan visualization by adding meta info like table name and file path for data source.
The meta info entries InputPaths and TableName are newly added. Example:
```
scala> spark.range(10).write.saveAsTable("tt")
scala> spark.sql("select * from tt").explain()
== Physical Plan ==
WholeStageCodegen
: +- BatchedScan HadoopFiles[id#13L] Format: ParquetFormat, InputPaths: file:/home/xzhong10/spark-linux/assembly/spark-warehouse/tt, PushedFilters: [], ReadSchema: struct<id:bigint>, TableName: default.tt
```
## How was this patch tested?
Manual tests.
UI changes:
Before:
![ui_before_change](https://cloud.githubusercontent.com/assets/2595532/15064559/3d423e3c-1388-11e6-8099-7803ef496c4d.jpg)
After:
![fix_long_string](https://cloud.githubusercontent.com/assets/2595532/15133566/8ad09e26-1696-11e6-939c-99b908249b9d.jpg)
![for_load](https://cloud.githubusercontent.com/assets/2595532/15157224/3ba95c98-171d-11e6-885a-de0ee8dec27c.jpg)
Author: Sean Zhong <clockfly@gmail.com>
Closes #12947 from clockfly/spark-14476.
Diffstat (limited to 'sql')
5 files changed, 53 insertions, 27 deletions
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css index 303f8ebb88..594e747a8d 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css @@ -41,3 +41,8 @@ stroke: #444; stroke-width: 1.5px; } + +/* Breaks the long string like file path when showing tooltips */ +.tooltip-inner { + word-wrap:break-word; +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index d6516f26a7..85af4faf4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution +import org.apache.commons.lang.StringUtils + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} @@ -127,8 +129,11 @@ private[sql] case class RDDScanExec( private[sql] trait DataSourceScanExec extends LeafExecNode { val rdd: RDD[InternalRow] val relation: BaseRelation + val metastoreTableIdentifier: Option[TableIdentifier] - override val nodeName: String = relation.toString + override val nodeName: String = { + s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}" + } // Ignore rdd when checking results override def sameResult(plan: 
SparkPlan): Boolean = plan match { @@ -143,7 +148,8 @@ private[sql] case class RowDataSourceScanExec( rdd: RDD[InternalRow], @transient relation: BaseRelation, override val outputPartitioning: Partitioning, - override val metadata: Map[String, String] = Map.empty) + override val metadata: Map[String, String], + override val metastoreTableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with CodegenSupport { private[sql] override lazy val metrics = @@ -174,8 +180,11 @@ private[sql] case class RowDataSourceScanExec( } override def simpleString: String = { - val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value" - s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" + val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { + key + ": " + StringUtils.abbreviate(value, 100) + } + + s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" } override def inputRDDs(): Seq[RDD[InternalRow]] = { @@ -212,7 +221,8 @@ private[sql] case class BatchedDataSourceScanExec( rdd: RDD[InternalRow], @transient relation: BaseRelation, override val outputPartitioning: Partitioning, - override val metadata: Map[String, String] = Map.empty) + override val metadata: Map[String, String], + override val metastoreTableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with CodegenSupport { private[sql] override lazy val metrics = @@ -224,9 +234,11 @@ private[sql] case class BatchedDataSourceScanExec( } override def simpleString: String = { - val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value" + val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { + key + ": " + StringUtils.abbreviate(value, 100) + } val metadataStr = metadataEntries.mkString(" ", ", ", "") - s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr" + s"Batched$nodeName${output.mkString("[", ",", 
"]")}$metadataStr" } override def inputRDDs(): Seq[RDD[InternalRow]] = { @@ -325,7 +337,8 @@ private[sql] object DataSourceScanExec { output: Seq[Attribute], rdd: RDD[InternalRow], relation: BaseRelation, - metadata: Map[String, String] = Map.empty): DataSourceScanExec = { + metadata: Map[String, String] = Map.empty, + metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = { val outputPartitioning = { val bucketSpec = relation match { // TODO: this should be closer to bucket planning. @@ -351,9 +364,11 @@ private[sql] object DataSourceScanExec { relation match { case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) => - BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata) + BatchedDataSourceScanExec( + output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier) case _ => - RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata) + RowDataSourceScanExec( + output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 9bebd74b4b..bc249f4ed5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -192,11 +192,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]")) } - relation.relation match { - case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.location.paths.mkString(", ") - case _ => - } - pairs.toMap } @@ -217,7 +212,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.DataSourceScanExec.create( projects.map(_.toAttribute), 
scanBuilder(requestedColumns, candidatePredicates, pushedFilters), - relation.relation, metadata) + relation.relation, metadata, relation.metastoreTableIdentifier) filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan) } else { // Don't request columns that are only referenced by pushed filters. @@ -227,7 +222,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.DataSourceScanExec.create( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), - relation.relation, metadata) + relation.relation, metadata, relation.metastoreTableIdentifier) execution.ProjectExec( projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 8a93c6ff9a..350508c1d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -27,7 +27,9 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS} +import org.apache.spark.sql.execution.SparkPlan /** * A strategy for planning scans over collections of files that might be partitioned or bucketed @@ -54,7 +56,8 @@ import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan} */ private[sql] object FileSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - 
case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) => + case PhysicalOperation(projects, filters, + l @ LogicalRelation(files: HadoopFsRelation, _, table)) => // Filters on this relation fall into four categories based on where we can use them to avoid // reading unneeded data: // - partition keys only - used to prune directories to read @@ -192,6 +195,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { partitions } + val meta = Map( + "Format" -> files.fileFormat.toString, + "ReadSchema" -> prunedDataSchema.simpleString, + PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"), + INPUT_PATHS -> files.location.paths.mkString(", ")) + val scan = DataSourceScanExec.create( readDataColumns ++ partitionColumns, @@ -200,10 +209,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { readFile, plannedPartitions), files, - Map( - "Format" -> files.fileFormat.toString, - "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"), - "ReadSchema" -> prunedDataSchema.simpleString)) + meta, + table) val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And) val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index c87e672961..b516297115 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, Inter import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.FileRelation -import 
org.apache.spark.sql.sources.{BaseRelation, Filter} +import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration @@ -157,8 +157,12 @@ case class HadoopFsRelation( def refresh(): Unit = location.refresh() - override def toString: String = - s"HadoopFiles" + override def toString: String = { + fileFormat match { + case source: DataSourceRegister => source.shortName() + case _ => "HadoopFiles" + } + } /** Returns the list of files that will be read when scanning this relation. */ override def inputFiles: Array[String] = |