diff options
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 51 | ||||
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 2 |
2 files changed, 51 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 37b23ba582..c0a09a16ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -33,7 +33,56 @@ object Optimizer extends RuleExecutor[LogicalPlan] { Batch("Filter Pushdown", Once, CombineFilters, PushPredicateThroughProject, - PushPredicateThroughInnerJoin) :: Nil + PushPredicateThroughInnerJoin, + ColumnPruning) :: Nil +} + +/** + * Attempts to eliminate the reading of unneeded columns from the query plan using the following + * transformations: + * + * - Inserting Projections beneath the following operators: + * - Aggregate + * - Project <- Join + * - Collapse adjacent projections, performing alias substitution. + */ +object ColumnPruning extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => + // Project away references that are not needed to calculate the required aggregates. + a.copy(child = Project(a.references.toSeq, child)) + + case Project(projectList, Join(left, right, joinType, condition)) => + // Collect the list of off references required either above or to evaluate the condition. + val allReferences: Set[Attribute] = + projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty) + /** Applies a projection when the child is producing unnecessary attributes */ + def prunedChild(c: LogicalPlan) = + if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) { + Project(allReferences.filter(c.outputSet.contains).toSeq, c) + } else { + c + } + + Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition)) + + case Project(projectList1, Project(projectList2, child)) => + // Create a map of Aliases to their values from the child projection. + // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)). + val aliasMap = projectList2.collect { + case a @ Alias(e, _) => (a.toAttribute: Expression, a) + }.toMap + + // Substitute any attributes that are produced by the child projection, so that we safely + // eliminate it. + // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...' + // TODO: Fix TransformBase to avoid the cast below. + val substitutedProjection = projectList1.map(_.transform { + case a if aliasMap.contains(a) => aliasMap(a) + }).asInstanceOf[Seq[NamedExpression]] + + Project(substitutedProjection, child) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index cfc0b0c3a8..397473e178 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -127,7 +127,7 @@ case class Aggregate( extends UnaryNode { def output = aggregateExpressions.map(_.toAttribute) - def references = child.references + def references = (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet } case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode { |