aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKoert Kuipers <koert@tresata.com>2017-04-19 15:52:47 +0800
committerWenchen Fan <wenchen@databricks.com>2017-04-19 15:52:47 +0800
commit608bf30f0b9759fd0b9b9f33766295550996a9eb (patch)
treeb98d4b1ad4d4205051c0e316bf388914cbe5d7f5
parent702d85af2df9433254af6fa029683aa19c52a276 (diff)
downloadspark-608bf30f0b9759fd0b9b9f33766295550996a9eb.tar.gz
spark-608bf30f0b9759fd0b9b9f33766295550996a9eb.tar.bz2
spark-608bf30f0b9759fd0b9b9f33766295550996a9eb.zip
[SPARK-20359][SQL] Avoid unnecessary execution in EliminateOuterJoin optimization that can lead to NPE
Avoid unnecessary execution that can lead to NPE in EliminateOuterJoin and add test in DataFrameSuite to confirm NPE is no longer thrown ## What changes were proposed in this pull request? Change leftHasNonNullPredicate and rightHasNonNullPredicate to lazy so they are only executed when needed. ## How was this patch tested? Added test in DataFrameSuite that failed before this fix and now succeeds. Note that a test in catalyst project would be better but I am unsure how to do this. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Koert Kuipers <koert@tresata.com> Closes #17660 from koertkuipers/feat-catch-npe-in-eliminate-outer-join.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala10
2 files changed, 12 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index c3ab587449..2fe3039774 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -134,8 +134,8 @@ case class EliminateOuterJoin(conf: SQLConf) extends Rule[LogicalPlan] with Pred
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
- val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
- val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
+ lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+ lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
join.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 52bd4e19f8..b4893b56a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1722,4 +1722,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
"Cannot have map type columns in DataFrame which calls set operations"))
}
}
+
+ test("SPARK-20359: catalyst outer join optimization should not throw npe") {
+ val df1 = Seq("a", "b", "c").toDF("x")
+ .withColumn("y", udf{ (x: String) => x.substring(0, 1) + "!" }.apply($"x"))
+ val df2 = Seq("a", "b").toDF("x1")
+ df1
+ .join(df2, df1("x") === df2("x1"), "left_outer")
+ .filter($"x1".isNotNull || !$"y".isin("a!"))
+ .count
+ }
}