aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2015-10-27 11:28:59 +0100
committerMichael Armbrust <michael@databricks.com>2015-10-27 11:28:59 +0100
commit360ed832f5213b805ac28cf1d2828be09480f2d6 (patch)
tree6375913415e936a7a847522fa03a27d8be94f312
parent958a0ec8fa58ff091f595db2b574a7aa3ff41253 (diff)
downloadspark-360ed832f5213b805ac28cf1d2828be09480f2d6.tar.gz
spark-360ed832f5213b805ac28cf1d2828be09480f2d6.tar.bz2
spark-360ed832f5213b805ac28cf1d2828be09480f2d6.zip
[SPARK-11303][SQL] filter should not be pushed down into sample
When sampling and then filtering a DataFrame, the SQL Optimizer will push down the filter into the sample and produce a wrong result. This is because the sampler is computed based on the original scope rather than the scope after filtering. Author: Yanbo Liang <ybliang8@gmail.com> Closes #9294 from yanboliang/spark-11303.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala10
2 files changed, 10 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0139b9e87c..d37f43888f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -74,10 +74,6 @@ object DefaultOptimizer extends Optimizer {
object SamplePushDown extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- // Push down filter into sample
- case Filter(condition, s @ Sample(lb, up, replace, seed, child)) =>
- Sample(lb, up, replace, seed,
- Filter(condition, child))
// Push down projection into sample
case Project(projectList, s @ Sample(lb, up, replace, seed, child)) =>
Sample(lb, up, replace, seed,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 298c322906..f5ae3ae49b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1860,4 +1860,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Row(1))
}
}
+
+ test("SPARK-11303: filter should not be pushed down into sample") {
+ val df = sqlContext.range(100)
+ List(true, false).foreach { withReplacement =>
+ val sampled = df.sample(withReplacement, 0.1, 1)
+ val sampledOdd = sampled.filter("id % 2 != 0")
+ val sampledEven = sampled.filter("id % 2 = 0")
+ assert(sampled.count() == sampledOdd.count() + sampledEven.count())
+ }
+ }
}