about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
author: gatorsmile <gatorsmile@gmail.com> 2016-03-02 09:59:22 -0800
committer: Davies Liu <davies.liu@gmail.com> 2016-03-02 09:59:22 -0800
commit: 8f8d8a2315514cd1f3609bc06e5cf6e6d06fdd91 (patch)
tree: 2f413b57d7688c3af91b926bbdb0a9f545c1fba7 /sql
parent: d8afd45f8949e0914ce4bd56d832b1158e3c9220 (diff)
download: spark-8f8d8a2315514cd1f3609bc06e5cf6e6d06fdd91.tar.gz
          spark-8f8d8a2315514cd1f3609bc06e5cf6e6d06fdd91.tar.bz2
          spark-8f8d8a2315514cd1f3609bc06e5cf6e6d06fdd91.zip
[SPARK-13609] [SQL] Support Column Pruning for MapPartitions
#### What changes were proposed in this pull request?

This PR is to prune unnecessary columns when the operator is `MapPartitions`. The solution is to add an extra `Project` in the child node.

For the other two operators, `AppendColumns` and `MapGroups`, it sounds doable, but more discussion is required. The major reason is that the current implementation of the `inputPlan` of `groupBy` is based on the child of `AppendColumns`. It might be a bug? Thus, I will submit a separate PR.

#### How was this patch tested?

Added a test case in ColumnPruningSuite to verify the rule. Added another test case in DatasetSuite.scala to verify the data.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #11460 from gatorsmile/datasetPruningNew.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala5
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala11
3 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2aeb9575f1..55adc06320 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -331,7 +331,10 @@ object ColumnPruning extends Rule[LogicalPlan] {
}.unzip._1
}
a.copy(child = Expand(newProjects, newOutput, grandChild))
- // TODO: support some logical plan for Dataset
+
+ // Prunes the unused columns from child of MapPartitions
+ case mp @ MapPartitions(_, _, _, child) if (child.outputSet -- mp.references).nonEmpty =>
+ mp.copy(child = prunedChild(child, mp.references))
// Prunes the unused columns from child of Aggregate/Window/Expand/Generate
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 715d01a3cd..5cab1fc95a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql.catalyst.optimizer
+import scala.reflect.runtime.universe.TypeTag
+
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
@@ -249,5 +252,16 @@ class ColumnPruningSuite extends PlanTest {
comparePlans(Optimize.execute(query), expected)
}
+ implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
+ private val func = identity[Iterator[OtherTuple]] _
+
+ test("Column pruning on MapPartitions") {
+ val input = LocalRelation('_1.int, '_2.int, 'c.int)
+ val plan1 = MapPartitions(func, input)
+ val correctAnswer1 =
+ MapPartitions(func, Project(Seq('_1, '_2), input)).analyze
+ comparePlans(Optimize.execute(plan1.analyze), correctAnswer1)
+ }
+
// todo: add more tests for column pruning
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 33df6375e3..79e10215f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -113,7 +113,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
("a", 2), ("b", 3), ("c", 4))
}
- test("map with type change") {
+ test("map with type change with the exact matched number of attributes") {
val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
checkAnswer(
@@ -123,6 +123,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
OtherTuple("a", 1), OtherTuple("b", 2), OtherTuple("c", 3))
}
+ test("map with type change with less attributes") {
+ val ds = Seq(("a", 1, 3), ("b", 2, 4), ("c", 3, 5)).toDS()
+
+ checkAnswer(
+ ds.as[OtherTuple]
+ .map(identity[OtherTuple]),
+ OtherTuple("a", 1), OtherTuple("b", 2), OtherTuple("c", 3))
+ }
+
test("map and group by with class data") {
// We inject a group by here to make sure this test case is future proof
// when we implement better pipelining and local execution mode.