aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-09-19 13:24:16 -0700
committerDavies Liu <davies.liu@gmail.com>2016-09-19 13:24:16 -0700
commitd8104158a922d86dd4f00e50d5d7dddc7b777a21 (patch)
treeb97cebcefcedbd7171bdfd40c55fe09802de8673
parente0632062635c37cbc77df7ebd2a1846655193e12 (diff)
downloadspark-d8104158a922d86dd4f00e50d5d7dddc7b777a21.tar.gz
spark-d8104158a922d86dd4f00e50d5d7dddc7b777a21.tar.bz2
spark-d8104158a922d86dd4f00e50d5d7dddc7b777a21.zip
[SPARK-17100] [SQL] fix Python udf in filter on top of outer join
## What changes were proposed in this pull request? In optimizer, we try to evaluate the condition to see whether it's nullable or not, but some expressions are not evaluable, we should check that before evaluate it. ## How was this patch tested? Added regression tests. Author: Davies Liu <davies@databricks.com> Closes #15103 from davies/udf_join.
-rw-r--r--python/pyspark/sql/tests.py8
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala4
2 files changed, 11 insertions, 1 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1be0b72304..c2171c277c 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -328,6 +328,14 @@ class SQLTests(ReusedPySparkTestCase):
[row] = self.spark.sql("SELECT double(add(1, 2)), add(double(2), 1)").collect()
self.assertEqual(tuple(row), (6, 5))
+ def test_udf_in_filter_on_top_of_outer_join(self):
+ from pyspark.sql.functions import udf
+ left = self.spark.createDataFrame([Row(a=1)])
+ right = self.spark.createDataFrame([Row(a=1)])
+ df = left.join(right, on='a', how='left_outer')
+ df = df.withColumn('b', udf(lambda x: 'x')(df.a))
+ self.assertEqual(df.filter('b = "x"').collect(), [Row(a=1, b='x')])
+
def test_udf_without_arguments(self):
self.spark.catalog.registerFunction("foo", lambda: "bar")
[row] = self.spark.sql("SELECT foo()").collect()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 1621bffd61..2626057e49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -109,7 +109,9 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
val attributes = e.references.toSeq
val emptyRow = new GenericInternalRow(attributes.length)
- val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
+ val boundE = BindReferences.bindReference(e, attributes)
+ if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false
+ val v = boundE.eval(emptyRow)
v == null || v == false
}