aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2016-08-17 07:03:24 -0700
committerDavies Liu <davies.liu@gmail.com>2016-08-17 07:03:24 -0700
commit0b0c8b95e3594db36d87ef0e59a30eefe8508ac1 (patch)
treeb3f8bb96171a72a422d5e64caacc110fee5319bc /sql/core
parent56d86742d2600b8426d75bd87ab3c73332dca1d2 (diff)
downloadspark-0b0c8b95e3594db36d87ef0e59a30eefe8508ac1.tar.gz
spark-0b0c8b95e3594db36d87ef0e59a30eefe8508ac1.tar.bz2
spark-0b0c8b95e3594db36d87ef0e59a30eefe8508ac1.zip
[SPARK-17106] [SQL] Simplify the SubqueryExpression interface
## What changes were proposed in this pull request?

The current subquery expression interface contains a little bit of technical debt in the form of a few different access paths to get and set the query contained by the expression. This is confusing to anyone who goes over this code. This PR unifies these access paths.

## How was this patch tested?

(Existing tests)

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #14685 from hvanhovell/SPARK-17106.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala49
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala1
4 files changed, 20 insertions, 36 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index ff8e0f2642..0f51aa58d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -80,7 +80,7 @@ class SQLBuilder private (
try {
val replaced = finalPlan.transformAllExpressions {
case s: SubqueryExpression =>
- val query = new SQLBuilder(s.query, nextSubqueryId, nextGenAttrId, exprIdMap).toSQL
+ val query = new SQLBuilder(s.plan, nextSubqueryId, nextGenAttrId, exprIdMap).toSQL
val sql = s match {
case _: ListQuery => query
case _: Exists => s"EXISTS($query)"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index c730bee6ae..730ca27f82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -22,9 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, DataType, StructType}
@@ -32,18 +31,7 @@ import org.apache.spark.sql.types.{BooleanType, DataType, StructType}
/**
* The base class for subquery that is used in SparkPlan.
*/
-trait ExecSubqueryExpression extends SubqueryExpression {
-
- val executedPlan: SubqueryExec
- def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression
-
- // does not have logical plan
- override def query: LogicalPlan = throw new UnsupportedOperationException
- override def withNewPlan(plan: LogicalPlan): SubqueryExpression =
- throw new UnsupportedOperationException
-
- override def plan: SparkPlan = executedPlan
-
+abstract class ExecSubqueryExpression extends PlanExpression[SubqueryExec] {
/**
* Fill the expression with collected result from executed plan.
*/
@@ -56,30 +44,29 @@ trait ExecSubqueryExpression extends SubqueryExpression {
* This is the physical copy of ScalarSubquery to be used inside SparkPlan.
*/
case class ScalarSubquery(
- executedPlan: SubqueryExec,
+ plan: SubqueryExec,
exprId: ExprId)
extends ExecSubqueryExpression {
- override def dataType: DataType = executedPlan.schema.fields.head.dataType
+ override def dataType: DataType = plan.schema.fields.head.dataType
override def children: Seq[Expression] = Nil
override def nullable: Boolean = true
- override def toString: String = executedPlan.simpleString
-
- def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression = copy(executedPlan = plan)
+ override def toString: String = plan.simpleString
+ override def withNewPlan(query: SubqueryExec): ScalarSubquery = copy(plan = query)
override def semanticEquals(other: Expression): Boolean = other match {
- case s: ScalarSubquery => executedPlan.sameResult(executedPlan)
+ case s: ScalarSubquery => plan.sameResult(s.plan)
case _ => false
}
// the first column in first row from `query`.
- @volatile private var result: Any = null
+ @volatile private var result: Any = _
@volatile private var updated: Boolean = false
def updateResult(): Unit = {
val rows = plan.executeCollect()
if (rows.length > 1) {
- sys.error(s"more than one row returned by a subquery used as an expression:\n${plan}")
+ sys.error(s"more than one row returned by a subquery used as an expression:\n$plan")
}
if (rows.length == 1) {
assert(rows(0).numFields == 1,
@@ -108,7 +95,7 @@ case class ScalarSubquery(
*/
case class InSubquery(
child: Expression,
- executedPlan: SubqueryExec,
+ plan: SubqueryExec,
exprId: ExprId,
private var result: Array[Any] = null,
private var updated: Boolean = false) extends ExecSubqueryExpression {
@@ -116,13 +103,11 @@ case class InSubquery(
override def dataType: DataType = BooleanType
override def children: Seq[Expression] = child :: Nil
override def nullable: Boolean = child.nullable
- override def toString: String = s"$child IN ${executedPlan.name}"
-
- def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression = copy(executedPlan = plan)
+ override def toString: String = s"$child IN ${plan.name}"
+ override def withNewPlan(plan: SubqueryExec): InSubquery = copy(plan = plan)
override def semanticEquals(other: Expression): Boolean = other match {
- case in: InSubquery => child.semanticEquals(in.child) &&
- executedPlan.sameResult(in.executedPlan)
+ case in: InSubquery => child.semanticEquals(in.child) && plan.sameResult(in.plan)
case _ => false
}
@@ -159,8 +144,8 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
ScalarSubquery(
SubqueryExec(s"subquery${subquery.exprId.id}", executedPlan),
subquery.exprId)
- case expressions.PredicateSubquery(plan, Seq(e: Expression), _, exprId) =>
- val executedPlan = new QueryExecution(sparkSession, plan).executedPlan
+ case expressions.PredicateSubquery(query, Seq(e: Expression), _, exprId) =>
+ val executedPlan = new QueryExecution(sparkSession, query).executedPlan
InSubquery(e, SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId)
}
}
@@ -184,9 +169,9 @@ case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]())
val sameResult = sameSchema.find(_.sameResult(sub.plan))
if (sameResult.isDefined) {
- sub.withExecutedPlan(sameResult.get)
+ sub.withNewPlan(sameResult.get)
} else {
- sameSchema += sub.executedPlan
+ sameSchema += sub.plan
sub
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 304881d4a4..cff9d22d08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -292,7 +292,7 @@ abstract class QueryTest extends PlanTest {
p.expressions.foreach {
_.foreach {
case s: SubqueryExpression =>
- s.query.foreach(collectData)
+ s.plan.foreach(collectData)
case _ =>
}
}
@@ -334,7 +334,7 @@ abstract class QueryTest extends PlanTest {
case p =>
p.transformExpressions {
case s: SubqueryExpression =>
- s.withNewPlan(s.query.transformDown(renormalize))
+ s.withNewPlan(s.plan.transformDown(renormalize))
}
}
val normalized2 = jsonBackPlan.transformDown(renormalize)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index 957a1d6426..3988d9750b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Benchmark
/**