path: root/python/pyspark/sql/context.py
author     Davies Liu <davies@databricks.com>    2015-07-20 12:14:47 -0700
committer  Davies Liu <davies.liu@gmail.com>     2015-07-20 12:14:47 -0700
commit     9f913c4fd6f0f223fd378e453d5b9a87beda1ac4 (patch)
tree       b0e89b1b7f4c0617c7bab6b314b161fc5240c95b /python/pyspark/sql/context.py
parent     02181fb6d14833448fb5c501045655213d3cf340 (diff)
[SPARK-9114] [SQL] [PySpark] convert returned object from UDF into internal type
This PR also removes the duplicated code between registerFunction and UserDefinedFunction. cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #7450 from davies/fix_return_type and squashes the following commits:

e80bf9f [Davies Liu] remove debugging code
f94b1f6 [Davies Liu] fix mima
8f9c58b [Davies Liu] convert returned object from UDF into internal type
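For context, here is a minimal doctest-style sketch of the code path this change simplifies, mirroring the doctest excerpted in the diff below (it assumes an existing SQLContext bound to the name sqlContext; the UDF name and lambda are illustrative):

>>> from pyspark.sql.types import IntegerType
>>> # Register a Python lambda as a SQL UDF with an explicit return type;
>>> # the value it returns is converted into Spark SQL's internal type.
>>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(_c0=4)]

After this change, registerFunction simply wraps the Python function in a UserDefinedFunction and registers its _judf with the JVM, so the serialization and return-type conversion logic lives in one place.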
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r--    python/pyspark/sql/context.py    16
1 file changed, 3 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index c93a15bada..abb6522dde 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -34,6 +34,7 @@ from pyspark.sql.types import Row, StringType, StructType, _verify_type, \
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.readwriter import DataFrameReader
 from pyspark.sql.utils import install_exception_handler
+from pyspark.sql.functions import UserDefinedFunction
 
 try:
     import pandas
@@ -191,19 +192,8 @@ class SQLContext(object):
         >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(_c0=4)]
         """
-        func = lambda _, it: map(lambda x: f(*x), it)
-        ser = AutoBatchedSerializer(PickleSerializer())
-        command = (func, None, ser, ser)
-        pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self._sc, command, self)
-        self._ssql_ctx.udf().registerPython(name,
-                                            bytearray(pickled_cmd),
-                                            env,
-                                            includes,
-                                            self._sc.pythonExec,
-                                            self._sc.pythonVer,
-                                            bvars,
-                                            self._sc._javaAccumulator,
-                                            returnType.json())
+        udf = UserDefinedFunction(f, returnType, name)
+        self._ssql_ctx.udf().registerPython(name, udf._judf)
 
     def _inferSchemaFromList(self, data):
         """