diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-12-10 08:47:45 -0800 |
---|---|---|
committer | gatorsmile <gatorsmile@gmail.com> | 2016-12-10 08:47:45 -0800 |
commit | 422a45cf0490cc354fa9348a2381a337d52c4f58 (patch) | |
tree | eebe12ca6ad39df6e00acd3439a85cd8c340ae85 /python | |
parent | 3a3e65adaf3e4c7b92d1284e61ae89ffdf8ed5c3 (diff) | |
download | spark-422a45cf0490cc354fa9348a2381a337d52c4f58.tar.gz spark-422a45cf0490cc354fa9348a2381a337d52c4f58.tar.bz2 spark-422a45cf0490cc354fa9348a2381a337d52c4f58.zip |
[SPARK-18766][SQL] Push Down Filter Through BatchEvalPython (Python UDF)
### What changes were proposed in this pull request?
Currently, when users use Python UDF in Filter, BatchEvalPython is always generated below FilterExec. However, not all the predicates need to be evaluated after Python UDF execution. Thus, this PR is to push down the determinisitc predicates through `BatchEvalPython`.
```Python
>>> df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>>> from pyspark.sql.functions import udf, col
>>> from pyspark.sql.types import BooleanType
>>> my_filter = udf(lambda a: a < 2, BooleanType())
>>> sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2"))
>>> sel.explain(True)
```
Before the fix, the plan looks like
```
== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]
== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter ((isnotnull(value#1) && pythonUDF0#9) && (value#1 < 2))
+- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
+- Scan ExistingRDD[key#0L,value#1]
```
After the fix, the plan looks like
```
== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]
== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter pythonUDF0#9: boolean
+- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
+- *Filter (isnotnull(value#1) && (value#1 < 2))
+- Scan ExistingRDD[key#0L,value#1]
```
### How was this patch tested?
Added both unit test cases for `BatchEvalPythonExec` and also add an end-to-end test case in Python test suite.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #16193 from gatorsmile/pythonUDFPredicatePushDown.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql/tests.py | 9 |
1 files changed, 9 insertions, 0 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 66320bd050..af7d52cdac 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -360,6 +360,15 @@ class SQLTests(ReusedPySparkTestCase): [res] = self.spark.sql("SELECT MYUDF('')").collect() self.assertEqual("", res[0]) + def test_udf_with_filter_function(self): + df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) + from pyspark.sql.functions import udf, col + from pyspark.sql.types import BooleanType + + my_filter = udf(lambda a: a < 2, BooleanType()) + sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2")) + self.assertEqual(sel.collect(), [Row(key=1, value='1')]) + def test_udf_with_aggregate_function(self): df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) from pyspark.sql.functions import udf, col, sum |