| author | Xiao Li <gatorsmile@gmail.com> | 2017-04-10 09:15:04 -0700 |
|---|---|---|
| committer | Xiao Li <gatorsmile@gmail.com> | 2017-04-10 09:15:04 -0700 |
| commit | fd711ea13e558f0e7d3e01f08e01444d394499a6 (patch) | |
| tree | c4c9815b52cbeec31e06bb873a926171fb44d26c /sql | |
| parent | 5acaf8c0c685e47ec619fbdfd353163721e1cf50 (diff) | |
[SPARK-20273][SQL] Disallow Non-deterministic Filter push-down into Join Conditions
## What changes were proposed in this pull request?
```
sql("SELECT t1.b, rand(0) as r FROM cachedData, cachedData t1 GROUP BY t1.b having r > 0.5").show()
```
We will get the following error:
```
Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 8, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
```
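This NullPointerException is characteristic of evaluating a non-deterministic expression before it has been initialized for a partition. A minimal, self-contained sketch of the failure mode, using a hypothetical `RandExpr` stand-in (Spark's real expression classes differ in detail):

```scala
import scala.util.Random

// Hypothetical stand-in for a non-deterministic expression such as rand(0).
// Like a generated join predicate, eval() assumes initialize() already ran;
// join-condition evaluation never performs that initialization step.
class RandExpr(seed: Long) {
  private var rng: Random = null // stays null until initialize() is called
  def initialize(partitionIndex: Int): Unit =
    rng = new Random(seed + partitionIndex)
  def eval(): Double = rng.nextDouble() // NPE if initialize() was skipped
}

val r = new RandExpr(0)
// Calling eval() without initialize() reproduces the NullPointerException:
try { r.eval() } catch {
  case _: NullPointerException => println("NPE: expression not initialized")
}
r.initialize(0)
assert(r.eval() >= 0.0) // after initialization, evaluation succeeds
```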
Filters can be pushed down into join conditions by the optimizer rule `PushPredicateThroughJoin`. However, the Analyzer [blocks users from adding non-deterministic conditions](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L386-L395) (for details, see PR https://github.com/apache/spark/pull/7535).
We should not push non-deterministic conditions down into join conditions; otherwise, we would need to explicitly initialize the non-deterministic expressions before evaluating them. This PR simply blocks the push-down.
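The intent of the guard can be sketched with a self-contained toy model. None of these classes are Spark's real `Expression` hierarchy; `Expr`, `Attr`, `Lit`, and the simplified `canEvaluateWithinJoin` below are hypothetical stand-ins for illustration only:

```scala
// Toy expression hierarchy (hypothetical; not Spark's Expression classes).
sealed trait Expr { def deterministic: Boolean }
case class Attr(name: String) extends Expr { val deterministic = true }
case class Lit(v: Double) extends Expr { val deterministic = true }
case class Rand(seed: Long) extends Expr { val deterministic = false }
case class GreaterThan(left: Expr, right: Expr) extends Expr {
  // A comparison is deterministic only if both operands are.
  val deterministic: Boolean = left.deterministic && right.deterministic
}

// Mirrors the shape of the patched canEvaluateWithinJoin: refuse any
// non-deterministic expression before the remaining cases are considered.
def canEvaluateWithinJoin(expr: Expr): Boolean = expr match {
  case e if !e.deterministic => false // the new guard
  case _                     => true  // other cases elided in this sketch
}

// rand(10) > 5.0 must not be pushed into a join condition...
assert(!canEvaluateWithinJoin(GreaterThan(Rand(10), Lit(5.0))))
// ...while a plain deterministic comparison may be.
assert(canEvaluateWithinJoin(GreaterThan(Attr("a"), Lit(5.0))))
```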
### How was this patch tested?
Added a test case to `FilterPushdownSuite`.
Author: Xiao Li <gatorsmile@gmail.com>
Closes #17585 from gatorsmile/joinRandCondition.
Diffstat (limited to 'sql')
2 files changed, 12 insertions, 0 deletions
```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 1235204591..8acb740f8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -90,6 +90,8 @@ trait PredicateHelper {
    * Returns true iff `expr` could be evaluated as a condition within join.
    */
   protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
+    // Non-deterministic expressions are not allowed as join conditions.
+    case e if !e.deterministic => false
     case l: ListQuery =>
       // A ListQuery defines the query which we want to search in an IN subquery expression.
       // Currently the only way to evaluate an IN subquery is to convert it to a
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ccd0b7c5d7..950aa23795 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -241,6 +241,16 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("joins: do not push down non-deterministic filters into join condition") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery = x.join(y).where(Rand(10) > 5.0).analyze
+    val optimized = Optimize.execute(originalQuery)
+
+    comparePlans(optimized, originalQuery)
+  }
+
   test("joins: push to one side after transformCondition") {
     val x = testRelation.subquery('x)
     val y = testRelation1.subquery('y)
```