diff options
author | zero323 <zero323@users.noreply.github.com> | 2017-01-31 18:03:39 -0800 |
---|---|---|
committer | Holden Karau <holden@us.ibm.com> | 2017-01-31 18:03:39 -0800 |
commit | 9063835803e54538c94d95bbddcb4810dd7a1c55 (patch) | |
tree | d22369466eca165a1be27c44de96bef1ed9b8b3f /python/pyspark/sql/functions.py | |
parent | 081b7addaf9560563af0ce25912972e91a78cee6 (diff) | |
download | spark-9063835803e54538c94d95bbddcb4810dd7a1c55.tar.gz spark-9063835803e54538c94d95bbddcb4810dd7a1c55.tar.bz2 spark-9063835803e54538c94d95bbddcb4810dd7a1c55.zip |
[SPARK-19163][PYTHON][SQL] Delay _judf initialization to the __call__
## What changes were proposed in this pull request?
Defer `UserDefinedFunction._judf` initialization to the first call. This prevents unintended `SparkSession` initialization. This allows users to define and import UDFs without creating a context or session as a side effect.
[SPARK-19163](https://issues.apache.org/jira/browse/SPARK-19163)
## How was this patch tested?
Unit tests.
Author: zero323 <zero323@users.noreply.github.com>
Closes #16536 from zero323/SPARK-19163.
Diffstat (limited to 'python/pyspark/sql/functions.py')
-rw-r--r-- | python/pyspark/sql/functions.py | 35 |
1 file changed, 24 insertions, 11 deletions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 66d993a814..02c2350dc2 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1826,25 +1826,38 @@ class UserDefinedFunction(object): def __init__(self, func, returnType, name=None): self.func = func self.returnType = returnType - self._judf = self._create_judf(name) - - def _create_judf(self, name): + # Stores UserDefinedPythonFunctions jobj, once initialized + self._judf_placeholder = None + self._name = name or ( + func.__name__ if hasattr(func, '__name__') + else func.__class__.__name__) + + @property + def _judf(self): + # It is possible that concurrent access, to newly created UDF, + # will initialize multiple UserDefinedPythonFunctions. + # This is unlikely, doesn't affect correctness, + # and should have a minimal performance impact. + if self._judf_placeholder is None: + self._judf_placeholder = self._create_judf() + return self._judf_placeholder + + def _create_judf(self): from pyspark.sql import SparkSession - sc = SparkContext.getOrCreate() - wrapped_func = _wrap_function(sc, self.func, self.returnType) + spark = SparkSession.builder.getOrCreate() + sc = spark.sparkContext + + wrapped_func = _wrap_function(sc, self.func, self.returnType) jdt = spark._jsparkSession.parseDataType(self.returnType.json()) - if name is None: - f = self.func - name = f.__name__ if hasattr(f, '__name__') else f.__class__.__name__ judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction( - name, wrapped_func, jdt) + self._name, wrapped_func, jdt) return judf def __call__(self, *cols): + judf = self._judf sc = SparkContext._active_spark_context - jc = self._judf.apply(_to_seq(sc, cols, _to_java_column)) - return Column(jc) + return Column(judf.apply(_to_seq(sc, cols, _to_java_column))) @since(1.3) |