author     JeremyNixon <jnixon2@gmail.com>        2016-03-03 09:50:05 -0800
committer  Xiangrui Meng <meng@databricks.com>    2016-03-03 09:50:05 -0800
commit     511d4929c87ebf364b96bd46890628f736eaa026 (patch)
tree       64ef0fed4ed117f1135439be4cd2109de01c5245 /python/pyspark/ml
parent     9a48c656eec8e47a55e5e0ecc6a7901c5be31566 (diff)
[SPARK-12877][ML] Add train-validation-split to pyspark
## What changes were proposed in this pull request?

Add train-validation-split to pyspark.ml.tuning.

## How was this patch tested?

This patch was tested through unit tests located in pyspark/ml/tests.py.

This is my original work and I license it to Spark.

Author: JeremyNixon <jnixon2@gmail.com>

Closes #11335 from JeremyNixon/tvs_pyspark.
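For reviewers who want to exercise the new API end to end, here is a minimal
usage sketch (not part of the patch; it assumes a live SparkContext `sc` and
mirrors the doctest added to tuning.py below):

    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
    from pyspark.mllib.linalg import Vectors
    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)  # assumes an existing SparkContext `sc`
    dataset = sqlContext.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
    evaluator = BinaryClassificationEvaluator()

    # One model is fit per param map on the train split; the winner by the
    # evaluator's metric on the validation split is refit on the full dataset.
    tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid,
                               evaluator=evaluator, trainRatio=0.75)
    tvsModel = tvs.fit(dataset)
    print(evaluator.evaluate(tvsModel.transform(dataset)))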
Diffstat (limited to 'python/pyspark/ml')
-rw-r--r--  python/pyspark/ml/tests.py  |  53
-rw-r--r--  python/pyspark/ml/tuning.py | 193
2 files changed, 244 insertions(+), 2 deletions(-)
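A note on the mechanics before the diff itself: `TrainValidationSplit._fit`
(below) performs the split by appending a uniform random column and
thresholding it at trainRatio. A standalone sketch of that logic, assuming
some DataFrame `df` (the names here are illustrative, not from the patch):

    from pyspark.sql.functions import rand

    trainRatio = 0.75  # the patch's default
    df_rand = df.select("*", rand(seed=42).alias("_rand"))  # arbitrary seed

    # Rows whose uniform draw lands at or above trainRatio become the
    # validation set (~25% in expectation); the rest form the train set.
    validation = df_rand.filter(df_rand["_rand"] >= trainRatio)
    train = df_rand.filter(df_rand["_rand"] < trainRatio)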
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 5fcfa9e61f..8182fcfb4e 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -45,7 +45,7 @@ from pyspark.ml.feature import *
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
from pyspark.ml.regression import LinearRegression
-from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
+from pyspark.ml.tuning import *
from pyspark.ml.util import keyword_only
from pyspark.mllib.linalg import DenseVector
from pyspark.sql import DataFrame, SQLContext, Row
@@ -423,6 +423,57 @@ class CrossValidatorTests(PySparkTestCase):
        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
+class TrainValidationSplitTests(PySparkTestCase):
+
+    def test_fit_minimize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="rmse")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+                .build())
+        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        tvsModel = tvs.fit(dataset)
+        bestModel = tvsModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
+
+    def test_fit_maximize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="r2")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+                .build())
+        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        tvsModel = tvs.fit(dataset)
+        bestModel = tvsModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
+
+
class PersistenceTest(PySparkTestCase):
    def test_linear_regression(self):
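The two tests above deliberately cover both metric directions. As an aside
for readers of `_fit` in the tuning.py diff that follows: the best param map
is chosen by argmax or argmin depending on `Evaluator.isLargerBetter()`, for
example:

    from pyspark.ml.evaluation import RegressionEvaluator

    # "rmse" should be minimized, "r2" maximized; _fit branches on this flag.
    RegressionEvaluator(metricName="rmse").isLargerBetter()  # False
    RegressionEvaluator(metricName="r2").isLargerBetter()    # True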
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 0cbe97f1d8..77af0094df 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -25,7 +25,8 @@ from pyspark.ml.param.shared import HasSeed
from pyspark.ml.util import keyword_only
from pyspark.sql.functions import rand
-__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel']
+__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel', 'TrainValidationSplit',
+           'TrainValidationSplitModel']
class ParamGridBuilder(object):
@@ -288,6 +289,196 @@ class CrossValidatorModel(Model):
        return CrossValidatorModel(self.bestModel.copy(extra))
+class TrainValidationSplit(Estimator, HasSeed):
+    """
+    Train-Validation-Split: randomly splits the input dataset into a train set
+    and a validation set, and selects the model whose metric on the validation
+    set is best.
+
+    >>> from pyspark.ml.classification import LogisticRegression
+    >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
+    >>> from pyspark.mllib.linalg import Vectors
+    >>> dataset = sqlContext.createDataFrame(
+    ...     [(Vectors.dense([0.0]), 0.0),
+    ...      (Vectors.dense([0.4]), 1.0),
+    ...      (Vectors.dense([0.5]), 0.0),
+    ...      (Vectors.dense([0.6]), 1.0),
+    ...      (Vectors.dense([1.0]), 1.0)] * 10,
+    ...     ["features", "label"])
+    >>> lr = LogisticRegression()
+    >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
+    >>> evaluator = BinaryClassificationEvaluator()
+    >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+    >>> tvsModel = tvs.fit(dataset)
+    >>> evaluator.evaluate(tvsModel.transform(dataset))
+    0.8333...
+
+    .. versionadded:: 2.0.0
+    """
+
+    estimator = Param(Params._dummy(), "estimator", "estimator to be tested")
+    estimatorParamMaps = Param(Params._dummy(), "estimatorParamMaps", "estimator param maps")
+    evaluator = Param(
+        Params._dummy(), "evaluator",
+        "evaluator used to select hyper-parameters that maximize the validated metric")
+    trainRatio = Param(Params._dummy(), "trainRatio", "Param for ratio between train and\
+        validation data. Must be between 0 and 1.")
+
+    @keyword_only
+    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
+                 seed=None):
+        """
+        __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
+                 seed=None)
+        """
+        super(TrainValidationSplit, self).__init__()
+        self._setDefault(trainRatio=0.75)
+        kwargs = self.__init__._input_kwargs
+        self._set(**kwargs)
+
+    @since("2.0.0")
+    @keyword_only
+    def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
+                  seed=None):
+        """
+        setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
+                  seed=None)
+        Sets params for the train validation split.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.0.0")
+    def setEstimator(self, value):
+        """
+        Sets the value of :py:attr:`estimator`.
+        """
+        self._paramMap[self.estimator] = value
+        return self
+
+    @since("2.0.0")
+    def getEstimator(self):
+        """
+        Gets the value of estimator or its default value.
+        """
+        return self.getOrDefault(self.estimator)
+
+    @since("2.0.0")
+    def setEstimatorParamMaps(self, value):
+        """
+        Sets the value of :py:attr:`estimatorParamMaps`.
+        """
+        self._paramMap[self.estimatorParamMaps] = value
+        return self
+
+    @since("2.0.0")
+    def getEstimatorParamMaps(self):
+        """
+        Gets the value of estimatorParamMaps or its default value.
+        """
+        return self.getOrDefault(self.estimatorParamMaps)
+
+    @since("2.0.0")
+    def setEvaluator(self, value):
+        """
+        Sets the value of :py:attr:`evaluator`.
+        """
+        self._paramMap[self.evaluator] = value
+        return self
+
+    @since("2.0.0")
+    def getEvaluator(self):
+        """
+        Gets the value of evaluator or its default value.
+        """
+        return self.getOrDefault(self.evaluator)
+
+    @since("2.0.0")
+    def setTrainRatio(self, value):
+        """
+        Sets the value of :py:attr:`trainRatio`.
+        """
+        self._paramMap[self.trainRatio] = value
+        return self
+
+    @since("2.0.0")
+    def getTrainRatio(self):
+        """
+        Gets the value of trainRatio or its default value.
+        """
+        return self.getOrDefault(self.trainRatio)
+
+    def _fit(self, dataset):
+        est = self.getOrDefault(self.estimator)
+        epm = self.getOrDefault(self.estimatorParamMaps)
+        numModels = len(epm)
+        eva = self.getOrDefault(self.evaluator)
+        tRatio = self.getOrDefault(self.trainRatio)
+        seed = self.getOrDefault(self.seed)
+        randCol = self.uid + "_rand"
+        df = dataset.select("*", rand(seed).alias(randCol))
+        metrics = np.zeros(numModels)
+        condition = (df[randCol] >= tRatio)
+        validation = df.filter(condition)
+        train = df.filter(~condition)
+        for j in range(numModels):
+            model = est.fit(train, epm[j])
+            metric = eva.evaluate(model.transform(validation, epm[j]))
+            metrics[j] += metric
+        if eva.isLargerBetter():
+            bestIndex = np.argmax(metrics)
+        else:
+            bestIndex = np.argmin(metrics)
+        bestModel = est.fit(dataset, epm[bestIndex])
+        return TrainValidationSplitModel(bestModel)
+
+    @since("2.0.0")
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance with a randomly generated uid
+        and some extra params. This copy creates a deep copy of
+        the embedded paramMap, and copies the embedded and extra parameters over.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        newTVS = Params.copy(self, extra)
+        if self.isSet(self.estimator):
+            newTVS.setEstimator(self.getEstimator().copy(extra))
+        # estimatorParamMaps remain the same
+        if self.isSet(self.evaluator):
+            newTVS.setEvaluator(self.getEvaluator().copy(extra))
+        return newTVS
+
+
+class TrainValidationSplitModel(Model):
+    """
+    Model from train validation split.
+    """
+
+    def __init__(self, bestModel):
+        super(TrainValidationSplitModel, self).__init__()
+        #: best model from train validation split
+        self.bestModel = bestModel
+
+    def _transform(self, dataset):
+        return self.bestModel.transform(dataset)
+
+    @since("2.0.0")
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance with a randomly generated uid
+        and some extra params. This copies the underlying bestModel,
+        creates a deep copy of the embedded paramMap, and
+        copies the embedded and extra parameters over.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        return TrainValidationSplitModel(self.bestModel.copy(extra))
+
if __name__ == "__main__":
    import doctest
    from pyspark.context import SparkContext