From 3aa52be39def24a1b285ede8f2610d93079de5a1 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 5 Jul 2014 11:48:08 -0700 Subject: [SPARK-2366] [SQL] Add column pruning for the right side of LeftSemi join. The right side of `LeftSemi` join needs columns only used in join condition. Author: Takuya UESHIN Closes #1301 from ueshin/issues/SPARK-2366 and squashes the following commits: 7677a39 [Takuya UESHIN] Update comments. 786d3a0 [Takuya UESHIN] Rename method name. e0957b1 [Takuya UESHIN] Add column pruning for the right side of LeftSemi join. (cherry picked from commit 3da8df939ec63064692ba64d9188aeea908b305c) Signed-off-by: Michael Armbrust --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 28 +++++++++++++++------- 1 file changed, 20 insertions(+), 8 deletions(-) (limited to 'sql/catalyst') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b20b5de8c4..5bc478484c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -52,6 +52,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] { * - Inserting Projections beneath the following operators: * - Aggregate * - Project <- Join + * - LeftSemiJoin * - Collapse adjacent projections, performing alias substitution. */ object ColumnPruning extends Rule[LogicalPlan] { @@ -62,19 +63,22 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate unneeded attributes from either side of a Join. case Project(projectList, Join(left, right, joinType, condition)) => - // Collect the list of off references required either above or to evaluate the condition. + // Collect the list of all references required either above or to evaluate the condition. val allReferences: Set[Attribute] = projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) /** Applies a projection only when the child is producing unnecessary attributes */ - def prunedChild(c: LogicalPlan) = - if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { - Project(allReferences.filter(c.outputSet.contains).toSeq, c) - } else { - c - } + def pruneJoinChild(c: LogicalPlan) = prunedChild(c, allReferences) - Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition)) + Project(projectList, Join(pruneJoinChild(left), pruneJoinChild(right), joinType, condition)) + + // Eliminate unneeded attributes from right side of a LeftSemiJoin. + case Join(left, right, LeftSemi, condition) => + // Collect the list of all references required to evaluate the condition. + val allReferences: Set[Attribute] = + condition.map(_.references).getOrElse(Set.empty) + + Join(left, prunedChild(right, allReferences), LeftSemi, condition) // Combine adjacent Projects. case Project(projectList1, Project(projectList2, child)) => @@ -97,6 +101,14 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate no-op Projects case Project(projectList, child) if child.output == projectList => child } + + /** Applies a projection only when the child is producing unnecessary attributes */ + private def prunedChild(c: LogicalPlan, allReferences: Set[Attribute]) = + if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { + Project(allReferences.filter(c.outputSet.contains).toSeq, c) + } else { + c + } } /** -- cgit v1.2.3