From 9b7a03f15ac45e5f7dcf118d1e7ce1556339aa46 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 20 Jan 2017 16:11:40 -0800 Subject: [SPARK-18589][SQL] Fix Python UDF accessing attributes from both side of join ## What changes were proposed in this pull request? PythonUDF is unevaluable, which can not be used inside a join condition, currently the optimizer will push a PythonUDF which accessing both side of join into the join condition, then the query will fail to plan. This PR fix this issue by checking the expression is evaluable or not before pushing it into Join. ## How was this patch tested? Add a regression test. Author: Davies Liu Closes #16581 from davies/pyudf_join. --- python/pyspark/sql/tests.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'python') diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 73a5df65e0..4bfe6e9eb3 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -342,6 +342,15 @@ class SQLTests(ReusedPySparkTestCase): df = df.withColumn('b', udf(lambda x: 'x')(df.a)) self.assertEqual(df.filter('b = "x"').collect(), [Row(a=1, b='x')]) + def test_udf_in_filter_on_top_of_join(self): + # regression test for SPARK-18589 + from pyspark.sql.functions import udf + left = self.spark.createDataFrame([Row(a=1)]) + right = self.spark.createDataFrame([Row(b=1)]) + f = udf(lambda a, b: a == b, BooleanType()) + df = left.crossJoin(right).filter(f("a", "b")) + self.assertEqual(df.collect(), [Row(a=1, b=1)]) + def test_udf_without_arguments(self): self.spark.catalog.registerFunction("foo", lambda: "bar") [row] = self.spark.sql("SELECT foo()").collect() -- cgit v1.2.3