about summary refs log tree commit diff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
author    Dilip Biswal <dbiswal@us.ibm.com>    2017-04-12 12:18:01 +0800
committer Wenchen Fan <wenchen@databricks.com>    2017-04-12 12:18:01 +0800
commit   b14bfc3f8e97479ac5927c071b00ed18f2104c95 (patch)
tree     0ee9e829fa341baa93677ecd578055d3267cc4d5 /sql/catalyst/src
parent   8ad63ee158815de5ffff7bf03cdf25aef312095f (diff)
download spark-b14bfc3f8e97479ac5927c071b00ed18f2104c95.tar.gz
spark-b14bfc3f8e97479ac5927c071b00ed18f2104c95.tar.bz2
spark-b14bfc3f8e97479ac5927c071b00ed18f2104c95.zip
[SPARK-19993][SQL] Caching logical plans containing subquery expressions does not work.
## What changes were proposed in this pull request? The sameResult() method does not work when the logical plan contains subquery expressions. **Before the fix** ```SQL scala> val ds = spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)") ds: org.apache.spark.sql.DataFrame = [c1: int] scala> ds.cache res13: ds.type = [c1: int] scala> spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)").explain(true) == Analyzed Logical Plan == c1: int Project [c1#86] +- Filter c1#86 IN (list#78 [c1#86]) : +- Project [c1#87] : +- Filter (outer(c1#86) = c1#87) : +- SubqueryAlias s2 : +- Relation[c1#87] parquet +- SubqueryAlias s1 +- Relation[c1#86] parquet == Optimized Logical Plan == Join LeftSemi, ((c1#86 = c1#87) && (c1#86 = c1#87)) :- Relation[c1#86] parquet +- Relation[c1#87] parquet ``` **Plan after fix** ```SQL == Analyzed Logical Plan == c1: int Project [c1#22] +- Filter c1#22 IN (list#14 [c1#22]) : +- Project [c1#23] : +- Filter (outer(c1#22) = c1#23) : +- SubqueryAlias s2 : +- Relation[c1#23] parquet +- SubqueryAlias s1 +- Relation[c1#22] parquet == Optimized Logical Plan == InMemoryRelation [c1#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- *BroadcastHashJoin [c1#1, c1#1], [c1#2, c1#2], LeftSemi, BuildRight :- *FileScan parquet default.s1[c1#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int> +- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, true] as bigint), 32) | (cast(input[0, int, true] as bigint) & 4294967295)))) +- *FileScan parquet default.s2[c1#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int> ``` ## How was this patch tested? 
New tests are added to CachedTableSuite. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #17330 from dilipbiswal/subquery_cache_final.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala | 26
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 43
2 files changed, 49 insertions(+), 20 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index 59db28d58a..d7b493d521 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -47,7 +47,6 @@ abstract class SubqueryExpression(
plan: LogicalPlan,
children: Seq[Expression],
exprId: ExprId) extends PlanExpression[LogicalPlan] {
-
override lazy val resolved: Boolean = childrenResolved && plan.resolved
override lazy val references: AttributeSet =
if (plan.resolved) super.references -- plan.outputSet else super.references
@@ -59,6 +58,13 @@ abstract class SubqueryExpression(
children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
case _ => false
}
+ def canonicalize(attrs: AttributeSeq): SubqueryExpression = {
+ // Normalize the outer references in the subquery plan.
+ val normalizedPlan = plan.transformAllExpressions {
+ case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs))
+ }
+ withNewPlan(normalizedPlan).canonicalized.asInstanceOf[SubqueryExpression]
+ }
}
object SubqueryExpression {
@@ -236,6 +242,12 @@ case class ScalarSubquery(
override def nullable: Boolean = true
override def withNewPlan(plan: LogicalPlan): ScalarSubquery = copy(plan = plan)
override def toString: String = s"scalar-subquery#${exprId.id} $conditionString"
+ override lazy val canonicalized: Expression = {
+ ScalarSubquery(
+ plan.canonicalized,
+ children.map(_.canonicalized),
+ ExprId(0))
+ }
}
object ScalarSubquery {
@@ -268,6 +280,12 @@ case class ListQuery(
override def nullable: Boolean = false
override def withNewPlan(plan: LogicalPlan): ListQuery = copy(plan = plan)
override def toString: String = s"list#${exprId.id} $conditionString"
+ override lazy val canonicalized: Expression = {
+ ListQuery(
+ plan.canonicalized,
+ children.map(_.canonicalized),
+ ExprId(0))
+ }
}
/**
@@ -290,4 +308,10 @@ case class Exists(
override def nullable: Boolean = false
override def withNewPlan(plan: LogicalPlan): Exists = copy(plan = plan)
override def toString: String = s"exists#${exprId.id} $conditionString"
+ override lazy val canonicalized: Expression = {
+ Exists(
+ plan.canonicalized,
+ children.map(_.canonicalized),
+ ExprId(0))
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 3008e8cb84..2fb65bd435 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -377,7 +377,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
// As the root of the expression, Alias will always take an arbitrary exprId, we need to
// normalize that for equality testing, by assigning expr id from 0 incrementally. The
// alias name doesn't matter and should be erased.
- Alias(normalizeExprId(a.child), "")(ExprId(id), a.qualifier, isGenerated = a.isGenerated)
+ val normalizedChild = QueryPlan.normalizeExprId(a.child, allAttributes)
+ Alias(normalizedChild, "")(ExprId(id), a.qualifier, isGenerated = a.isGenerated)
case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
// Top level `AttributeReference` may also be used for output like `Alias`, we should
@@ -385,7 +386,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
id += 1
ar.withExprId(ExprId(id))
- case other => normalizeExprId(other)
+ case other => QueryPlan.normalizeExprId(other, allAttributes)
}.withNewChildren(canonicalizedChildren)
}
@@ -395,23 +396,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
*/
protected def preCanonicalized: PlanType = this
- /**
- * Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference`
- * with its referenced ordinal from input attributes. It's similar to `BindReferences` but we
- * do not use `BindReferences` here as the plan may take the expression as a parameter with type
- * `Attribute`, and replace it with `BoundReference` will cause error.
- */
- protected def normalizeExprId[T <: Expression](e: T, input: AttributeSeq = allAttributes): T = {
- e.transformUp {
- case ar: AttributeReference =>
- val ordinal = input.indexOf(ar.exprId)
- if (ordinal == -1) {
- ar
- } else {
- ar.withExprId(ExprId(ordinal))
- }
- }.canonicalized.asInstanceOf[T]
- }
/**
* Returns true when the given query plan will return the same results as this query plan.
@@ -438,3 +422,24 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
*/
lazy val allAttributes: AttributeSeq = children.flatMap(_.output)
}
+
+object QueryPlan {
+ /**
+ * Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference`
+ * with its referenced ordinal from input attributes. It's similar to `BindReferences` but we
+ * do not use `BindReferences` here as the plan may take the expression as a parameter with type
+ * `Attribute`, and replace it with `BoundReference` will cause error.
+ */
+ def normalizeExprId[T <: Expression](e: T, input: AttributeSeq): T = {
+ e.transformUp {
+ case s: SubqueryExpression => s.canonicalize(input)
+ case ar: AttributeReference =>
+ val ordinal = input.indexOf(ar.exprId)
+ if (ordinal == -1) {
+ ar
+ } else {
+ ar.withExprId(ExprId(ordinal))
+ }
+ }.canonicalized.asInstanceOf[T]
+ }
+}