path: root/sql/core
author      Cheng Lian <lian@databricks.com>        2015-12-09 23:30:42 +0800
committer   Reynold Xin <rxin@databricks.com>       2015-12-09 23:30:42 +0800
commit      6e1c55eac4849669e119ce0d51f6d051830deb9f (patch)
tree        4d8286eb371c2de27cc374ed55749c2830add681 /sql/core
parent      a113216865fd45ea39ae8f104e784af2cf667dcf (diff)
download    spark-6e1c55eac4849669e119ce0d51f6d051830deb9f.tar.gz
            spark-6e1c55eac4849669e119ce0d51f6d051830deb9f.tar.bz2
            spark-6e1c55eac4849669e119ce0d51f6d051830deb9f.zip
[SPARK-12012][SQL] Show more comprehensive PhysicalRDD metadata when visualizing SQL query plan
This PR adds a `private[sql]` method `metadata` to `SparkPlan`, which can be used to describe detailed information about a physical plan during visualization. Specifically, this PR uses this method to provide details of `PhysicalRDD`s translated from a data source relation.

For example, a `ParquetRelation` converted from Hive metastore table `default.psrc` now renders as shown in the following screenshot:

![image](https://cloud.githubusercontent.com/assets/230655/11526657/e10cb7e6-9916-11e5-9afa-f108932ec890.png)

And here is the screenshot for a regular `ParquetRelation` (not converted from a Hive metastore table) loaded from a really long path:

![output](https://cloud.githubusercontent.com/assets/230655/11680582/37c66460-9e94-11e5-8f50-842db5309d5a.png)

Author: Cheng Lian <lian@databricks.com>

Closes #10004 from liancheng/spark-12012.physical-rdd-metadata.
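To make the rendered output concrete, here is a minimal standalone sketch (not Spark's actual classes; the node name, attribute names, and path are illustrative only) of how the new `metadata` map feeds into `PhysicalRDD.simpleString`, mirroring the change in `ExistingRDD.scala` below:

```scala
// Standalone sketch: renders a scan line the way the patched PhysicalRDD.simpleString does.
object MetadataRenderingSketch {
  def render(nodeName: String, output: Seq[String], metadata: Map[String, String]): String = {
    // Sorted "key: value" pairs are appended after the scan node name and its output attributes.
    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
    s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
  }

  def main(args: Array[String]): Unit = {
    println(render(
      nodeName = "ParquetRelation: default.psrc",
      output = Seq("key#0", "value#1"),
      metadata = Map(
        "PushedFilters" -> "[EqualTo(key,15)]",
        "InputPaths" -> "hdfs://namenode/warehouse/psrc")))
    // Scan ParquetRelation: default.psrc[key#0,value#1] InputPaths: hdfs://namenode/warehouse/psrc, PushedFilters: [EqualTo(key,15)]
  }
}
```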
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                       | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala                         |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala                     |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala                   |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala    | 22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala                 | 45
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                          |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala                      |  2
9 files changed, 80 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 623348f676..b8a4302588 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -97,22 +97,31 @@ private[sql] case class LogicalRDD(
private[sql] case class PhysicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
- extraInformation: String,
+ override val nodeName: String,
+ override val metadata: Map[String, String] = Map.empty,
override val outputsUnsafeRows: Boolean = false)
extends LeafNode {
protected override def doExecute(): RDD[InternalRow] = rdd
- override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
+ override def simpleString: String = {
+ val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+ s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+ }
}
private[sql] object PhysicalRDD {
+ // Metadata keys
+ val INPUT_PATHS = "InputPaths"
+ val PUSHED_FILTERS = "PushedFilters"
+
def createFromDataSource(
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation,
- extraInformation: String = ""): PhysicalRDD = {
- PhysicalRDD(output, rdd, relation.toString + extraInformation,
- relation.isInstanceOf[HadoopFsRelation])
+ metadata: Map[String, String] = Map.empty): PhysicalRDD = {
+ // All HadoopFsRelations output UnsafeRows
+ val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
+ PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index a78177751c..ec98f81041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -68,6 +68,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
/**
+ * Return all metadata that describes more details of this SparkPlan.
+ */
+ private[sql] def metadata: Map[String, String] = Map.empty
+
+ /**
* Return all metrics containing metrics of this SparkPlan.
*/
private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 486ce34064..4f750ad13a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -30,6 +30,7 @@ class SparkPlanInfo(
val nodeName: String,
val simpleString: String,
val children: Seq[SparkPlanInfo],
+ val metadata: Map[String, String],
val metrics: Seq[SQLMetricInfo])
private[sql] object SparkPlanInfo {
@@ -41,6 +42,6 @@ private[sql] object SparkPlanInfo {
}
val children = plan.children.map(fromSparkPlan)
- new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
+ new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f67c951bc0..25e98c0bdd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -363,7 +363,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
case e @ EvaluatePython(udf, child, _) =>
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
- case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
+ case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") :: Nil
case BroadcastHint(child) => apply(child)
case _ => Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 544d5eccec..8a15a51d82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
@@ -25,6 +27,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
+import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
@@ -315,7 +318,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
- val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+ val metadata: Map[String, String] = {
+ val pairs = ArrayBuffer.empty[(String, String)]
+
+ if (pushedFilters.nonEmpty) {
+ pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+ }
+
+ relation.relation match {
+ case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.paths.mkString(", ")
+ case _ =>
+ }
+
+ pairs.toMap
+ }
if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
@@ -334,7 +350,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.PhysicalRDD.createFromDataSource(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, pushedFiltersString)
+ relation.relation, metadata)
filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -344,7 +360,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.PhysicalRDD.createFromDataSource(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, pushedFiltersString)
+ relation.relation, metadata)
execution.Project(
projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
}
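For reference, here is a simplified standalone sketch of how the strategy now assembles metadata pairs instead of a pre-rendered "PushedFilter: [...]" string. The filter and path values are illustrative; in the real code they come from the pushed-down `sources.Filter`s and the `HadoopFsRelation`'s `paths`.

```scala
import scala.collection.mutable.ArrayBuffer

object ScanMetadataSketch {
  val INPUT_PATHS = "InputPaths"
  val PUSHED_FILTERS = "PushedFilters"

  def scanMetadata(pushedFilters: Seq[String], inputPaths: Seq[String]): Map[String, String] = {
    val pairs = ArrayBuffer.empty[(String, String)]
    if (pushedFilters.nonEmpty) {
      pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
    }
    if (inputPaths.nonEmpty) {
      // Only HadoopFsRelation-backed scans contribute input paths in the real strategy.
      pairs += (INPUT_PATHS -> inputPaths.mkString(", "))
    }
    pairs.toMap
  }

  def main(args: Array[String]): Unit = {
    println(scanMetadata(Seq("EqualTo(key,15)"), Seq("hdfs://namenode/warehouse/psrc")))
    // Map(PushedFilters -> [EqualTo(key,15)], InputPaths -> hdfs://namenode/warehouse/psrc)
  }
}
```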
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index bb3e278697..1af2a394f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -146,6 +146,12 @@ private[sql] class ParquetRelation(
meta
}
+ override def toString: String = {
+ parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map { tableName =>
+ s"${getClass.getSimpleName}: $tableName"
+ }.getOrElse(super.toString)
+ }
+
override def equals(other: Any): Boolean = other match {
case that: ParquetRelation =>
val schemaEquality = if (shouldMergeSchemas) {
@@ -521,6 +527,10 @@ private[sql] object ParquetRelation extends Logging {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
+ // If a ParquetRelation is converted from a Hive metastore table, this option is set to the
+ // original Hive table name.
+ private[sql] val METASTORE_TABLE_NAME = "metastoreTableName"
+
/**
* If parquet's block size (row group size) setting is larger than the min split size,
* we use parquet's block size setting as the min split size. Otherwise, we will create
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 7af0ff09c5..3a6eff9399 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -66,7 +66,9 @@ private[sql] object SparkPlanGraph {
SQLMetrics.getMetricParam(metric.metricParam))
}
val node = SparkPlanGraphNode(
- nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics)
+ nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
+ planInfo.simpleString, planInfo.metadata, metrics)
+
nodes += node
val childrenNodes = planInfo.children.map(
child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
@@ -85,26 +87,33 @@ private[sql] object SparkPlanGraph {
* @param metrics metrics that this SparkPlan node will track
*/
private[ui] case class SparkPlanGraphNode(
- id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) {
+ id: Long,
+ name: String,
+ desc: String,
+ metadata: Map[String, String],
+ metrics: Seq[SQLPlanMetric]) {
def makeDotNode(metricsValue: Map[Long, String]): String = {
- val values = {
- for (metric <- metrics;
- value <- metricsValue.get(metric.accumulatorId)) yield {
- metric.name + ": " + value
- }
+ val builder = new mutable.StringBuilder(name)
+
+ val values = for {
+ metric <- metrics
+ value <- metricsValue.get(metric.accumulatorId)
+ } yield {
+ metric.name + ": " + value
}
- val label = if (values.isEmpty) {
- name
- } else {
- // If there are metrics, display all metrics in a separate line. We should use an escaped
- // "\n" here to follow the dot syntax.
- //
- // Note: whitespace between two "\n"s is to create an empty line between the name of
- // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
- name + "\\n \\n" + values.mkString("\\n")
- }
- s""" $id [label="$label"];"""
+
+ if (values.nonEmpty) {
+ // If there are metrics, display each entry in a separate line. We should use an escaped
+ // "\n" here to follow the dot syntax.
+ //
+ // Note: whitespace between two "\n"s is to create an empty line between the name of
+ // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
+ builder ++= "\\n \\n"
+ builder ++= values.mkString("\\n")
+ }
+
+ s""" $id [label="${builder.toString()}"];"""
}
}
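A standalone sketch of the refactored label building in `makeDotNode`: metric lines are joined with an escaped "\n" so Graphviz renders them beneath the node name. The metric name and value below are illustrative.

```scala
object DotLabelSketch {
  def makeDotNode(id: Long, name: String, metricValues: Seq[(String, String)]): String = {
    val builder = new StringBuilder(name)
    val values = for ((metricName, value) <- metricValues) yield s"$metricName: $value"
    if (values.nonEmpty) {
      // The " \n" in the middle leaves an empty line between the node name and its metrics;
      // the backslash is kept literal so it survives into the dot source.
      builder ++= "\\n \\n"
      builder ++= values.mkString("\\n")
    }
    s"""  $id [label="${builder.toString()}"];"""
  }

  def main(args: Array[String]): Unit = {
    println(makeDotNode(0, "PhysicalRDD", Seq("number of output rows" -> "500")))
    //   0 [label="PhysicalRDD\n \nnumber of output rows: 500"];
  }
}
```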
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 9ace25dc7d..fc8ce6901d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -422,7 +422,7 @@ abstract class HadoopFsRelation private[sql](
parameters: Map[String, String])
extends BaseRelation with FileRelation with Logging {
- override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
+ override def toString: String = getClass.getSimpleName
def this() = this(None, Map.empty[String, String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a4626259b2..2fb439f501 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -169,7 +169,7 @@ class PlannerSuite extends SharedSQLContext {
withTempTable("testPushed") {
val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
- assert(exp.toString.contains("PushedFilter: [EqualTo(key,15)]"))
+ assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
}
}
}