aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2016-08-17 07:03:24 -0700
committerDavies Liu <davies.liu@gmail.com>2016-08-17 07:03:24 -0700
commit0b0c8b95e3594db36d87ef0e59a30eefe8508ac1 (patch)
treeb3f8bb96171a72a422d5e64caacc110fee5319bc /sql/catalyst/src/main
parent56d86742d2600b8426d75bd87ab3c73332dca1d2 (diff)
downloadspark-0b0c8b95e3594db36d87ef0e59a30eefe8508ac1.tar.gz
spark-0b0c8b95e3594db36d87ef0e59a30eefe8508ac1.tar.bz2
spark-0b0c8b95e3594db36d87ef0e59a30eefe8508ac1.zip
[SPARK-17106] [SQL] Simplify the SubqueryExpression interface
## What changes were proposed in this pull request? The current subquery expression interface contains a little bit of technical debt in the form of a few different access paths to get and set the query contained by the expression. This is confusing to anyone who goes over this code. This PR unifies these access paths. ## How was this patch tested? (Existing tests) Author: Herman van Hovell <hvanhovell@databricks.com> Closes #14685 from hvanhovell/SPARK-17106.
Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala60
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala4
4 files changed, 36 insertions, 38 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index bd4c19181f..f540816366 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -146,7 +146,7 @@ class Analyzer(
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
other transformExpressions {
case e: SubqueryExpression =>
- e.withNewPlan(substituteCTE(e.query, cteRelations))
+ e.withNewPlan(substituteCTE(e.plan, cteRelations))
}
}
}
@@ -1091,7 +1091,7 @@ class Analyzer(
f: (LogicalPlan, Seq[Expression]) => SubqueryExpression): SubqueryExpression = {
// Step 1: Resolve the outer expressions.
var previous: LogicalPlan = null
- var current = e.query
+ var current = e.plan
do {
// Try to resolve the subquery plan using the regular analyzer.
previous = current
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index ddbe937cba..e2e7d98e33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -17,33 +17,33 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types._
/**
- * An interface for subquery that is used in expressions.
+ * An interface for expressions that contain a [[QueryPlan]].
*/
-abstract class SubqueryExpression extends Expression {
+abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
/** The id of the subquery expression. */
def exprId: ExprId
- /** The logical plan of the query. */
- def query: LogicalPlan
+ /** The plan being wrapped in the query. */
+ def plan: T
- /**
- * Either a logical plan or a physical plan. The generated tree string (explain output) uses this
- * field to explain the subquery.
- */
- def plan: QueryPlan[_]
-
- /** Updates the query with new logical plan. */
- def withNewPlan(plan: LogicalPlan): SubqueryExpression
+ /** Updates the expression with a new plan. */
+ def withNewPlan(plan: T): PlanExpression[T]
protected def conditionString: String = children.mkString("[", " && ", "]")
}
+/**
+ * A base interface for expressions that contain a [[LogicalPlan]].
+ */
+abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
+ override def withNewPlan(plan: LogicalPlan): SubqueryExpression
+}
+
object SubqueryExpression {
def hasCorrelatedSubquery(e: Expression): Boolean = {
e.find {
@@ -60,20 +60,19 @@ object SubqueryExpression {
* Note: `exprId` is used to have a unique name in explain string output.
*/
case class ScalarSubquery(
- query: LogicalPlan,
+ plan: LogicalPlan,
children: Seq[Expression] = Seq.empty,
exprId: ExprId = NamedExpression.newExprId)
extends SubqueryExpression with Unevaluable {
- override lazy val resolved: Boolean = childrenResolved && query.resolved
+ override lazy val resolved: Boolean = childrenResolved && plan.resolved
override lazy val references: AttributeSet = {
- if (query.resolved) super.references -- query.outputSet
+ if (plan.resolved) super.references -- plan.outputSet
else super.references
}
- override def dataType: DataType = query.schema.fields.head.dataType
+ override def dataType: DataType = plan.schema.fields.head.dataType
override def foldable: Boolean = false
override def nullable: Boolean = true
- override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
- override def withNewPlan(plan: LogicalPlan): ScalarSubquery = copy(query = plan)
+ override def withNewPlan(plan: LogicalPlan): ScalarSubquery = copy(plan = plan)
override def toString: String = s"scalar-subquery#${exprId.id} $conditionString"
}
@@ -92,19 +91,18 @@ object ScalarSubquery {
* be rewritten into a left semi/anti join during analysis.
*/
case class PredicateSubquery(
- query: LogicalPlan,
+ plan: LogicalPlan,
children: Seq[Expression] = Seq.empty,
nullAware: Boolean = false,
exprId: ExprId = NamedExpression.newExprId)
extends SubqueryExpression with Predicate with Unevaluable {
- override lazy val resolved = childrenResolved && query.resolved
- override lazy val references: AttributeSet = super.references -- query.outputSet
+ override lazy val resolved = childrenResolved && plan.resolved
+ override lazy val references: AttributeSet = super.references -- plan.outputSet
override def nullable: Boolean = nullAware
- override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
- override def withNewPlan(plan: LogicalPlan): PredicateSubquery = copy(query = plan)
+ override def withNewPlan(plan: LogicalPlan): PredicateSubquery = copy(plan = plan)
override def semanticEquals(o: Expression): Boolean = o match {
case p: PredicateSubquery =>
- query.sameResult(p.query) && nullAware == p.nullAware &&
+ plan.sameResult(p.plan) && nullAware == p.nullAware &&
children.length == p.children.length &&
children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
case _ => false
@@ -146,14 +144,13 @@ object PredicateSubquery {
* FROM b)
* }}}
*/
-case class ListQuery(query: LogicalPlan, exprId: ExprId = NamedExpression.newExprId)
+case class ListQuery(plan: LogicalPlan, exprId: ExprId = NamedExpression.newExprId)
extends SubqueryExpression with Unevaluable {
override lazy val resolved = false
override def children: Seq[Expression] = Seq.empty
override def dataType: DataType = ArrayType(NullType)
override def nullable: Boolean = false
- override def withNewPlan(plan: LogicalPlan): ListQuery = copy(query = plan)
- override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
+ override def withNewPlan(plan: LogicalPlan): ListQuery = copy(plan = plan)
override def toString: String = s"list#${exprId.id}"
}
@@ -168,12 +165,11 @@ case class ListQuery(query: LogicalPlan, exprId: ExprId = NamedExpression.newExp
* WHERE b.id = a.id)
* }}}
*/
-case class Exists(query: LogicalPlan, exprId: ExprId = NamedExpression.newExprId)
+case class Exists(plan: LogicalPlan, exprId: ExprId = NamedExpression.newExprId)
extends SubqueryExpression with Predicate with Unevaluable {
override lazy val resolved = false
override def children: Seq[Expression] = Seq.empty
override def nullable: Boolean = false
- override def withNewPlan(plan: LogicalPlan): Exists = copy(query = plan)
- override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
+ override def withNewPlan(plan: LogicalPlan): Exists = copy(plan = plan)
override def toString: String = s"exists#${exprId.id}"
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f97a78b411..aa15f4a823 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -127,7 +127,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
object OptimizeSubqueries extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case s: SubqueryExpression =>
- s.withNewPlan(Optimizer.this.execute(s.query))
+ s.withNewPlan(Optimizer.this.execute(s.plan))
}
}
}
@@ -1814,7 +1814,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
val newExpression = expression transform {
case s: ScalarSubquery if s.children.nonEmpty =>
subqueries += s
- s.query.output.head
+ s.plan.output.head
}
newExpression.asInstanceOf[E]
}
@@ -2029,7 +2029,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
// grouping expressions. As a result we need to replace all the scalar subqueries in the
// grouping expressions by their result.
val newGrouping = grouping.map { e =>
- subqueries.find(_.semanticEquals(e)).map(_.query.output.head).getOrElse(e)
+ subqueries.find(_.semanticEquals(e)).map(_.plan.output.head).getOrElse(e)
}
Aggregate(newGrouping, newExpressions, constructLeftJoins(child, subqueries))
} else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index becf6945a2..8ee31f42ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -263,7 +263,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
* All the subqueries of current plan.
*/
def subqueries: Seq[PlanType] = {
- expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]})
+ expressions.flatMap(_.collect {
+ case e: PlanExpression[_] => e.plan.asInstanceOf[PlanType]
+ })
}
override protected def innerChildren: Seq[QueryPlan[_]] = subqueries