diff options
author | Davies Liu <davies@databricks.com> | 2017-01-20 16:11:40 -0800 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2017-01-20 16:11:40 -0800 |
commit | 9b7a03f15ac45e5f7dcf118d1e7ce1556339aa46 (patch) | |
tree | 67c03bb4a69f9631e845156ca6eaef25746bb02d /sql/core | |
parent | 552e5f08841828e55f5924f1686825626da8bcd0 (diff) | |
download | spark-9b7a03f15ac45e5f7dcf118d1e7ce1556339aa46.tar.gz spark-9b7a03f15ac45e5f7dcf118d1e7ce1556339aa46.tar.bz2 spark-9b7a03f15ac45e5f7dcf118d1e7ce1556339aa46.zip |
[SPARK-18589][SQL] Fix Python UDF accessing attributes from both side of join
## What changes were proposed in this pull request?
PythonUDF is unevaluable, which can not be used inside a join condition, currently the optimizer will push a PythonUDF which accessing both side of join into the join condition, then the query will fail to plan.
This PR fix this issue by checking the expression is evaluable or not before pushing it into Join.
## How was this patch tested?
Add a regression test.
Author: Davies Liu <davies@databricks.com>
Closes #16581 from davies/pyudf_join.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala | 14 |
1 files changed, 6 insertions, 8 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 81bea2fef8..2a3d1cf0b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.api.python.PythonFunction -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, GreaterThan, In} import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.BooleanType @@ -86,13 +86,11 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { test("Python UDF refers to the attributes from more than one child") { val df = Seq(("Hello", 4)).toDF("a", "b") val df2 = Seq(("Hello", 4)).toDF("c", "d") - val joinDF = df.join(df2).where("dummyPythonUDF(a, c) == dummyPythonUDF(d, c)") - - val e = intercept[RuntimeException] { - joinDF.queryExecution.executedPlan - }.getMessage - assert(Seq("Invalid PythonUDF dummyUDF", "requires attributes from more than one child") - .forall(e.contains)) + val joinDF = df.crossJoin(df2).where("dummyPythonUDF(a, c) == dummyPythonUDF(d, c)") + val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect { + case b: BatchEvalPythonExec => b + } + assert(qualifiedPlanNodes.size == 1) } } |