Diffstat
 sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css        |  5
 sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                      | 37
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala   |  9
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala   | 19
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala | 10
 5 files changed, 53 insertions(+), 27 deletions(-)
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
index 303f8ebb88..594e747a8d 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
@@ -41,3 +41,8 @@
stroke: #444;
stroke-width: 1.5px;
}
+
+/* Break long strings such as file paths when showing tooltips */
+.tooltip-inner {
+ word-wrap:break-word;
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d6516f26a7..85af4faf4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,9 +17,11 @@
package org.apache.spark.sql.execution
+import org.apache.commons.lang.StringUtils
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -127,8 +129,11 @@ private[sql] case class RDDScanExec(
private[sql] trait DataSourceScanExec extends LeafExecNode {
val rdd: RDD[InternalRow]
val relation: BaseRelation
+ val metastoreTableIdentifier: Option[TableIdentifier]
- override val nodeName: String = relation.toString
+ override val nodeName: String = {
+ s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
+ }
// Ignore rdd when checking results
override def sameResult(plan: SparkPlan): Boolean = plan match {
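
Note: with the new metastoreTableIdentifier field, the node name shown in the SQL tab of the web UI (and in explain output) names both the relation and, when the scan was resolved through the metastore, the table it came from. A minimal sketch of the resulting string, using an invented table default.events stored as parquet:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Hypothetical inputs: relation.toString and the planner-provided table identifier.
    val relationString = "parquet"                                // relation.toString (see HadoopFsRelation below)
    val table = Some(TableIdentifier("events", Some("default")))  // metastoreTableIdentifier
    val nodeName = s"Scan $relationString ${table.map(_.unquotedString).getOrElse("")}"
    // nodeName == "Scan parquet default.events"; with table == None it is just "Scan parquet "
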
@@ -143,7 +148,8 @@ private[sql] case class RowDataSourceScanExec(
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
- override val metadata: Map[String, String] = Map.empty)
+ override val metadata: Map[String, String],
+ override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with CodegenSupport {
private[sql] override lazy val metrics =
@@ -174,8 +180,11 @@ private[sql] case class RowDataSourceScanExec(
}
override def simpleString: String = {
- val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
- s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+ val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+ key + ": " + StringUtils.abbreviate(value, 100)
+ }
+
+ s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
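
Note: StringUtils.abbreviate comes from Apache Commons Lang and clips each metadata value to at most 100 characters, ending in "..." when it truncates, so long values such as a comma-separated list of input paths no longer dominate the plan string. A small illustration with made-up paths:

    import org.apache.commons.lang.StringUtils

    // Twenty invented paths, joined the same way the InputPaths metadata value is built.
    val longPaths = Seq.tabulate(20)(i => s"hdfs://nn/warehouse/events/part-$i").mkString(", ")
    val shown = StringUtils.abbreviate(longPaths, 100)
    // shown has length 100: the first 97 characters of longPaths followed by "..."
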
@@ -212,7 +221,8 @@ private[sql] case class BatchedDataSourceScanExec(
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
- override val metadata: Map[String, String] = Map.empty)
+ override val metadata: Map[String, String],
+ override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with CodegenSupport {
private[sql] override lazy val metrics =
@@ -224,9 +234,11 @@ private[sql] case class BatchedDataSourceScanExec(
}
override def simpleString: String = {
- val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+ val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+ key + ": " + StringUtils.abbreviate(value, 100)
+ }
val metadataStr = metadataEntries.mkString(" ", ", ", "")
- s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
+ s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr"
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
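
Note: since nodeName itself now begins with "Scan", the hard-coded prefix shrinks from "BatchedScan " to "Batched"; keeping the old prefix would have printed "BatchedScan Scan ...". Continuing the sketch above:

    // Sketch values only; output attributes and metadata are invented.
    val header = s"Batched$nodeName"
    // header == "BatchedScan parquet default.events", so the full simpleString reads roughly
    // "BatchedScan parquet default.events[id#0L,name#1] Format: ..., ReadSchema: ..."
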
@@ -325,7 +337,8 @@ private[sql] object DataSourceScanExec {
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation,
- metadata: Map[String, String] = Map.empty): DataSourceScanExec = {
+ metadata: Map[String, String] = Map.empty,
+ metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = {
val outputPartitioning = {
val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
@@ -351,9 +364,11 @@ private[sql] object DataSourceScanExec {
relation match {
case r: HadoopFsRelation
if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) =>
- BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+ BatchedDataSourceScanExec(
+ output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
case _ =>
- RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+ RowDataSourceScanExec(
+ output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
}
}
}
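
Note: the factory keeps a None default for the new parameter, so existing callers that know nothing about metastore tables still compile, and it still chooses the columnar BatchedDataSourceScanExec only when the file format supports whole-batch reads for the projected schema. A hedged sketch of a call site; all names below are illustrative:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Assumes prunedColumns: Seq[Attribute], readerRdd: RDD[InternalRow] and a HadoopFsRelation.
    val scan = DataSourceScanExec.create(
      output = prunedColumns,
      rdd = readerRdd,
      relation = hadoopFsRelation,          // supportBatch(...) decides Batched vs Row scan
      metadata = Map("Format" -> "ParquetFormat"),
      metastoreTableIdentifier = Some(TableIdentifier("events", Some("default"))))
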
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bebd74b4b..bc249f4ed5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -192,11 +192,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
}
- relation.relation match {
- case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.location.paths.mkString(", ")
- case _ =>
- }
-
pairs.toMap
}
@@ -217,7 +212,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.DataSourceScanExec.create(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, metadata)
+ relation.relation, metadata, relation.metastoreTableIdentifier)
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -227,7 +222,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.DataSourceScanExec.create(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, metadata)
+ relation.relation, metadata, relation.metastoreTableIdentifier)
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
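
Note: the InputPaths entry moves out of this generic strategy (FileSourceStrategy, which owns file-based plans, now adds it; see below), and the table identifier already carried by the LogicalRelation is simply forwarded to the scan node. A sketch of where that identifier lives, with hypothetical values:

    // LogicalRelation's third field is the optional metastore table the relation was resolved from.
    logicalRelation match {
      case LogicalRelation(_, _, Some(table)) =>
        table.unquotedString    // e.g. "default.events"; ends up in the scan's nodeName
      case _ =>
        ""                      // path-based reads carry no table, so nodeName gets no suffix
    }
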
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 8a93c6ff9a..350508c1d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -27,7 +27,9 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.SparkPlan
/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -54,7 +56,8 @@ import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
*/
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
+ case PhysicalOperation(projects, filters,
+ l @ LogicalRelation(files: HadoopFsRelation, _, table)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
@@ -192,6 +195,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
partitions
}
+ val meta = Map(
+ "Format" -> files.fileFormat.toString,
+ "ReadSchema" -> prunedDataSchema.simpleString,
+ PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
+ INPUT_PATHS -> files.location.paths.mkString(", "))
+
val scan =
DataSourceScanExec.create(
readDataColumns ++ partitionColumns,
@@ -200,10 +209,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
readFile,
plannedPartitions),
files,
- Map(
- "Format" -> files.fileFormat.toString,
- "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
- "ReadSchema" -> prunedDataSchema.simpleString))
+ meta,
+ table)
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
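
Note: the metadata for file scans is now assembled in one place, with the keys shared via the DataSourceScanExec constants so this strategy and DataSourceStrategy cannot drift apart. A hedged sketch of what the map might hold for a parquet read; every value below is invented:

    // PUSHED_FILTERS / INPUT_PATHS are the imported constants; values vary per query.
    val metaSketch = Map(
      "Format" -> "ParquetFormat",            // files.fileFormat.toString
      "ReadSchema" -> "struct<id:bigint,name:string>",
      PUSHED_FILTERS -> "[IsNotNull(id), GreaterThan(id,1)]",
      INPUT_PATHS -> "hdfs://nn/warehouse/events")
    // Each value is later clipped to 100 characters by StringUtils.abbreviate in simpleString.
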
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index c87e672961..b516297115 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, Inter
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -157,8 +157,12 @@ case class HadoopFsRelation(
def refresh(): Unit = location.refresh()
- override def toString: String =
- s"HadoopFiles"
+ override def toString: String = {
+ fileFormat match {
+ case source: DataSourceRegister => source.shortName()
+ case _ => "HadoopFiles"
+ }
+ }
/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =
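
Note: the friendlier toString is what ultimately surfaces as "Scan parquet ..." in the node names above: any FileFormat that also implements DataSourceRegister now prints its registered short name (for example parquet or json) instead of the generic "HadoopFiles". A minimal sketch of the same match against a made-up format:

    import org.apache.spark.sql.sources.DataSourceRegister

    // Hypothetical format used only for illustration.
    class CsvishFormat extends DataSourceRegister {
      override def shortName(): String = "csvish"
    }

    val label = (new CsvishFormat: Any) match {
      case source: DataSourceRegister => source.shortName()
      case _ => "HadoopFiles"
    }
    // label == "csvish"; a parquet-backed HadoopFsRelation would now report "parquet"
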