aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-03-24 22:15:51 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-03-24 22:15:51 -0700
commitb637f2d91ab4d3d5bf13e8d959c919ebd776f6af (patch)
tree8c6555150402e804f00eca24e7c71eebc3426a23 /sql/core
parent5140598df889f7227c9d6a7953031eeef524badd (diff)
downloadspark-b637f2d91ab4d3d5bf13e8d959c919ebd776f6af.tar.gz
spark-b637f2d91ab4d3d5bf13e8d959c919ebd776f6af.tar.bz2
spark-b637f2d91ab4d3d5bf13e8d959c919ebd776f6af.zip
Unify the logic for column pruning, projection, and filtering of table scans.
This removes duplicated logic, dead code and casting when planning parquet table scans and hive table scans. Other changes: - Fix tests now that we are doing a better job of column pruning (i.e., since pruning predicates are applied before we even start scanning tuples, columns required by these predicates do not need to be included in the output of the scan unless they are also included in the final output of this logical plan fragment). - Add rule to simplify trivial filters. This was required to avoid `WHERE false` from getting pushed into table scans, since `HiveTableScan` (reasonably) refuses to apply partition pruning predicates to non-partitioned tables. Author: Michael Armbrust <michael@databricks.com> Closes #213 from marmbrus/strategyCleanup and squashes the following commits: 48ce403 [Michael Armbrust] Move one more bit of parquet stuff into the core SQLContext. 834ce08 [Michael Armbrust] Address comments. 0f2c6f5 [Michael Armbrust] Unify the logic for column pruning, projection, and filtering of table scans for both Hive and Parquet relations. Fix tests now that we are doing a better job of column pruning.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala38
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala41
2 files changed, 59 insertions, 20 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 3e98bd3ca6..cf3c06acce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -118,9 +118,47 @@ class SQLContext(@transient val sparkContext: SparkContext)
TopK ::
PartialAggregation ::
SparkEquiInnerJoin ::
+ ParquetOperations ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil
+
+ /**
+ * Used to build table scan operators where complex projection and filtering are done using
+ * separate physical operators. This function returns the given scan operator with Project and
+ * Filter nodes added only when needed. For example, a Project operator is only used when the
+ * final desired output requires complex expressions to be evaluated or when columns can be
+ * further eliminated out after filtering has been done.
+ *
+ * The required attributes for both filtering and expression evaluation are passed to the
+ * provided `scanBuilder` function so that it can avoid unnecessary column materialization.
+ */
+ def pruneFilterProject(
+ projectList: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
+
+ val projectSet = projectList.flatMap(_.references).toSet
+ val filterSet = filterPredicates.flatMap(_.references).toSet
+ val filterCondition = filterPredicates.reduceLeftOption(And)
+
+ // Right now we still use a projection even if the only evaluation is applying an alias
+ // to a column. Since this is a no-op, it could be avoided. However, using this
+ // optimization with the current implementation would change the output schema.
+ // TODO: Decouple final output schema from expression evaluation so this copy can be
+ // avoided safely.
+
+ if (projectList.toSet == projectSet && filterSet.subsetOf(projectSet)) {
+ // When it is possible to just use column pruning to get the right projection and
+ // when the columns of this projection are enough to evaluate all filter conditions,
+ // just do a scan followed by a filter, with no extra project.
+ val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
+ filterCondition.map(Filter(_, scan)).getOrElse(scan)
+ } else {
+ val scan = scanBuilder((projectSet ++ filterSet).toSeq)
+ Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
+ }
+ }
}
@transient
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9eb1032113..8a39ded0a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -18,19 +18,15 @@
package org.apache.spark.sql
package execution
-import org.apache.spark.SparkContext
-
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.parquet.InsertIntoParquetTable
+import org.apache.spark.sql.parquet._
abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
-
- val sparkContext: SparkContext
+ self: SQLContext#SparkPlanner =>
object SparkEquiInnerJoin extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -170,6 +166,25 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
+ object ParquetOperations extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ // TODO: need to support writing to other types of files. Unify the below code paths.
+ case logical.WriteToFile(path, child) =>
+ val relation =
+ ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, None)
+ InsertIntoParquetTable(relation, planLater(child))(sparkContext) :: Nil
+ case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
+ InsertIntoParquetTable(table, planLater(child))(sparkContext) :: Nil
+ case PhysicalOperation(projectList, filters, relation: parquet.ParquetRelation) =>
+ // TODO: Should be pushing down filters as well.
+ pruneFilterProject(
+ projectList,
+ filters,
+ ParquetTableScan(_, relation, None)(sparkContext)) :: Nil
+ case _ => Nil
+ }
+ }
+
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
// TODO: Set
@@ -185,14 +200,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
- case logical.Project(projectList, r: ParquetRelation)
- if projectList.forall(_.isInstanceOf[Attribute]) =>
-
- // simple projection of data loaded from Parquet file
- parquet.ParquetTableScan(
- projectList.asInstanceOf[Seq[Attribute]],
- r,
- None)(sparkContext) :: Nil
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
@@ -216,12 +223,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.ExistingRdd(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
- case logical.WriteToFile(path, child) =>
- val relation =
- ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, None)
- InsertIntoParquetTable(relation, planLater(child))(sparkContext) :: Nil
- case p: parquet.ParquetRelation =>
- parquet.ParquetTableScan(p.output, p, None)(sparkContext) :: Nil
case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
case _ => Nil
}