author     Jeff Zhang <zjffdu@apache.org>                2016-10-14 15:50:35 -0700
committer  Michael Armbrust <michael@databricks.com>     2016-10-14 15:50:35 -0700
commit     f00df40cfefef0f3fc73f16ada1006e4dcfa5a39 (patch)
tree       17197231fa9c82e3158b039d950c021383e94885 /python/pyspark/sql/context.py
parent     5aeb7384c7aa5f487f031f9ae07d3f1653399d14 (diff)
[SPARK-11775][PYSPARK][SQL] Allow PySpark to register Java UDF
Currently PySpark can only call Spark's built-in Java UDFs; it cannot call custom Java UDFs. It would be better to allow that, for two benefits:
* Leverage the power of rich third-party Java libraries.
* Improve performance: a Python UDF requires Python daemons to be started on the workers, which hurts performance.

Author: Jeff Zhang <zjffdu@apache.org>

Closes #9766 from zjffdu/SPARK-11775.
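For context, a custom Java UDF of the kind this change makes callable is simply a class implementing one of the org.apache.spark.sql.api.java.UDF* interfaces. Below is a hypothetical sketch of what the test.org.apache.spark.sql.JavaStringLength class referenced in the doctests further down might look like; the actual test class lives in Spark's Java test sources and may differ, and the Spark 2.x Java UDF interfaces are assumed.

package test.org.apache.spark.sql;

import org.apache.spark.sql.api.java.UDF1;

// Illustrative one-argument Java UDF: takes a String and returns its length.
// Null handling is omitted for brevity.
public class JavaStringLength implements UDF1<String, Integer> {
    @Override
    public Integer call(String str) throws Exception {
        return str.length();
    }
}

Once the jar containing such a class is on the driver and executor classpath (for example via spark-submit --jars), registerJavaFunction makes it callable from SQL without starting any Python worker daemons.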
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r--   python/pyspark/sql/context.py   28
1 file changed, 27 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 8264dcf8a9..de4c335ad2 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -28,7 +28,7 @@ from pyspark.sql.session import _monkey_patch_RDD, SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.readwriter import DataFrameReader
from pyspark.sql.streaming import DataStreamReader
-from pyspark.sql.types import Row, StringType
+from pyspark.sql.types import IntegerType, Row, StringType
from pyspark.sql.utils import install_exception_handler
__all__ = ["SQLContext", "HiveContext", "UDFRegistration"]
@@ -202,6 +202,32 @@ class SQLContext(object):
"""
self.sparkSession.catalog.registerFunction(name, f, returnType)
+    @ignore_unicode_prefix
+    @since(2.1)
+    def registerJavaFunction(self, name, javaClassName, returnType=None):
+        """Register a Java UDF so it can be used in SQL statements.
+
+        In addition to a name and the function itself, the return type can be optionally
+        specified. When the return type is not specified, it is inferred via reflection.
+        :param name: name of the UDF
+        :param javaClassName: fully qualified name of the Java class
+        :param returnType: a :class:`pyspark.sql.types.DataType` object
+
+        >>> sqlContext.registerJavaFunction("javaStringLength",
+        ...   "test.org.apache.spark.sql.JavaStringLength", IntegerType())
+        >>> sqlContext.sql("SELECT javaStringLength('test')").collect()
+        [Row(UDF(test)=4)]
+        >>> sqlContext.registerJavaFunction("javaStringLength2",
+        ...   "test.org.apache.spark.sql.JavaStringLength")
+        >>> sqlContext.sql("SELECT javaStringLength2('test')").collect()
+        [Row(UDF(test)=4)]
+
+        """
+        jdt = None
+        if returnType is not None:
+            jdt = self.sparkSession._jsparkSession.parseDataType(returnType.json())
+        self.sparkSession._jsparkSession.udf().registerJava(name, javaClassName, jdt)
+
    # TODO(andrew): delete this once we refactor things to take in SparkSession
    def _inferSchema(self, rdd, samplingRatio=None):
        """