diff options
author | Liang-Chi Hsieh <viirya@appier.com> | 2015-07-07 15:49:22 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-07-07 15:49:22 -0700 |
commit | da56c4e728d6bdb6d5559ec7fd8a3f7ddd1e2319 (patch) | |
tree | 4dc1ae6b8384665e737c3bd1c77ea5aa84062b4a | |
parent | 3bf20c27ff3cb3a32bfc3a44e08a57865957c117 (diff) | |
download | spark-da56c4e728d6bdb6d5559ec7fd8a3f7ddd1e2319.tar.gz spark-da56c4e728d6bdb6d5559ec7fd8a3f7ddd1e2319.tar.bz2 spark-da56c4e728d6bdb6d5559ec7fd8a3f7ddd1e2319.zip |
[SPARK-8794] [SQL] Make PrunedScan work for Sample
JIRA: https://issues.apache.org/jira/browse/SPARK-8794
Currently, `PrunedScan` takes effect only when it is directly followed by project or filter operations. It should also work when a `Sample` sits between those operations and the `PrunedScan`.
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #7228 from viirya/sample_prunedscan and squashes the following commits:
ede7cd8 [Liang-Chi Hsieh] Keep PrunedScanSuite untouched.
6f05d30 [Liang-Chi Hsieh] Move unit test to FilterPushdownSuite.
5f32473 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into sample_prunedscan
7e4ba76 [Liang-Chi Hsieh] Use Optimizer for push down projection and filter.
0686830 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into sample_prunedscan
df82785 [Liang-Chi Hsieh] Make PrunedScan work on Sample.
2 files changed, 34 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bfd24287c9..7d41ef9aaf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -41,6 +41,7 @@ object DefaultOptimizer extends Optimizer {
     Batch("Operator Optimizations", FixedPoint(100),
       // Operator push down
       UnionPushDown,
+      SamplePushDown,
       PushPredicateThroughJoin,
       PushPredicateThroughProject,
       PushPredicateThroughGenerate,
@@ -66,6 +67,23 @@ object DefaultOptimizer extends Optimizer {
 }
 
 /**
+ * Pushes operations down into a Sample.
+ */
+object SamplePushDown extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Push down filter into sample
+    case Filter(condition, s @ Sample(lb, up, replace, seed, child)) =>
+      Sample(lb, up, replace, seed,
+        Filter(condition, child))
+    // Push down projection into sample
+    case Project(projectList, s @ Sample(lb, up, replace, seed, child)) =>
+      Sample(lb, up, replace, seed,
+        Project(projectList, child))
+  }
+}
+
+/**
  * Pushes operations to either side of a Union.
  */
 object UnionPushDown extends Rule[LogicalPlan] {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ffdc673cdc..dc28b3ffb5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -34,6 +34,7 @@ class FilterPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
         EliminateSubQueries) ::
       Batch("Filter Pushdown", Once,
+        SamplePushDown,
         CombineFilters,
         PushPredicateThroughProject,
         BooleanSimplification,
@@ -593,4 +594,19 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(optimized1, analysis.EliminateSubQueries(correctAnswer1))
   }
+
+  test("push project and filter down into sample") {
+    val x = testRelation.subquery('x)
+    val originalQuery =
+      Sample(0.0, 0.6, false, 11L, x).select('a)
+
+    val originalQueryAnalyzed = EliminateSubQueries(analysis.SimpleAnalyzer.execute(originalQuery))
+
+    val optimized = Optimize.execute(originalQueryAnalyzed)
+
+    val correctAnswer =
+      Sample(0.0, 0.6, false, 11L, x.select('a))
+
+    comparePlans(optimized, correctAnswer.analyze)
+  }
 }