Diffstat (limited to 'sql')
5 files changed, 53 insertions, 27 deletions
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
index 303f8ebb88..594e747a8d 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
@@ -41,3 +41,8 @@
   stroke: #444;
   stroke-width: 1.5px;
 }
+
+/* Breaks long strings, such as file paths, when showing tooltips */
+.tooltip-inner {
+  word-wrap: break-word;
+}

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d6516f26a7..85af4faf4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.commons.lang.StringUtils
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -127,8 +129,11 @@ private[sql] case class RDDScanExec(
 private[sql] trait DataSourceScanExec extends LeafExecNode {
   val rdd: RDD[InternalRow]
   val relation: BaseRelation
+  val metastoreTableIdentifier: Option[TableIdentifier]
 
-  override val nodeName: String = relation.toString
+  override val nodeName: String = {
+    s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
+  }
 
   // Ignore rdd when checking results
   override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -143,7 +148,8 @@ private[sql] case class RowDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String] = Map.empty)
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
@@ -174,8 +180,11 @@ private[sql] case class RowDataSourceScanExec(
   }
 
   override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
-    s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+
+    s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -212,7 +221,8 @@ private[sql] case class BatchedDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String] = Map.empty)
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
@@ -224,9 +234,11 @@ private[sql] case class BatchedDataSourceScanExec(
   }
 
   override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
     val metadataStr = metadataEntries.mkString(" ", ", ", "")
-    s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
+    s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr"
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -325,7 +337,8 @@ private[sql] object DataSourceScanExec {
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
       relation: BaseRelation,
-      metadata: Map[String, String] = Map.empty): DataSourceScanExec = {
+      metadata: Map[String, String] = Map.empty,
+      metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = {
     val outputPartitioning = {
       val bucketSpec = relation match {
         // TODO: this should be closer to bucket planning.
@@ -351,9 +364,11 @@ private[sql] object DataSourceScanExec {
     relation match {
       case r: HadoopFsRelation
         if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) =>
-        BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+        BatchedDataSourceScanExec(
+          output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
       case _ =>
-        RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+        RowDataSourceScanExec(
+          output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
     }
   }
 }
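For context, a minimal sketch (not part of the diff) of the truncation behavior the reworked simpleString methods rely on: only commons-lang's StringUtils.abbreviate is assumed, and the sample metadata map and its values are hypothetical.

import org.apache.commons.lang.StringUtils

object AbbreviateSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical scan metadata; a real InputPaths value can be arbitrarily long.
    val metadata = Map(
      "Format" -> "ParquetFormat",
      "InputPaths" -> ("hdfs://namenode:8020/warehouse/events/" + "x" * 200))

    // Mirrors the new simpleString logic: values longer than 100 characters
    // come back as the first 97 characters plus a trailing "...".
    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
      key + ": " + StringUtils.abbreviate(value, 100)
    }
    metadataEntries.foreach(println)
  }
}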
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bebd74b4b..bc249f4ed5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -192,11 +192,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
     }
 
-    relation.relation match {
-      case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.location.paths.mkString(", ")
-      case _ =>
-    }
-
     pairs.toMap
   }
 
@@ -217,7 +212,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.DataSourceScanExec.create(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, metadata)
+        relation.relation, metadata, relation.metastoreTableIdentifier)
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -227,7 +222,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.DataSourceScanExec.create(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, metadata)
+        relation.relation, metadata, relation.metastoreTableIdentifier)
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
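To see what threading relation.metastoreTableIdentifier into DataSourceScanExec.create buys in the UI, here is a hedged, self-contained sketch of the new nodeName formatting; the TableIdentifier below is a hypothetical stand-in for the Catalyst class, reduced to the one method the diff uses.

// Hypothetical stand-in for org.apache.spark.sql.catalyst.TableIdentifier,
// reduced to the single method the nodeName change calls.
case class TableIdentifier(table: String, database: Option[String] = None) {
  def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
}

object NodeNameSketch {
  // Mirrors the new DataSourceScanExec.nodeName: "Scan <relation> <table>".
  def nodeName(relation: String, id: Option[TableIdentifier]): String =
    s"Scan $relation ${id.map(_.unquotedString).getOrElse("")}"

  def main(args: Array[String]): Unit = {
    println(nodeName("parquet", Some(TableIdentifier("events", Some("prod")))))
    // => Scan parquet prod.events
    println(nodeName("parquet", None)) // => "Scan parquet " (trailing space)
  }
}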
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 8a93c6ff9a..350508c1d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -27,7 +27,9 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.SparkPlan
 
 /**
  * A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -54,7 +56,8 @@ import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
  */
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
+    case PhysicalOperation(projects, filters,
+      l @ LogicalRelation(files: HadoopFsRelation, _, table)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read
@@ -192,6 +195,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         partitions
       }
 
+      val meta = Map(
+        "Format" -> files.fileFormat.toString,
+        "ReadSchema" -> prunedDataSchema.simpleString,
+        PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
+        INPUT_PATHS -> files.location.paths.mkString(", "))
+
       val scan =
         DataSourceScanExec.create(
           readDataColumns ++ partitionColumns,
@@ -200,10 +209,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
             readFile,
             plannedPartitions),
           files,
-          Map(
-            "Format" -> files.fileFormat.toString,
-            "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
-            "ReadSchema" -> prunedDataSchema.simpleString))
+          meta,
+          table)
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
       val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
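A sketch of the metadata map the file-scan path now assembles. The four keys match the diff ("PushedFilters" is visible in the removed lines above); the values, and the literal string assumed for the INPUT_PATHS constant, are illustrative guesses rather than values taken from the commit.

object ScanMetadataSketch {
  // Assumed literals behind DataSourceScanExec.PUSHED_FILTERS / INPUT_PATHS;
  // the diff references the constants but only shows the first one's value.
  val PUSHED_FILTERS = "PushedFilters"
  val INPUT_PATHS = "InputPaths"

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-ins for files.fileFormat, prunedDataSchema,
    // pushedDownFilters and files.location.paths.
    val meta = Map(
      "Format" -> "ParquetFormat",
      "ReadSchema" -> "struct<id:bigint,name:string>",
      PUSHED_FILTERS -> Seq("IsNotNull(id)", "GreaterThan(id,0)").mkString("[", ", ", "]"),
      INPUT_PATHS -> Seq("hdfs://nn/warehouse/t/p=1", "hdfs://nn/warehouse/t/p=2").mkString(", "))

    // This is the shape the scan's simpleString sorts and renders.
    meta.toSeq.sorted.foreach { case (k, v) => println(k + ": " + v) }
  }
}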
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index c87e672961..b516297115 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -157,8 +157,12 @@ case class HadoopFsRelation(
 
   def refresh(): Unit = location.refresh()
 
-  override def toString: String =
-    s"HadoopFiles"
+  override def toString: String = {
+    fileFormat match {
+      case source: DataSourceRegister => source.shortName()
+      case _ => "HadoopFiles"
+    }
+  }
 
   /** Returns the list of files that will be read when scanning this relation. */
   override def inputFiles: Array[String] =
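Last, a sketch of what the HadoopFsRelation.toString change displays: formats implementing the DataSourceRegister trait report their short name, and anything else keeps the old "HadoopFiles" label. The trait here is a minimal mirror of org.apache.spark.sql.sources.DataSourceRegister, and both format classes are hypothetical, not the real Spark types.

// Minimal mirror of the DataSourceRegister trait from the diff's import.
trait DataSourceRegister { def shortName(): String }

// Hypothetical file formats for illustration only.
class ParquetLikeFormat extends DataSourceRegister { def shortName(): String = "parquet" }
class UnregisteredFormat

object RelationLabelSketch {
  // Mirrors the new HadoopFsRelation.toString dispatch.
  def label(fileFormat: AnyRef): String = fileFormat match {
    case source: DataSourceRegister => source.shortName()
    case _ => "HadoopFiles"
  }

  def main(args: Array[String]): Unit = {
    println(label(new ParquetLikeFormat))  // parquet
    println(label(new UnregisteredFormat)) // HadoopFiles
  }
}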