author    蒋星博 <jiangxingbo@meituan.com>  2016-07-14 00:21:27 +0800
committer Cheng Lian <lian@databricks.com>  2016-07-14 00:21:27 +0800
commit f376c37268848dbb4b2fb57677e22ef2bf207b49 (patch)
tree   bc4fc046291880943c4d2c5ad37625a7548baa84 /sql/catalyst
parent ea06e4ef34c860219a9aeec81816ef53ada96253 (diff)
[SPARK-16343][SQL] Improve the PushDownPredicate rule to push down predicates correctly under non-deterministic conditions.
## What changes were proposed in this pull request?

Currently our Optimizer may reorder predicates to run them more efficiently, but when the condition contains non-deterministic predicates, changing the order between the deterministic and the non-deterministic parts may change the number of rows the non-deterministic expressions see. For example:

```sql
SELECT a FROM t WHERE rand() < 0.1 AND a = 1
```

and

```sql
SELECT a FROM t WHERE a = 1 AND rand() < 0.1
```

may call rand() a different number of times, so their output rows can differ. This PR fixes the rule by only pushing down a deterministic predicate when it is placed before any non-deterministic predicate in the original condition.

## How was this patch tested?

Expanded the related test cases in FilterPushdownSuite.

Author: 蒋星博 <jiangxingbo@meituan.com>

Closes #14012 from jiangxb1987/ppd.
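To make the fix concrete, here is a minimal sketch in plain Scala (outside Spark; `Pred` is a stand-in for Catalyst's `Expression` and its `deterministic` flag) contrasting the old `partition`-based split with the new `span`-based split used throughout the patch below:

```scala
// Stand-in for a Catalyst Expression carrying its `deterministic` flag.
case class Pred(sql: String, deterministic: Boolean)

// The conjuncts of a condition like: a = 1 AND rand() < 0.1 AND b = 2
val conjuncts = Seq(
  Pred("a = 1", deterministic = true),
  Pred("rand() < 0.1", deterministic = false),
  Pred("b = 2", deterministic = true))

// Old behavior: partition also selects "b = 2", which follows rand() < 0.1.
// Pushing it below changes how many rows reach, and thus invoke, rand().
val (oldPushDown, oldStayUp) = conjuncts.partition(_.deterministic)
// oldPushDown == Seq(a = 1, b = 2)

// New behavior: span stops at the first non-deterministic conjunct, so only
// the deterministic prefix is even considered for pushdown.
val (candidates, containingNonDeterministic) = conjuncts.span(_.deterministic)
// candidates == Seq(a = 1)
// containingNonDeterministic == Seq(rand() < 0.1, b = 2)
```

This is exactly the "placed before any non-deterministic predicates" check described above: a deterministic conjunct stays up as soon as any non-deterministic conjunct precedes it.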
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 44
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 8
2 files changed, 33 insertions(+), 19 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 368e9a5396..08fb0199fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1128,19 +1128,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
- // pushed beneath must satisfy the following two conditions:
+ // pushed beneath must satisfy the following conditions:
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
- // 2. Deterministic
+ // 2. Deterministic.
+ // 3. Placed before any non-deterministic predicates.
case filter @ Filter(condition, w: Window)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
- val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
- cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
- // This is for ensuring all the partitioning expressions have been converted to alias
- // in Analyzer. Thus, we do not need to check if the expressions in conditions are
- // the same as the expressions used in partitioning columns.
- partitionAttrs.forall(_.isInstanceOf[Attribute])
+
+ val (candidates, containingNonDeterministic) =
+ splitConjunctivePredicates(condition).span(_.deterministic)
+
+ val (pushDown, rest) = candidates.partition { cond =>
+ cond.references.subsetOf(partitionAttrs)
}
+
+ val stayUp = rest ++ containingNonDeterministic
+
if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
@@ -1159,11 +1163,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// For each filter, expand the alias and check if the filter can be evaluated using
// attributes produced by the aggregate operator's child operator.
- val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
+ val (candidates, containingNonDeterministic) =
+ splitConjunctivePredicates(condition).span(_.deterministic)
+
+ val (pushDown, rest) = candidates.partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
- replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic
+ replaced.references.subsetOf(aggregate.child.outputSet)
}
+ val stayUp = rest ++ containingNonDeterministic
+
if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
@@ -1177,9 +1186,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
case filter @ Filter(condition, union: Union) =>
// Union could change the rows, so non-deterministic predicate can't be pushed down
- val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
- cond.deterministic
- }
+ val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
+
if (pushDown.nonEmpty) {
val pushDownCond = pushDown.reduceLeft(And)
val output = union.output
@@ -1219,9 +1227,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// come from grandchild.
// TODO: non-deterministic predicates could be pushed through some operators that do not change
// the rows.
- val (pushDown, stayUp) = splitConjunctivePredicates(filter.condition).partition { cond =>
- cond.deterministic && cond.references.subsetOf(grandchild.outputSet)
+ val (candidates, containingNonDeterministic) =
+ splitConjunctivePredicates(filter.condition).span(_.deterministic)
+
+ val (pushDown, rest) = candidates.partition { cond =>
+ cond.references.subsetOf(grandchild.outputSet)
}
+
+ val stayUp = rest ++ containingNonDeterministic
+
if (pushDown.nonEmpty) {
val newChild = insertFilter(pushDown.reduceLeft(And))
if (stayUp.nonEmpty) {
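All four hunks above apply the same three-step shape, which the patch inlines at each call site. As a hedged summary, it could be factored into a helper like the following (hypothetical; the PR itself does not introduce such a function):

```scala
// Hypothetical helper capturing the repeated pattern in the hunks above.
// 1. span: only conjuncts before the first non-deterministic one may move;
// 2. partition: of that prefix, push down what the child can evaluate;
// 3. everything else, including every conjunct at or after the first
//    non-deterministic one, stays in the Filter above the operator.
def splitPushdown[A](conjuncts: Seq[A])(
    isDeterministic: A => Boolean)(
    canPushDown: A => Boolean): (Seq[A], Seq[A]) = {
  val (candidates, containingNonDeterministic) = conjuncts.span(isDeterministic)
  val (pushDown, rest) = candidates.partition(canPushDown)
  (pushDown, rest ++ containingNonDeterministic)
}
```

The per-operator variation lives entirely in `canPushDown`: references within the partition key for Window, references within the child's output for Aggregate and the generic unary case, and `_ => true` for Union, which keeps only the span step.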
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 9cb49e74ad..780e78ed1c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -531,14 +531,14 @@ class FilterPushdownSuite extends PlanTest {
val originalQuery = {
testRelationWithArrayType
.generate(Explode('c_arr), true, false, Some("arr"))
- .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6))
+ .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('c > 6))
}
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = {
testRelationWithArrayType
.where('b >= 5)
.generate(Explode('c_arr), true, false, Some("arr"))
- .where('a + Rand(10).as("rnd") > 6)
+ .where('a + Rand(10).as("rnd") > 6 && 'c > 6)
.analyze
}
@@ -715,14 +715,14 @@ class FilterPushdownSuite extends PlanTest {
val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
val originalQuery = Union(Seq(testRelation, testRelation2))
- .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
+ .where('a === 2L && 'b + Rand(10).as("rnd") === 3 && 'c > 5L)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Union(Seq(
testRelation.where('a === 2L),
testRelation2.where('d === 2L)))
- .where('b + Rand(10).as("rnd") === 3)
+ .where('b + Rand(10).as("rnd") === 3 && 'c > 5L)
.analyze
comparePlans(optimized, correctAnswer)
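For reference, the expanded Union test above exercises the prefix rule directly: of the three conjuncts, only 'a === 2L precedes the Rand predicate, so it alone is pushed into each branch of the Union, while 'c > 5L stays above even though it is deterministic. With string stand-ins for the Catalyst expressions (an illustrative sketch, not test code):

```scala
// The three conjuncts of the test's filter, in their original order.
val conjuncts = Seq("a === 2L", "b + rand(10) === 3", "c > 5L")
val isDeterministic = Set("a === 2L", "c > 5L") // rand(10) is non-deterministic

val (candidates, containingNonDeterministic) = conjuncts.span(isDeterministic)
// candidates == Seq("a === 2L")  -> pushed into both Union children
// containingNonDeterministic == Seq("b + rand(10) === 3", "c > 5L")  -> stays up
```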