author     Wenchen Fan <wenchen@databricks.com>    2017-04-10 13:36:08 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-04-10 13:36:08 +0800
commit     3d7f201f2adc2d33be6f564fa76435c18552f4ba (patch)
tree       2c34606cf5cf36da43cf4d9b7056bf2b0c33cd44 /sql/catalyst
parent     1a0bc41659eef317dcac18df35c26857216a4314 (diff)
[SPARK-20229][SQL] add semanticHash to QueryPlan
## What changes were proposed in this pull request?

Like `Expression`, `QueryPlan` should also have a `semanticHash` method, so that we can put plans in a hash map and look them up quickly. This PR refactors `QueryPlan` to follow `Expression` and puts all the normalization logic in `QueryPlan.canonicalized`, so that implementing `semanticHash` becomes natural.

Follow-up: improve `CacheManager` to leverage this `semanticHash` and speed up plan lookup, instead of iterating over all cached plans.

## How was this patch tested?

Existing tests. Note that we don't need to test the `semanticHash` method itself: once the existing tests prove `sameResult` is correct, we are good.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17541 from cloud-fan/plan-semantic.
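To illustrate the follow-up idea, here is a minimal sketch of a cache keyed by `semanticHash`. The `SemanticPlanCache` class is hypothetical and not part of this commit; only `semanticHash()` and `sameResult` come from this patch:

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.plans.QueryPlan

// Hypothetical sketch: bucket cached entries by semanticHash so that lookup
// only runs sameResult against plans in one bucket, instead of scanning all
// cached plans linearly.
class SemanticPlanCache[PlanType <: QueryPlan[PlanType], V] {
  private val buckets =
    mutable.HashMap.empty[Int, mutable.ListBuffer[(PlanType, V)]]

  def put(plan: PlanType, value: V): Unit = {
    buckets.getOrElseUpdate(plan.semanticHash(), mutable.ListBuffer.empty) +=
      plan -> value
  }

  def lookup(plan: PlanType): Option[V] = {
    // semanticHash is the canonicalized plan's hashCode, so any plan that
    // sameResults this one must land in the same bucket.
    buckets.get(plan.semanticHash()).flatMap(
      _.collectFirst { case (p, v) if p.sameResult(plan) => v })
  }
}
```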
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala                   |   2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala                   |  11
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala                     | 102
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala         |   8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala           |   2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala |   2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala        |   9
7 files changed, 75 insertions(+), 61 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c698ca6a83..b0cdef7029 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -617,7 +617,7 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
- lookupTableFromCatalog(u).canonicalized match {
+ EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
case v: View =>
u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
case other => i.copy(table = other)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 360e55d922..cc0cbba275 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -423,8 +423,15 @@ case class CatalogRelation(
Objects.hashCode(tableMeta.identifier, output)
}
- /** Only compare table identifier. */
- override lazy val cleanArgs: Seq[Any] = Seq(tableMeta.identifier)
+ override def preCanonicalized: LogicalPlan = copy(tableMeta = CatalogTable(
+ identifier = tableMeta.identifier,
+ tableType = tableMeta.tableType,
+ storage = CatalogStorageFormat.empty,
+ schema = tableMeta.schema,
+ partitionColumnNames = tableMeta.partitionColumnNames,
+ bucketSpec = tableMeta.bucketSpec,
+ createTime = -1
+ ))
override def computeStats(conf: SQLConf): Statistics = {
// For data source tables, we will create a `LogicalRelation` and won't call this method, for
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 2d8ec2053a..3008e8cb84 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -359,9 +359,59 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
/**
- * Canonicalized copy of this query plan.
+ * Returns a plan where a best effort attempt has been made to transform `this` in a way
+ * that preserves the result but removes cosmetic variations (case sensitivity, ordering for
+ * commutative operations, expression id, etc.)
+ *
+ * Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same
+ * result.
+ *
+ * Some nodes should override this to provide proper canonicalization logic.
+ */
+ lazy val canonicalized: PlanType = {
+ val canonicalizedChildren = children.map(_.canonicalized)
+ var id = -1
+ preCanonicalized.mapExpressions {
+ case a: Alias =>
+ id += 1
+ // As the root of the expression, Alias will always take an arbitrary exprId, so we need to
+ // normalize it for equality testing by assigning expr ids incrementally from 0. The
+ // alias name doesn't matter and should be erased.
+ Alias(normalizeExprId(a.child), "")(ExprId(id), a.qualifier, isGenerated = a.isGenerated)
+
+ case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
+ // A top-level `AttributeReference` may also be used for output, like `Alias`, so we should
+ // normalize its exprId too.
+ id += 1
+ ar.withExprId(ExprId(id))
+
+ case other => normalizeExprId(other)
+ }.withNewChildren(canonicalizedChildren)
+ }
+
+ /**
+ * Does some simple transformations on this plan before canonicalizing. Implementations can
+ * override this method to customize the canonicalization logic without rewriting all of it.
*/
- protected lazy val canonicalized: PlanType = this
+ protected def preCanonicalized: PlanType = this
+
+ /**
+ * Normalizes the exprIds in the given expression by replacing the exprId in each
+ * `AttributeReference` with the ordinal of its referenced attribute in the input. This is
+ * similar to `BindReferences`, but we cannot use `BindReferences` here because the plan may
+ * take the expression as a parameter of type `Attribute`, and replacing it with a
+ * `BoundReference` would cause an error.
+ */
+ protected def normalizeExprId[T <: Expression](e: T, input: AttributeSeq = allAttributes): T = {
+ e.transformUp {
+ case ar: AttributeReference =>
+ val ordinal = input.indexOf(ar.exprId)
+ if (ordinal == -1) {
+ ar
+ } else {
+ ar.withExprId(ExprId(ordinal))
+ }
+ }.canonicalized.asInstanceOf[T]
+ }
/**
* Returns true when the given query plan will return the same results as this query plan.
@@ -372,49 +422,19 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
* enhancements like caching. However, it is not acceptable to return true if the results could
* possibly be different.
*
- * By default this function performs a modified version of equality that is tolerant of cosmetic
- * differences like attribute naming and or expression id differences. Operators that
- * can do better should override this function.
+ * This function performs a modified version of equality that is tolerant of cosmetic
+ * differences like attribute naming and/or expression id differences.
*/
- def sameResult(plan: PlanType): Boolean = {
- val left = this.canonicalized
- val right = plan.canonicalized
- left.getClass == right.getClass &&
- left.children.size == right.children.size &&
- left.cleanArgs == right.cleanArgs &&
- (left.children, right.children).zipped.forall(_ sameResult _)
- }
+ final def sameResult(other: PlanType): Boolean = this.canonicalized == other.canonicalized
+
+ /**
+ * Returns a `hashCode` for the calculation performed by this plan. Unlike the standard
+ * `hashCode`, an attempt has been made to eliminate cosmetic differences.
+ */
+ final def semanticHash(): Int = canonicalized.hashCode()
/**
* All the attributes that are used for this plan.
*/
lazy val allAttributes: AttributeSeq = children.flatMap(_.output)
-
- protected def cleanExpression(e: Expression): Expression = e match {
- case a: Alias =>
- // As the root of the expression, Alias will always take an arbitrary exprId, we need
- // to erase that for equality testing.
- val cleanedExprId =
- Alias(a.child, a.name)(ExprId(-1), a.qualifier, isGenerated = a.isGenerated)
- BindReferences.bindReference(cleanedExprId, allAttributes, allowFailures = true)
- case other =>
- BindReferences.bindReference(other, allAttributes, allowFailures = true)
- }
-
- /** Args that have cleaned such that differences in expression id should not affect equality */
- protected lazy val cleanArgs: Seq[Any] = {
- def cleanArg(arg: Any): Any = arg match {
- // Children are checked using sameResult above.
- case tn: TreeNode[_] if containsChild(tn) => null
- case e: Expression => cleanExpression(e).canonicalized
- case other => other
- }
-
- mapProductIterator {
- case s: Option[_] => s.map(cleanArg)
- case s: Seq[_] => s.map(cleanArg)
- case m: Map[_, _] => m.mapValues(cleanArg)
- case other => cleanArg(other)
- }.toSeq
- }
}
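Since `semanticHash` is defined as `canonicalized.hashCode()` and `sameResult` as equality of canonicalized plans, the usual `equals`/`hashCode` contract carries over to the semantic versions. A minimal illustration (the method name is hypothetical):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical illustration: sameResult implies equal semantic hashes,
// because both are derived from the same canonicalized plans.
def checkSemanticContract(plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
  if (plan1.sameResult(plan2)) {
    assert(plan1.semanticHash() == plan2.semanticHash(),
      "plans with equal canonicalized forms must hash identically")
  }
}
```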
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index b7177c4a2c..9cd5dfd21b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -67,14 +67,6 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
}
}
- override def sameResult(plan: LogicalPlan): Boolean = {
- plan.canonicalized match {
- case LocalRelation(otherOutput, otherData) =>
- otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
- case _ => false
- }
- }
-
override def computeStats(conf: SQLConf): Statistics =
Statistics(sizeInBytes =
output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 036b625668..6bdcf490ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -143,8 +143,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
def childrenResolved: Boolean = children.forall(_.resolved)
- override lazy val canonicalized: LogicalPlan = EliminateSubqueryAliases(this)
-
/**
* Resolves a given schema to concrete [[Attribute]] references in this query plan. This function
* should only be called on analyzed plans since it will throw [[AnalysisException]] for
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c91de08ca5..3ad757ebba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -803,6 +803,8 @@ case class SubqueryAlias(
child: LogicalPlan)
extends UnaryNode {
+ override lazy val canonicalized: LogicalPlan = child.canonicalized
+
override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
}
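Because `SubqueryAlias` now canonicalizes directly to its child's canonicalized plan, alias wrappers are transparent to semantic comparison. A hedged sketch of the resulting property (the helper name is hypothetical):

```scala
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias

// Hypothetical property: an alias node compares equal to its child under
// sameResult, since canonicalized strips the alias entirely.
def aliasIsTransparent(aliased: SubqueryAlias): Boolean =
  aliased.sameResult(aliased.child) &&
    aliased.semanticHash() == aliased.child.semanticHash()
```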
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
index 9dfdf4da78..2ab46dc833 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
@@ -26,10 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
trait BroadcastMode {
def transform(rows: Array[InternalRow]): Any
- /**
- * Returns true iff this [[BroadcastMode]] generates the same result as `other`.
- */
- def compatibleWith(other: BroadcastMode): Boolean
+ def canonicalized: BroadcastMode
}
/**
@@ -39,7 +36,5 @@ case object IdentityBroadcastMode extends BroadcastMode {
// TODO: pack the UnsafeRows into single bytes array.
override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows
- override def compatibleWith(other: BroadcastMode): Boolean = {
- this eq other
- }
+ override def canonicalized: BroadcastMode = this
}
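With `compatibleWith` removed, callers compare broadcast modes through their canonicalized forms instead. A minimal sketch of the replacement check (the helper name is hypothetical):

```scala
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode

// Hypothetical helper: two modes are interchangeable when their canonicalized
// forms are equal, which for IdentityBroadcastMode reduces to the old
// reference-equality check.
def sameMode(a: BroadcastMode, b: BroadcastMode): Boolean =
  a.canonicalized == b.canonicalized
```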