From 0b0c8b95e3594db36d87ef0e59a30eefe8508ac1 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Wed, 17 Aug 2016 07:03:24 -0700 Subject: [SPARK-17106] [SQL] Simplify the SubqueryExpression interface ## What changes were proposed in this pull request? The current subquery expression interface contains a little bit of technical debt in the form of a few different access paths to get and set the query contained by the expression. This is confusing to anyone who goes over this code. This PR unifies these access paths. ## How was this patch tested? (Existing tests) Author: Herman van Hovell Closes #14685 from hvanhovell/SPARK-17106. --- .../org/apache/spark/sql/catalyst/SQLBuilder.scala | 2 +- .../org/apache/spark/sql/execution/subquery.scala | 49 ++++++++-------------- .../scala/org/apache/spark/sql/QueryTest.scala | 4 +- .../execution/benchmark/TPCDSQueryBenchmark.scala | 1 - 4 files changed, 20 insertions(+), 36 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala index ff8e0f2642..0f51aa58d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala @@ -80,7 +80,7 @@ class SQLBuilder private ( try { val replaced = finalPlan.transformAllExpressions { case s: SubqueryExpression => - val query = new SQLBuilder(s.query, nextSubqueryId, nextGenAttrId, exprIdMap).toSQL + val query = new SQLBuilder(s.plan, nextSubqueryId, nextGenAttrId, exprIdMap).toSQL val sql = s match { case _: ListQuery => query case _: Exists => s"EXISTS($query)" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index c730bee6ae..730ca27f82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -22,9 +22,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, PlanExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, DataType, StructType} @@ -32,18 +31,7 @@ import org.apache.spark.sql.types.{BooleanType, DataType, StructType} /** * The base class for subquery that is used in SparkPlan. */ -trait ExecSubqueryExpression extends SubqueryExpression { - - val executedPlan: SubqueryExec - def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression - - // does not have logical plan - override def query: LogicalPlan = throw new UnsupportedOperationException - override def withNewPlan(plan: LogicalPlan): SubqueryExpression = - throw new UnsupportedOperationException - - override def plan: SparkPlan = executedPlan - +abstract class ExecSubqueryExpression extends PlanExpression[SubqueryExec] { /** * Fill the expression with collected result from executed plan. */ @@ -56,30 +44,29 @@ trait ExecSubqueryExpression extends SubqueryExpression { * This is the physical copy of ScalarSubquery to be used inside SparkPlan. */ case class ScalarSubquery( - executedPlan: SubqueryExec, + plan: SubqueryExec, exprId: ExprId) extends ExecSubqueryExpression { - override def dataType: DataType = executedPlan.schema.fields.head.dataType + override def dataType: DataType = plan.schema.fields.head.dataType override def children: Seq[Expression] = Nil override def nullable: Boolean = true - override def toString: String = executedPlan.simpleString - - def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression = copy(executedPlan = plan) + override def toString: String = plan.simpleString + override def withNewPlan(query: SubqueryExec): ScalarSubquery = copy(plan = query) override def semanticEquals(other: Expression): Boolean = other match { - case s: ScalarSubquery => executedPlan.sameResult(executedPlan) + case s: ScalarSubquery => plan.sameResult(s.plan) case _ => false } // the first column in first row from `query`. - @volatile private var result: Any = null + @volatile private var result: Any = _ @volatile private var updated: Boolean = false def updateResult(): Unit = { val rows = plan.executeCollect() if (rows.length > 1) { - sys.error(s"more than one row returned by a subquery used as an expression:\n${plan}") + sys.error(s"more than one row returned by a subquery used as an expression:\n$plan") } if (rows.length == 1) { assert(rows(0).numFields == 1, @@ -108,7 +95,7 @@ case class ScalarSubquery( */ case class InSubquery( child: Expression, - executedPlan: SubqueryExec, + plan: SubqueryExec, exprId: ExprId, private var result: Array[Any] = null, private var updated: Boolean = false) extends ExecSubqueryExpression { @@ -116,13 +103,11 @@ case class InSubquery( override def dataType: DataType = BooleanType override def children: Seq[Expression] = child :: Nil override def nullable: Boolean = child.nullable - override def toString: String = s"$child IN ${executedPlan.name}" - - def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression = copy(executedPlan = plan) + override def toString: String = s"$child IN ${plan.name}" + override def withNewPlan(plan: SubqueryExec): InSubquery = copy(plan = plan) override def semanticEquals(other: Expression): Boolean = other match { - case in: InSubquery => child.semanticEquals(in.child) && - executedPlan.sameResult(in.executedPlan) + case in: InSubquery => child.semanticEquals(in.child) && plan.sameResult(in.plan) case _ => false } @@ -159,8 +144,8 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] { ScalarSubquery( SubqueryExec(s"subquery${subquery.exprId.id}", executedPlan), subquery.exprId) - case expressions.PredicateSubquery(plan, Seq(e: Expression), _, exprId) => - val executedPlan = new QueryExecution(sparkSession, plan).executedPlan + case expressions.PredicateSubquery(query, Seq(e: Expression), _, exprId) => + val executedPlan = new QueryExecution(sparkSession, query).executedPlan InSubquery(e, SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId) } } @@ -184,9 +169,9 @@ case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] { val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]()) val sameResult = sameSchema.find(_.sameResult(sub.plan)) if (sameResult.isDefined) { - sub.withExecutedPlan(sameResult.get) + sub.withNewPlan(sameResult.get) } else { - sameSchema += sub.executedPlan + sameSchema += sub.plan sub } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 304881d4a4..cff9d22d08 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -292,7 +292,7 @@ abstract class QueryTest extends PlanTest { p.expressions.foreach { _.foreach { case s: SubqueryExpression => - s.query.foreach(collectData) + s.plan.foreach(collectData) case _ => } } @@ -334,7 +334,7 @@ abstract class QueryTest extends PlanTest { case p => p.transformExpressions { case s: SubqueryExpression => - s.withNewPlan(s.query.transformDown(renormalize)) + s.withNewPlan(s.plan.transformDown(renormalize)) } } val normalized2 = jsonBackPlan.transformDown(renormalize) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index 957a1d6426..3988d9750b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Benchmark /** -- cgit v1.2.3