aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/ml/feature.py
diff options
context:
space:
mode:
authorNick Pentreath <nickp@za.ibm.com>2017-03-24 08:01:15 -0700
committerNick Pentreath <nickp@za.ibm.com>2017-03-24 08:01:15 -0700
commitd9f4ce6943c16a7e29f98e57c33acbfc0379b54d (patch)
tree438d4e983ae420ba382b905331d3a42845bc8829 /python/pyspark/ml/feature.py
parent344f38b04b271b5f3ec2748b34db4e52d54da1bc (diff)
downloadspark-d9f4ce6943c16a7e29f98e57c33acbfc0379b54d.tar.gz
spark-d9f4ce6943c16a7e29f98e57c33acbfc0379b54d.tar.bz2
spark-d9f4ce6943c16a7e29f98e57c33acbfc0379b54d.zip
[SPARK-15040][ML][PYSPARK] Add Imputer to PySpark
Add a Python wrapper for the `Imputer` feature transformer. How was this patch tested? With new doc tests and a tweak to PySpark ML `tests.py`. Author: Nick Pentreath <nickp@za.ibm.com>. Closes #17316 from MLnick/SPARK-15040-pyspark-imputer.
Diffstat (limited to 'python/pyspark/ml/feature.py')
-rwxr-xr-xpython/pyspark/ml/feature.py160
1 file changed, 160 insertions, 0 deletions
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 92f8549e9c..8d25f5b3a7 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -36,6 +36,7 @@ __all__ = ['Binarizer',
'ElementwiseProduct',
'HashingTF',
'IDF', 'IDFModel',
+ 'Imputer', 'ImputerModel',
'IndexToString',
'MaxAbsScaler', 'MaxAbsScalerModel',
'MinHashLSH', 'MinHashLSHModel',
@@ -871,6 +872,165 @@ class IDFModel(JavaModel, JavaMLReadable, JavaMLWritable):
@inherit_doc
class Imputer(JavaEstimator, HasInputCols, JavaMLReadable, JavaMLWritable):
    """
    .. note:: Experimental

    Imputation estimator for completing missing values, either using the mean or the median
    of the columns in which the missing values are located. The input columns should be of
    DoubleType or FloatType. Currently Imputer does not support categorical features and
    possibly creates incorrect values for a categorical feature.

    Note that the mean/median value is computed after filtering out missing values.
    All Null values in the input columns are treated as missing, and so are also imputed. For
    computing median, :py:meth:`pyspark.sql.DataFrame.approxQuantile` is used with a
    relative error of `0.001`.

    >>> df = spark.createDataFrame([(1.0, float("nan")), (2.0, float("nan")), (float("nan"), 3.0),
    ... (4.0, 4.0), (5.0, 5.0)], ["a", "b"])
    >>> imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
    >>> model = imputer.fit(df)
    >>> model.surrogateDF.show()
    +---+---+
    |  a|  b|
    +---+---+
    |3.0|4.0|
    +---+---+
    ...
    >>> model.transform(df).show()
    +---+---+-----+-----+
    |  a|  b|out_a|out_b|
    +---+---+-----+-----+
    |1.0|NaN|  1.0|  4.0|
    |2.0|NaN|  2.0|  4.0|
    |NaN|3.0|  3.0|  3.0|
    ...
    >>> imputer.setStrategy("median").setMissingValue(1.0).fit(df).transform(df).show()
    +---+---+-----+-----+
    |  a|  b|out_a|out_b|
    +---+---+-----+-----+
    |1.0|NaN|  4.0|  NaN|
    ...
    >>> imputerPath = temp_path + "/imputer"
    >>> imputer.save(imputerPath)
    >>> loadedImputer = Imputer.load(imputerPath)
    >>> loadedImputer.getStrategy() == imputer.getStrategy()
    True
    >>> loadedImputer.getMissingValue()
    1.0
    >>> modelPath = temp_path + "/imputer-model"
    >>> model.save(modelPath)
    >>> loadedModel = ImputerModel.load(modelPath)
    >>> loadedModel.transform(df).head().out_a == model.transform(df).head().out_a
    True

    .. versionadded:: 2.2.0
    """

    outputCols = Param(Params._dummy(), "outputCols",
                       "output column names.", typeConverter=TypeConverters.toListString)

    strategy = Param(Params._dummy(), "strategy",
                     "strategy for imputation. If mean, then replace missing values using the mean "
                     "value of the feature. If median, then replace missing values using the "
                     "median value of the feature.",
                     typeConverter=TypeConverters.toString)

    missingValue = Param(Params._dummy(), "missingValue",
                         "The placeholder for the missing values. All occurrences of missingValue "
                         "will be imputed.", typeConverter=TypeConverters.toFloat)

    @keyword_only
    def __init__(self, strategy="mean", missingValue=float("nan"), inputCols=None,
                 outputCols=None):
        """
        __init__(self, strategy="mean", missingValue=float("nan"), inputCols=None, \
                 outputCols=None)
        """
        super(Imputer, self).__init__()
        self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.Imputer", self.uid)
        self._setDefault(strategy="mean", missingValue=float("nan"))
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    @since("2.2.0")
    def setParams(self, strategy="mean", missingValue=float("nan"), inputCols=None,
                  outputCols=None):
        """
        setParams(self, strategy="mean", missingValue=float("nan"), inputCols=None, \
                  outputCols=None)
        Sets params for this Imputer.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    @since("2.2.0")
    def setOutputCols(self, value):
        """
        Sets the value of :py:attr:`outputCols`.
        """
        return self._set(outputCols=value)

    @since("2.2.0")
    def getOutputCols(self):
        """
        Gets the value of :py:attr:`outputCols` or its default value.
        """
        return self.getOrDefault(self.outputCols)

    @since("2.2.0")
    def setStrategy(self, value):
        """
        Sets the value of :py:attr:`strategy`.
        """
        return self._set(strategy=value)

    @since("2.2.0")
    def getStrategy(self):
        """
        Gets the value of :py:attr:`strategy` or its default value.
        """
        return self.getOrDefault(self.strategy)

    @since("2.2.0")
    def setMissingValue(self, value):
        """
        Sets the value of :py:attr:`missingValue`.
        """
        return self._set(missingValue=value)

    @since("2.2.0")
    def getMissingValue(self):
        """
        Gets the value of :py:attr:`missingValue` or its default value.
        """
        return self.getOrDefault(self.missingValue)

    def _create_model(self, java_model):
        """
        Wraps the fitted Java model in an :py:class:`ImputerModel`.
        """
        return ImputerModel(java_model)
+
+
class ImputerModel(JavaModel, JavaMLReadable, JavaMLWritable):
    """
    .. note:: Experimental

    Model fitted by :py:class:`Imputer`.

    .. versionadded:: 2.2.0
    """

    @property
    @since("2.2.0")
    def surrogateDF(self):
        """
        A DataFrame with one row, pairing each of the inputCols with the
        surrogate value substituted for missing entries of that column
        when transforming the input DataFrame.
        """
        surrogates = self._call_java("surrogateDF")
        return surrogates
+
+
+@inherit_doc
class MaxAbsScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
"""
Rescale each feature individually to range [-1, 1] by dividing through the largest maximum