aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Liang-Chi Hsieh <viirya@appier.com> 2015-07-07 15:49:22 -0700
committer: Michael Armbrust <michael@databricks.com> 2015-07-07 15:49:22 -0700
commit: da56c4e728d6bdb6d5559ec7fd8a3f7ddd1e2319 (patch)
tree: 4dc1ae6b8384665e737c3bd1c77ea5aa84062b4a
parent: 3bf20c27ff3cb3a32bfc3a44e08a57865957c117 (diff)
download: spark-da56c4e728d6bdb6d5559ec7fd8a3f7ddd1e2319.tar.gz
spark-da56c4e728d6bdb6d5559ec7fd8a3f7ddd1e2319.tar.bz2
spark-da56c4e728d6bdb6d5559ec7fd8a3f7ddd1e2319.zip
[SPARK-8794] [SQL] Make PrunedScan work for Sample
JIRA: https://issues.apache.org/jira/browse/SPARK-8794 Currently `PrunedScan` works only when followed by project or filter operations. However, even if there is a `Sample` between these operations and `PrunedScan`, `PrunedScan` should work too. Author: Liang-Chi Hsieh <viirya@appier.com> Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #7228 from viirya/sample_prunedscan and squashes the following commits: ede7cd8 [Liang-Chi Hsieh] Keep PrunedScanSuite untouched. 6f05d30 [Liang-Chi Hsieh] Move unit test to FilterPushdownSuite. 5f32473 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into sample_prunedscan 7e4ba76 [Liang-Chi Hsieh] Use Optimizer for push down projection and filter. 0686830 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into sample_prunedscan df82785 [Liang-Chi Hsieh] Make PrunedScan work on Sample.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala18
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala16
2 files changed, 34 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bfd24287c9..7d41ef9aaf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -41,6 +41,7 @@ object DefaultOptimizer extends Optimizer {
Batch("Operator Optimizations", FixedPoint(100),
// Operator push down
UnionPushDown,
+ SamplePushDown,
PushPredicateThroughJoin,
PushPredicateThroughProject,
PushPredicateThroughGenerate,
@@ -66,6 +67,23 @@ object DefaultOptimizer extends Optimizer {
}
/**
+ * Pushes projections down into a Sample.
+ */
+object SamplePushDown extends Rule[LogicalPlan] {
+
+  // Filters are deliberately NOT pushed below Sample: the sampler picks rows from the
+  // sequence it is fed, so filtering first changes which rows a given seed selects and
+  // Sample(Filter(child)) is not equivalent to Filter(Sample(child)).
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // A projection leaves the row sequence untouched, so the sampler still sees the same
+    // rows; moving it under Sample lets column pruning reach the relation below.
+    case Project(projectList, Sample(lb, up, replace, seed, child)) =>
+      Sample(lb, up, replace, seed,
+        Project(projectList, child))
+  }
+}
+
+/**
* Pushes operations to either side of a Union.
*/
object UnionPushDown extends Rule[LogicalPlan] {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ffdc673cdc..dc28b3ffb5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -34,6 +34,7 @@ class FilterPushdownSuite extends PlanTest {
Batch("Subqueries", Once,
EliminateSubQueries) ::
Batch("Filter Pushdown", Once,
+ SamplePushDown,
CombineFilters,
PushPredicateThroughProject,
BooleanSimplification,
@@ -593,4 +594,19 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized1, analysis.EliminateSubQueries(correctAnswer1))
}
+
+  test("push project down into sample") {
+    // Only the projection path is exercised: select('a) sits directly on top of the Sample.
+    val x = testRelation.subquery('x)
+    val originalQuery = Sample(0.0, 0.6, false, 11L, x).select('a)
+
+    val originalQueryAnalyzed = EliminateSubQueries(analysis.SimpleAnalyzer.execute(originalQuery))
+
+    val optimized = Optimize.execute(originalQueryAnalyzed)
+
+    // After optimization the Project is expected underneath the Sample.
+    val correctAnswer = Sample(0.0, 0.6, false, 11L, x.select('a))
+
+    comparePlans(optimized, correctAnswer.analyze)
+  }
}