diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-03-02 09:59:22 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-03-02 09:59:22 -0800 |
commit | 8f8d8a2315514cd1f3609bc06e5cf6e6d06fdd91 (patch) | |
tree | 2f413b57d7688c3af91b926bbdb0a9f545c1fba7 | |
parent | d8afd45f8949e0914ce4bd56d832b1158e3c9220 (diff) | |
download | spark-8f8d8a2315514cd1f3609bc06e5cf6e6d06fdd91.tar.gz spark-8f8d8a2315514cd1f3609bc06e5cf6e6d06fdd91.tar.bz2 spark-8f8d8a2315514cd1f3609bc06e5cf6e6d06fdd91.zip |
[SPARK-13609] [SQL] Support Column Pruning for MapPartitions
#### What changes were proposed in this pull request?
This PR is to prune unnecessary columns when the operator is `MapPartitions`. The solution is to add an extra `Project` in the child node.
For the other two operators `AppendColumns` and `MapGroups`, it sounds doable. More discussions are required. The major reason is the current implementation of the `inputPlan` of `groupBy` is based on the child of `AppendColumns`. It might be a bug? Thus, will submit a separate PR.
#### How was this patch tested?
Added a test case in ColumnPruningSuite to verify the rule. Added another test case in DatasetSuite.scala to verify the data.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #11460 from gatorsmile/datasetPruningNew.
3 files changed, 28 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2aeb9575f1..55adc06320 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -331,7 +331,10 @@ object ColumnPruning extends Rule[LogicalPlan] { }.unzip._1 } a.copy(child = Expand(newProjects, newOutput, grandChild)) - // TODO: support some logical plan for Dataset + + // Prunes the unused columns from child of MapPartitions + case mp @ MapPartitions(_, _, _, child) if (child.outputSet -- mp.references).nonEmpty => + mp.copy(child = prunedChild(child, mp.references)) // Prunes the unused columns from child of Aggregate/Window/Expand/Generate case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 715d01a3cd..5cab1fc95a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -17,9 +17,12 @@ package org.apache.spark.sql.catalyst.optimizer +import scala.reflect.runtime.universe.TypeTag + import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ @@ -249,5 +252,16 @@ class ColumnPruningSuite extends PlanTest { comparePlans(Optimize.execute(query), expected) } + implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]() + private val func = identity[Iterator[OtherTuple]] _ + + test("Column pruning on MapPartitions") { + val input = LocalRelation('_1.int, '_2.int, 'c.int) + val plan1 = MapPartitions(func, input) + val correctAnswer1 = + MapPartitions(func, Project(Seq('_1, '_2), input)).analyze + comparePlans(Optimize.execute(plan1.analyze), correctAnswer1) + } + // todo: add more tests for column pruning } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 33df6375e3..79e10215f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -113,7 +113,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext { ("a", 2), ("b", 3), ("c", 4)) } - test("map with type change") { + test("map with type change with the exact matched number of attributes") { val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS() checkAnswer( @@ -123,6 +123,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { OtherTuple("a", 1), OtherTuple("b", 2), OtherTuple("c", 3)) } + test("map with type change with less attributes") { + val ds = Seq(("a", 1, 3), ("b", 2, 4), ("c", 3, 5)).toDS() + + checkAnswer( + ds.as[OtherTuple] + .map(identity[OtherTuple]), + OtherTuple("a", 1), OtherTuple("b", 2), OtherTuple("c", 3)) + } + test("map and group by with class data") { // We inject a group by here to make sure this test case is future proof // when we implement better pipelining and local execution mode. |