aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-04-10 16:20:33 -0700
committerReynold Xin <rxin@apache.org>2014-04-10 16:20:33 -0700
commitf99401a6308d5b9a9259d7597a35ba92f927aa50 (patch)
tree371ad2a26369040317fcba657f5f71e8c11a5072 /sql
parent930b70f0523e96fe01c1317ef7fad1b76b36d4d9 (diff)
downloadspark-f99401a6308d5b9a9259d7597a35ba92f927aa50.tar.gz
spark-f99401a6308d5b9a9259d7597a35ba92f927aa50.tar.bz2
spark-f99401a6308d5b9a9259d7597a35ba92f927aa50.zip
[SQL] Improve column pruning in the optimizer.
Author: Michael Armbrust <michael@databricks.com> Closes #378 from marmbrus/columnPruning and squashes the following commits: 779da56 [Michael Armbrust] More consistent naming. 1a4e9ea [Michael Armbrust] More comments. 2f4e7b9 [Michael Armbrust] Improve column pruning in the optimizer.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala51
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala2
2 files changed, 51 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 37b23ba582..c0a09a16ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -33,7 +33,56 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
Batch("Filter Pushdown", Once,
CombineFilters,
PushPredicateThroughProject,
- PushPredicateThroughInnerJoin) :: Nil
+ PushPredicateThroughInnerJoin,
+ ColumnPruning) :: Nil
+}
+
+/**
+ * Attempts to eliminate the reading of unneeded columns from the query plan using the following
+ * transformations:
+ *
+ * - Inserting Projections beneath the following operators:
+ * - Aggregate
+ * - Project <- Join
+ * - Collapse adjacent projections, performing alias substitution.
+ */
+object ColumnPruning extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
+ // Project away references that are not needed to calculate the required aggregates.
+ a.copy(child = Project(a.references.toSeq, child))
+
+ case Project(projectList, Join(left, right, joinType, condition)) =>
+ // Collect the list of off references required either above or to evaluate the condition.
+ val allReferences: Set[Attribute] =
+ projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
+ /** Applies a projection when the child is producing unnecessary attributes */
+ def prunedChild(c: LogicalPlan) =
+ if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
+ Project(allReferences.filter(c.outputSet.contains).toSeq, c)
+ } else {
+ c
+ }
+
+ Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))
+
+ case Project(projectList1, Project(projectList2, child)) =>
+ // Create a map of Aliases to their values from the child projection.
+ // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
+ val aliasMap = projectList2.collect {
+ case a @ Alias(e, _) => (a.toAttribute: Expression, a)
+ }.toMap
+
+ // Substitute any attributes that are produced by the child projection, so that we safely
+ // eliminate it.
+ // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...'
+ // TODO: Fix TransformBase to avoid the cast below.
+ val substitutedProjection = projectList1.map(_.transform {
+ case a if aliasMap.contains(a) => aliasMap(a)
+ }).asInstanceOf[Seq[NamedExpression]]
+
+ Project(substitutedProjection, child)
+ }
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index cfc0b0c3a8..397473e178 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -127,7 +127,7 @@ case class Aggregate(
extends UnaryNode {
def output = aggregateExpressions.map(_.toAttribute)
- def references = child.references
+ def references = (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}
case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode {