From da56c4e728d6bdb6d5559ec7fd8a3f7ddd1e2319 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 7 Jul 2015 15:49:22 -0700
Subject: [SPARK-8794] [SQL] Make PrunedScan work for Sample

JIRA: https://issues.apache.org/jira/browse/SPARK-8794

Currently `PrunedScan` works only when followed by project or filter
operations. However, even if there is a `Sample` between these operations
and `PrunedScan`, `PrunedScan` should work too.

Author: Liang-Chi Hsieh

Closes #7228 from viirya/sample_prunedscan and squashes the following commits:

ede7cd8 [Liang-Chi Hsieh] Keep PrunedScanSuite untouched.
6f05d30 [Liang-Chi Hsieh] Move unit test to FilterPushdownSuite.
5f32473 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into sample_prunedscan
7e4ba76 [Liang-Chi Hsieh] Use Optimzier for push down projection and filter.
0686830 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into sample_prunedscan
df82785 [Liang-Chi Hsieh] Make PrunedScan work on Sample.
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala     | 18 ++++++++++++++++++
 .../sql/catalyst/optimizer/FilterPushdownSuite.scala | 16 ++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bfd24287c9..7d41ef9aaf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -41,6 +41,7 @@ object DefaultOptimizer extends Optimizer {
     Batch("Operator Optimizations", FixedPoint(100),
       // Operator push down
       UnionPushDown,
+      SamplePushDown,
       PushPredicateThroughJoin,
       PushPredicateThroughProject,
       PushPredicateThroughGenerate,
@@ -65,6 +66,23 @@ object DefaultOptimizer extends Optimizer {
       ConvertToLocalRelation) :: Nil
 }
 
+/**
+ * Pushes operations down into a Sample.
+ */
+object SamplePushDown extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Push down filter into sample
+    case Filter(condition, s @ Sample(lb, up, replace, seed, child)) =>
+      Sample(lb, up, replace, seed,
+        Filter(condition, child))
+    // Push down projection into sample
+    case Project(projectList, s @ Sample(lb, up, replace, seed, child)) =>
+      Sample(lb, up, replace, seed,
+        Project(projectList, child))
+  }
+}
+
 /**
  * Pushes operations to either side of a Union.
  */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ffdc673cdc..dc28b3ffb5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -34,6 +34,7 @@ class FilterPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
         EliminateSubQueries) ::
       Batch("Filter Pushdown", Once,
+        SamplePushDown,
        CombineFilters,
         PushPredicateThroughProject,
         BooleanSimplification,
@@ -593,4 +594,19 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(optimized1, analysis.EliminateSubQueries(correctAnswer1))
   }
+
+  test("push project and filter down into sample") {
+    val x = testRelation.subquery('x)
+    val originalQuery =
+      Sample(0.0, 0.6, false, 11L, x).select('a)
+
+    val originalQueryAnalyzed = EliminateSubQueries(analysis.SimpleAnalyzer.execute(originalQuery))
+
+    val optimized = Optimize.execute(originalQueryAnalyzed)
+
+    val correctAnswer =
+      Sample(0.0, 0.6, false, 11L, x.select('a))
+
+    comparePlans(optimized, correctAnswer.analyze)
+  }
 }
--
cgit v1.2.3
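
For context, a rough sketch of the plan shape this rule targets, mirroring the new
test case. This is not part of the patch: it is a REPL-style example that assumes
spark-catalyst (circa 1.4/1.5) on the classpath and the Catalyst test DSL
(org.apache.spark.sql.catalyst.dsl); the names `relation`, `query`, and `expected`
are illustrative only.

  import org.apache.spark.sql.catalyst.dsl.expressions._
  import org.apache.spark.sql.catalyst.dsl.plans._
  import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Sample}

  // A projection sitting on top of a Sample of a three-column relation,
  // the shape that previously blocked column pruning.
  val relation = LocalRelation('a.int, 'b.int, 'c.int)
  val query = Sample(0.0, 0.6, false, 11L, relation).select('a)

  // After SamplePushDown the equivalent plan keeps the Sample on top and the
  // projection directly above the relation, so a PrunedScan-backed source in
  // that position would be asked only for column 'a.
  val expected = Sample(0.0, 0.6, false, 11L, relation.select('a))

Before this change the projection stayed above the Sample, so pruning for
PrunedScan sources never reached the underlying scan and every column was read.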