about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala37
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala23
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala9
3 files changed, 39 insertions, 30 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 648a65e7c0..324f40a051 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -85,7 +85,22 @@ object SamplePushDown extends Rule[LogicalPlan] {
}
/**
- * Pushes operations to either side of a Union, Intersect or Except.
+ * Pushes certain operations to both sides of a Union, Intersect or Except operator.
+ * Operations that are safe to pushdown are listed as follows.
+ * Union:
+ * Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
+ * safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT,
+ * we will not be able to pushdown Projections.
+ *
+ * Intersect:
+ * It is not safe to pushdown Projections through it because we need to get the
+ * intersect of rows by comparing the entire rows. It is fine to pushdown Filters
+ * because we will not have non-deterministic expressions.
+ *
+ * Except:
+ * It is not safe to pushdown Projections through it because we need to get the
+ * set difference of rows by comparing the entire rows. It is fine to pushdown Filters
+ * because we will not have non-deterministic expressions.
*/
object SetOperationPushDown extends Rule[LogicalPlan] {
@@ -122,40 +137,26 @@ object SetOperationPushDown extends Rule[LogicalPlan] {
Filter(condition, left),
Filter(pushToRight(condition, rewrites), right))
- // Push down projection into union
+ // Push down projection through UNION ALL
case Project(projectList, u @ Union(left, right)) =>
val rewrites = buildRewrites(u)
Union(
Project(projectList, left),
Project(projectList.map(pushToRight(_, rewrites)), right))
- // Push down filter into intersect
+ // Push down filter through INTERSECT
case Filter(condition, i @ Intersect(left, right)) =>
val rewrites = buildRewrites(i)
Intersect(
Filter(condition, left),
Filter(pushToRight(condition, rewrites), right))
- // Push down projection into intersect
- case Project(projectList, i @ Intersect(left, right)) =>
- val rewrites = buildRewrites(i)
- Intersect(
- Project(projectList, left),
- Project(projectList.map(pushToRight(_, rewrites)), right))
-
- // Push down filter into except
+ // Push down filter through EXCEPT
case Filter(condition, e @ Except(left, right)) =>
val rewrites = buildRewrites(e)
Except(
Filter(condition, left),
Filter(pushToRight(condition, rewrites), right))
-
- // Push down projection into except
- case Project(projectList, e @ Except(left, right)) =>
- val rewrites = buildRewrites(e)
- Except(
- Project(projectList, left),
- Project(projectList.map(pushToRight(_, rewrites)), right))
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
index 49c979bc7d..3fca47a023 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
@@ -60,23 +60,22 @@ class SetOperationPushDownSuite extends PlanTest {
comparePlans(exceptOptimized, exceptCorrectAnswer)
}
- test("union/intersect/except: project to each side") {
+ test("union: project to each side") {
val unionQuery = testUnion.select('a)
+ val unionOptimized = Optimize.execute(unionQuery.analyze)
+ val unionCorrectAnswer =
+ Union(testRelation.select('a), testRelation2.select('d)).analyze
+ comparePlans(unionOptimized, unionCorrectAnswer)
+ }
+
+ test("SPARK-10539: Project should not be pushed down through Intersect or Except") {
val intersectQuery = testIntersect.select('b, 'c)
val exceptQuery = testExcept.select('a, 'b, 'c)
- val unionOptimized = Optimize.execute(unionQuery.analyze)
val intersectOptimized = Optimize.execute(intersectQuery.analyze)
val exceptOptimized = Optimize.execute(exceptQuery.analyze)
- val unionCorrectAnswer =
- Union(testRelation.select('a), testRelation2.select('d)).analyze
- val intersectCorrectAnswer =
- Intersect(testRelation.select('b, 'c), testRelation2.select('e, 'f)).analyze
- val exceptCorrectAnswer =
- Except(testRelation.select('a, 'b, 'c), testRelation2.select('d, 'e, 'f)).analyze
-
- comparePlans(unionOptimized, unionCorrectAnswer)
- comparePlans(intersectOptimized, intersectCorrectAnswer)
- comparePlans(exceptOptimized, exceptCorrectAnswer) }
+ comparePlans(intersectOptimized, intersectQuery.analyze)
+ comparePlans(exceptOptimized, exceptQuery.analyze)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c167999af5..1370713975 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -907,4 +907,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
assert(row.getDouble(1) - row.getDouble(3) === 0.0 +- 0.001)
}
}
+
+ test("SPARK-10539: Project should not be pushed down through Intersect or Except") {
+ val df1 = (1 to 100).map(Tuple1.apply).toDF("i")
+ val df2 = (1 to 30).map(Tuple1.apply).toDF("i")
+ val intersect = df1.intersect(df2)
+ val except = df1.except(df2)
+ assert(intersect.count() === 30)
+ assert(except.count() === 70)
+ }
}