aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-03-09 12:04:29 -0800
committerDavies Liu <davies.liu@gmail.com>2016-03-09 12:04:29 -0800
commit3dc9ae2e158e5b51df6f799767946fe1d190156b (patch)
tree06d65a71e1612051ec62498f5175d7e556a89acf /sql/catalyst/src
parent0dd06485c4222a896c0d1ee6a04d30043de3626c (diff)
downloadspark-3dc9ae2e158e5b51df6f799767946fe1d190156b.tar.gz
spark-3dc9ae2e158e5b51df6f799767946fe1d190156b.tar.bz2
spark-3dc9ae2e158e5b51df6f799767946fe1d190156b.zip
[SPARK-13523] [SQL] Reuse exchanges in a query
## What changes were proposed in this pull request? It’s possible to have common parts in a query, for example, self join, it will be good to avoid the duplicated part to same CPUs and memory (Broadcast or cache). Exchange will materialize the underlying RDD by shuffle or collect, it’s a great point to check duplicates and reuse them. Duplicated exchanges means they generate exactly the same result inside a query. In order to find out the duplicated exchanges, we should be able to compare SparkPlan to check that they have same results or not. We already have that for LogicalPlan, so we should move that into QueryPlan to make it available for SparkPlan. Once we can find the duplicated exchanges, we should replace all of them with same SparkPlan object (could be wrapped by ReusedExchage for explain), then the plan tree become a DAG. Since all the planner only work with tree, so this rule should be the last one for the entire planning. After the rule, the plan will looks like: ``` WholeStageCodegen : +- Project [id#0L] : +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None : :- Project [id#0L] : : +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None : : :- Range 0, 1, 4, 1024, [id#0L] : : +- INPUT : +- INPUT :- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) : +- WholeStageCodegen : : +- Range 0, 1, 4, 1024, [id#1L] +- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) ``` ![bjoin](https://cloud.githubusercontent.com/assets/40902/13414787/209e8c5c-df0a-11e5-8a0f-edff69d89e83.png) For three ways SortMergeJoin, ``` == Physical Plan == WholeStageCodegen : +- Project [id#0L] : +- SortMergeJoin [id#0L], [id#4L], None : :- INPUT : +- INPUT :- WholeStageCodegen : : +- Project [id#0L] : : +- SortMergeJoin [id#0L], [id#3L], None : : :- INPUT : : +- INPUT : :- WholeStageCodegen : : : +- Sort [id#0L ASC], false, 0 : : : +- INPUT : : +- Exchange hashpartitioning(id#0L, 200), None : : +- WholeStageCodegen : : : +- Range 0, 1, 4, 33554432, [id#0L] : +- WholeStageCodegen : : +- Sort [id#3L ASC], false, 0 : : +- INPUT : +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None +- WholeStageCodegen : +- Sort [id#4L ASC], false, 0 : +- INPUT +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None ``` ![sjoin](https://cloud.githubusercontent.com/assets/40902/13414790/27aea61c-df0a-11e5-8cbf-fbc985c31d95.png) If the same ShuffleExchange or BroadcastExchange, execute()/executeBroadcast() will be called by different parents, they should cached the RDD/Broadcast, return the same one for all the parents. ## How was this patch tested? Added some unit tests for this. Had done some manual tests on TPCDS query Q59 and Q64, we can see some exchanges are re-used (this requires a change in PhysicalRDD to for sameResult, is be done in #11514 ). Author: Davies Liu <davies@databricks.com> Closes #11403 from davies/dedup.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala63
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala55
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala9
3 files changed, 72 insertions, 55 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index c62d5ead86..371d72ef5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.{DataType, StructType}
-abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
+abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
self: PlanType =>
def output: Seq[Attribute]
@@ -237,4 +237,65 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
}
override def innerChildren: Seq[PlanType] = subqueries
+
+ /**
+ * Canonicalized copy of this query plan.
+ */
+ protected lazy val canonicalized: PlanType = this
+
+ /**
+ * Returns true when the given query plan will return the same results as this query plan.
+ *
+ * Since its likely undecidable to generally determine if two given plans will produce the same
+ * results, it is okay for this function to return false, even if the results are actually
+ * the same. Such behavior will not affect correctness, only the application of performance
+ * enhancements like caching. However, it is not acceptable to return true if the results could
+ * possibly be different.
+ *
+ * By default this function performs a modified version of equality that is tolerant of cosmetic
+ * differences like attribute naming and or expression id differences. Operators that
+ * can do better should override this function.
+ */
+ def sameResult(plan: PlanType): Boolean = {
+ val canonicalizedLeft = this.canonicalized
+ val canonicalizedRight = plan.canonicalized
+ canonicalizedLeft.getClass == canonicalizedRight.getClass &&
+ canonicalizedLeft.children.size == canonicalizedRight.children.size &&
+ canonicalizedLeft.cleanArgs == canonicalizedRight.cleanArgs &&
+ (canonicalizedLeft.children, canonicalizedRight.children).zipped.forall(_ sameResult _)
+ }
+
+ /**
+ * All the attributes that are used for this plan.
+ */
+ lazy val allAttributes: Seq[Attribute] = children.flatMap(_.output)
+
+ private def cleanExpression(e: Expression): Expression = e match {
+ case a: Alias =>
+ // As the root of the expression, Alias will always take an arbitrary exprId, we need
+ // to erase that for equality testing.
+ val cleanedExprId =
+ Alias(a.child, a.name)(ExprId(-1), a.qualifiers, isGenerated = a.isGenerated)
+ BindReferences.bindReference(cleanedExprId, allAttributes, allowFailures = true)
+ case other =>
+ BindReferences.bindReference(other, allAttributes, allowFailures = true)
+ }
+
+ /** Args that have cleaned such that differences in expression id should not affect equality */
+ protected lazy val cleanArgs: Seq[Any] = {
+ def cleanArg(arg: Any): Any = arg match {
+ case e: Expression => cleanExpression(e).canonicalized
+ case other => other
+ }
+
+ productIterator.map {
+ // Children are checked using sameResult above.
+ case tn: TreeNode[_] if containsChild(tn) => null
+ case e: Expression => cleanArg(e)
+ case s: Option[_] => s.map(cleanArg)
+ case s: Seq[_] => s.map(cleanArg)
+ case m: Map[_, _] => m.mapValues(cleanArg)
+ case other => other
+ }.toSeq
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 31e775d60f..b32c7d0fcb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -114,60 +114,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
def childrenResolved: Boolean = children.forall(_.resolved)
- /**
- * Returns true when the given logical plan will return the same results as this logical plan.
- *
- * Since its likely undecidable to generally determine if two given plans will produce the same
- * results, it is okay for this function to return false, even if the results are actually
- * the same. Such behavior will not affect correctness, only the application of performance
- * enhancements like caching. However, it is not acceptable to return true if the results could
- * possibly be different.
- *
- * By default this function performs a modified version of equality that is tolerant of cosmetic
- * differences like attribute naming and or expression id differences. Logical operators that
- * can do better should override this function.
- */
- def sameResult(plan: LogicalPlan): Boolean = {
- val cleanLeft = EliminateSubqueryAliases(this)
- val cleanRight = EliminateSubqueryAliases(plan)
-
- cleanLeft.getClass == cleanRight.getClass &&
- cleanLeft.children.size == cleanRight.children.size && {
- logDebug(
- s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
- cleanRight.cleanArgs == cleanLeft.cleanArgs
- } &&
- (cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
- }
-
- /** Args that have cleaned such that differences in expression id should not affect equality */
- protected lazy val cleanArgs: Seq[Any] = {
- val input = children.flatMap(_.output)
- def cleanExpression(e: Expression) = e match {
- case a: Alias =>
- // As the root of the expression, Alias will always take an arbitrary exprId, we need
- // to erase that for equality testing.
- val cleanedExprId =
- Alias(a.child, a.name)(ExprId(-1), a.qualifiers, isGenerated = a.isGenerated)
- BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
- case other => BindReferences.bindReference(other, input, allowFailures = true)
- }
-
- productIterator.map {
- // Children are checked using sameResult above.
- case tn: TreeNode[_] if containsChild(tn) => null
- case e: Expression => cleanExpression(e)
- case s: Option[_] => s.map {
- case e: Expression => cleanExpression(e)
- case other => other
- }
- case s: Seq[_] => s.map {
- case e: Expression => cleanExpression(e)
- case other => other
- }
- case other => other
- }.toSeq
- }
+ override lazy val canonicalized: LogicalPlan = EliminateSubqueryAliases(this)
/**
* Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
index e01f69f813..9dfdf4da78 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
@@ -25,6 +25,11 @@ import org.apache.spark.sql.catalyst.InternalRow
*/
trait BroadcastMode {
def transform(rows: Array[InternalRow]): Any
+
+ /**
+ * Returns true iff this [[BroadcastMode]] generates the same result as `other`.
+ */
+ def compatibleWith(other: BroadcastMode): Boolean
}
/**
@@ -33,4 +38,8 @@ trait BroadcastMode {
case object IdentityBroadcastMode extends BroadcastMode {
// TODO: pack the UnsafeRows into single bytes array.
override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows
+
+ override def compatibleWith(other: BroadcastMode): Boolean = {
+ this eq other
+ }
}