author     Wenchen Fan <wenchen@databricks.com>    2017-04-10 13:36:08 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-04-10 13:36:08 +0800
commit     3d7f201f2adc2d33be6f564fa76435c18552f4ba (patch)
tree       2c34606cf5cf36da43cf4d9b7056bf2b0c33cd44 /sql/core/src/main
parent     1a0bc41659eef317dcac18df35c26857216a4314 (diff)
download   spark-3d7f201f2adc2d33be6f564fa76435c18552f4ba.tar.gz
           spark-3d7f201f2adc2d33be6f564fa76435c18552f4ba.tar.bz2
           spark-3d7f201f2adc2d33be6f564fa76435c18552f4ba.zip
[SPARK-20229][SQL] add semanticHash to QueryPlan
## What changes were proposed in this pull request?

Like `Expression`, `QueryPlan` should also have a `semanticHash` method, so that plans can be stored in a hash map and looked up quickly. This PR refactors `QueryPlan` to follow `Expression` and moves all of the normalization logic into `QueryPlan.canonicalized`, which makes `semanticHash` natural to implement.

Follow-up: improve `CacheManager` to leverage `semanticHash` and speed up plan lookup, instead of iterating over all cached plans.

## How was this patch tested?

Existing tests. Note that `semanticHash` does not need dedicated tests: once the existing tests prove `sameResult` is correct, it is covered.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17541 from cloud-fan/plan-semantic.
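For orientation, here is a minimal sketch of the pattern the commit message describes, with `sameResult` and `semanticHash` both delegating to `canonicalized`. This is illustrative only and not the exact code added to `QueryPlan`:

```scala
// Illustrative sketch: both methods delegate to `canonicalized`, so
// sameResult and semanticHash stay consistent with each other by construction.
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] { self: PlanType =>

  // Subclasses override this (or a pre-canonicalization hook) to strip cosmetic
  // differences such as expression ids and other non-semantic fields.
  lazy val canonicalized: PlanType = this

  // Two plans compute the same result iff their canonicalized forms are equal.
  def sameResult(other: PlanType): Boolean = this.canonicalized == other.canonicalized

  // A hash consistent with sameResult, so plans can serve as hash-map keys.
  def semanticHash(): Int = canonicalized.hashCode()
}
```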
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala             | 37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                    | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala             |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala         | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala    | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala              |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala           | 11
8 files changed, 30 insertions, 69 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 2fa660c4d5..3a9132d74a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -119,7 +119,7 @@ case class RowDataSourceScanExec(
val input = ctx.freshName("input")
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
val exprRows = output.zipWithIndex.map{ case (a, i) =>
- new BoundReference(i, a.dataType, a.nullable)
+ BoundReference(i, a.dataType, a.nullable)
}
val row = ctx.freshName("row")
ctx.INPUT_ROW = row
@@ -136,19 +136,17 @@ case class RowDataSourceScanExec(
""".stripMargin
}
- // Ignore rdd when checking results
- override def sameResult(plan: SparkPlan): Boolean = plan match {
- case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
- case _ => false
- }
+ // Only care about `relation` and `metadata` when canonicalizing.
+ override def preCanonicalized: SparkPlan =
+ copy(rdd = null, outputPartitioning = null, metastoreTableIdentifier = None)
}
/**
* Physical plan node for scanning data from HadoopFsRelations.
*
* @param relation The file-based relation to scan.
- * @param output Output attributes of the scan.
- * @param outputSchema Output schema of the scan.
+ * @param output Output attributes of the scan, including data attributes and partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
* @param partitionFilters Predicates to use for partition pruning.
* @param dataFilters Filters on non-partition columns.
* @param metastoreTableIdentifier identifier for the table in the metastore.
@@ -156,7 +154,7 @@ case class RowDataSourceScanExec(
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
output: Seq[Attribute],
- outputSchema: StructType,
+ requiredSchema: StructType,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
override val metastoreTableIdentifier: Option[TableIdentifier])
@@ -267,7 +265,7 @@ case class FileSourceScanExec(
val metadata =
Map(
"Format" -> relation.fileFormat.toString,
- "ReadSchema" -> outputSchema.catalogString,
+ "ReadSchema" -> requiredSchema.catalogString,
"Batched" -> supportsBatch.toString,
"PartitionFilters" -> seqToString(partitionFilters),
"PushedFilters" -> seqToString(pushedDownFilters),
@@ -287,7 +285,7 @@ case class FileSourceScanExec(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
- requiredSchema = outputSchema,
+ requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
@@ -515,14 +513,13 @@ case class FileSourceScanExec(
}
}
- override def sameResult(plan: SparkPlan): Boolean = plan match {
- case other: FileSourceScanExec =>
- val thisPredicates = partitionFilters.map(cleanExpression)
- val otherPredicates = other.partitionFilters.map(cleanExpression)
- val result = relation == other.relation && metadata == other.metadata &&
- thisPredicates.length == otherPredicates.length &&
- thisPredicates.zip(otherPredicates).forall(p => p._1.semanticEquals(p._2))
- result
- case _ => false
+ override lazy val canonicalized: FileSourceScanExec = {
+ FileSourceScanExec(
+ relation,
+ output.map(normalizeExprId(_, output)),
+ requiredSchema,
+ partitionFilters.map(normalizeExprId(_, output)),
+ dataFilters.map(normalizeExprId(_, output)),
+ None)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 2827b8ac00..3d1b481a53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -87,13 +87,6 @@ case class ExternalRDD[T](
override def newInstance(): ExternalRDD.this.type =
ExternalRDD(outputObjAttr.newInstance(), rdd)(session).asInstanceOf[this.type]
- override def sameResult(plan: LogicalPlan): Boolean = {
- plan.canonicalized match {
- case ExternalRDD(_, otherRDD) => rdd.id == otherRDD.id
- case _ => false
- }
- }
-
override protected def stringArgs: Iterator[Any] = Iterator(output)
@transient override def computeStats(conf: SQLConf): Statistics = Statistics(
@@ -162,13 +155,6 @@ case class LogicalRDD(
)(session).asInstanceOf[this.type]
}
- override def sameResult(plan: LogicalPlan): Boolean = {
- plan.canonicalized match {
- case LogicalRDD(_, otherRDD, _, _) => rdd.id == otherRDD.id
- case _ => false
- }
- }
-
override protected def stringArgs: Iterator[Any] = Iterator(output)
@transient override def computeStats(conf: SQLConf): Statistics = Statistics(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index e366b9af35..19c68c1326 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -33,7 +33,7 @@ case class LocalTableScanExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
- private val unsafeRows: Array[InternalRow] = {
+ private lazy val unsafeRows: Array[InternalRow] = {
if (rows.isEmpty) {
Array.empty
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 66a8e044ab..44278e37c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -342,8 +342,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numGeneratedRows" -> SQLMetrics.createMetric(sparkContext, "number of generated rows"))
- // output attributes should not affect the results
- override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)
+ override lazy val canonicalized: SparkPlan = {
+ RangeExec(range.canonicalized.asInstanceOf[org.apache.spark.sql.catalyst.plans.logical.Range])
+ }
override def inputRDDs(): Seq[RDD[InternalRow]] = {
sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
@@ -607,11 +608,6 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
- override def sameResult(o: SparkPlan): Boolean = o match {
- case s: SubqueryExec => child.sameResult(s.child)
- case _ => false
- }
-
@transient
private lazy val relationFuture: Future[Array[InternalRow]] = {
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 4215203960..3813f953e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -43,17 +43,8 @@ case class LogicalRelation(
com.google.common.base.Objects.hashCode(relation, output)
}
- override def sameResult(otherPlan: LogicalPlan): Boolean = {
- otherPlan.canonicalized match {
- case LogicalRelation(otherRelation, _, _) => relation == otherRelation
- case _ => false
- }
- }
-
- // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need
- // LogicalRelation.cleanArgs to return Seq(relation), since expectedOutputAttribute's
- // expId can be different but the relation is still the same.
- override lazy val cleanArgs: Seq[Any] = Seq(relation)
+ // Only care about relation when canonicalizing.
+ override def preCanonicalized: LogicalPlan = copy(catalogTable = None)
@transient override def computeStats(conf: SQLConf): Statistics = {
catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index efcaca9338..9c859e41f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -48,10 +48,8 @@ case class BroadcastExchangeExec(
override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
- override def sameResult(plan: SparkPlan): Boolean = plan match {
- case p: BroadcastExchangeExec =>
- mode.compatibleWith(p.mode) && child.sameResult(p.child)
- case _ => false
+ override lazy val canonicalized: SparkPlan = {
+ BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
}
@transient
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 9a9597d373..d993ea6c6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -48,10 +48,8 @@ abstract class Exchange extends UnaryExecNode {
case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchange)
extends LeafExecNode {
- override def sameResult(plan: SparkPlan): Boolean = {
- // Ignore this wrapper. `plan` could also be a ReusedExchange, so we reverse the order here.
- plan.sameResult(child)
- }
+ // Ignore this wrapper for canonicalizing.
+ override lazy val canonicalized: SparkPlan = child.canonicalized
def doExecute(): RDD[InternalRow] = {
child.execute()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index b9f6601ea8..2dd1dc3da9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -829,15 +829,10 @@ private[execution] case class HashedRelationBroadcastMode(key: Seq[Expression])
extends BroadcastMode {
override def transform(rows: Array[InternalRow]): HashedRelation = {
- HashedRelation(rows.iterator, canonicalizedKey, rows.length)
+ HashedRelation(rows.iterator, canonicalized.key, rows.length)
}
- private lazy val canonicalizedKey: Seq[Expression] = {
- key.map { e => e.canonicalized }
- }
-
- override def compatibleWith(other: BroadcastMode): Boolean = other match {
- case m: HashedRelationBroadcastMode => canonicalizedKey == m.canonicalizedKey
- case _ => false
+ override lazy val canonicalized: HashedRelationBroadcastMode = {
+ this.copy(key = key.map(_.canonicalized))
}
}
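The commit message points at a follow-up in `CacheManager`. Purely as a sketch of how the new method could be used, here is a hypothetical `CachedPlanIndex` helper (not part of this patch), assuming the `semanticHash(): Int` and `sameResult` signatures introduced here:

```scala
import scala.collection.mutable
import org.apache.spark.sql.catalyst.plans.QueryPlan

// Hypothetical index, not part of this patch: bucket plans by semanticHash so a
// lookup only runs the more expensive sameResult check within a single bucket,
// instead of scanning every cached plan.
class CachedPlanIndex[T <: QueryPlan[T]] {
  private val buckets = mutable.HashMap.empty[Int, mutable.Buffer[T]]

  def add(plan: T): Unit =
    buckets.getOrElseUpdate(plan.semanticHash(), mutable.Buffer.empty[T]) += plan

  def lookup(plan: T): Option[T] =
    buckets.get(plan.semanticHash()).flatMap(_.find(_.sameResult(plan)))
}
```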