author | Jeff Zhang <zjffdu@apache.org> | 2016-10-14 15:50:35 -0700
committer | Michael Armbrust <michael@databricks.com> | 2016-10-14 15:50:35 -0700
commit | f00df40cfefef0f3fc73f16ada1006e4dcfa5a39
tree | 17197231fa9c82e3158b039d950c021383e94885 /python/pyspark/sql/context.py
parent | 5aeb7384c7aa5f487f031f9ae07d3f1653399d14
[SPARK-11775][PYSPARK][SQL] Allow PySpark to register Java UDF
Currently PySpark can only call Spark's built-in Java UDFs; it cannot call custom Java UDFs. It would be better to allow that. Two benefits:
* Leverage the power of rich third-party Java libraries.
* Improve performance: with a Python UDF, Python daemons must be started on every worker, which hurts performance; a Java UDF runs entirely inside the JVM (see the sketch after this list).
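A hedged illustration of that difference, not part of the patch itself: the sketch assumes a PySpark shell where `sqlContext` is already bound and a jar providing `test.org.apache.spark.sql.JavaStringLength` (the test class used in the doctests below) is on the classpath.

```python
from pyspark.sql.types import IntegerType

# Python UDF: the lambda is pickled to the workers and executed by
# Python daemon processes, adding per-row serialization overhead.
sqlContext.registerFunction("pyStringLength", lambda s: len(s), IntegerType())

# Java UDF (what this patch enables): the function runs entirely
# inside the JVM, so no Python workers are involved at query time.
sqlContext.registerJavaFunction(
    "javaStringLength", "test.org.apache.spark.sql.JavaStringLength", IntegerType())
```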
Author: Jeff Zhang <zjffdu@apache.org>
Closes #9766 from zjffdu/SPARK-11775.
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r-- | python/pyspark/sql/context.py | 28
1 file changed, 27 insertions, 1 deletion
```diff
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 8264dcf8a9..de4c335ad2 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -28,7 +28,7 @@ from pyspark.sql.session import _monkey_patch_RDD, SparkSession
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.readwriter import DataFrameReader
 from pyspark.sql.streaming import DataStreamReader
-from pyspark.sql.types import Row, StringType
+from pyspark.sql.types import IntegerType, Row, StringType
 from pyspark.sql.utils import install_exception_handler
 
 __all__ = ["SQLContext", "HiveContext", "UDFRegistration"]
@@ -202,6 +202,32 @@ class SQLContext(object):
         """
         self.sparkSession.catalog.registerFunction(name, f, returnType)
 
+    @ignore_unicode_prefix
+    @since(2.1)
+    def registerJavaFunction(self, name, javaClassName, returnType=None):
+        """Register a java UDF so it can be used in SQL statements.
+
+        In addition to a name and the function itself, the return type can be optionally
+        specified. When the return type is not specified we would infer it via reflection.
+        :param name: name of the UDF
+        :param javaClassName: fully qualified name of java class
+        :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+        >>> sqlContext.registerJavaFunction("javaStringLength",
+        ...   "test.org.apache.spark.sql.JavaStringLength", IntegerType())
+        >>> sqlContext.sql("SELECT javaStringLength('test')").collect()
+        [Row(UDF(test)=4)]
+        >>> sqlContext.registerJavaFunction("javaStringLength2",
+        ...   "test.org.apache.spark.sql.JavaStringLength")
+        >>> sqlContext.sql("SELECT javaStringLength2('test')").collect()
+        [Row(UDF(test)=4)]
+
+        """
+        jdt = None
+        if returnType is not None:
+            jdt = self.sparkSession._jsparkSession.parseDataType(returnType.json())
+        self.sparkSession._jsparkSession.udf().registerJava(name, javaClassName, jdt)
+
     # TODO(andrew): delete this once we refactor things to take in SparkSession
     def _inferSchema(self, rdd, samplingRatio=None):
         """
```
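To try the new method outside the doctest, a minimal end-to-end sketch, assuming Spark 2.1+ and that a jar containing `test.org.apache.spark.sql.JavaStringLength` (the test class from the doctest above) is on the driver and executor classpath:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType

sc = SparkContext(appName="register-java-udf-demo")
sqlContext = SQLContext(sc)

# Explicit return type: converted to a JVM DataType via parseDataType().
sqlContext.registerJavaFunction(
    "javaStringLength",
    "test.org.apache.spark.sql.JavaStringLength",
    IntegerType())

# Return type omitted: jdt is passed as None and the JVM side infers
# the return type from the UDF class via reflection.
sqlContext.registerJavaFunction(
    "javaStringLength2",
    "test.org.apache.spark.sql.JavaStringLength")

print(sqlContext.sql("SELECT javaStringLength('test')").collect())
# expected, per the doctest: [Row(UDF(test)=4)]
```

Either way the UDF body executes in the JVM; the optional `returnType` only controls whether the result type comes from the caller or from reflection over the Java class.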