From 511d4929c87ebf364b96bd46890628f736eaa026 Mon Sep 17 00:00:00 2001
From: JeremyNixon
Date: Thu, 3 Mar 2016 09:50:05 -0800
Subject: [SPARK-12877][ML] Add train-validation-split to pyspark

## What changes were proposed in this pull request?

This change adds train-validation-split to pyspark.ml.tuning, exposing a new
`TrainValidationSplit` estimator and its companion `TrainValidationSplitModel`.
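Unlike `CrossValidator`, which fits and evaluates each `ParamMap` on k folds, `TrainValidationSplit` evaluates each `ParamMap` exactly once against a single random train/validation split controlled by `trainRatio`, then refits the best `ParamMap` on the full dataset. A minimal usage sketch, adapted from the doctest added in this patch (like the doctest, it assumes an existing `sqlContext`):

```python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.linalg import Vectors

# Toy binary-classification data, repeated so the random split has enough rows.
dataset = sqlContext.createDataFrame(
    [(Vectors.dense([0.0]), 0.0),
     (Vectors.dense([0.4]), 1.0),
     (Vectors.dense([0.5]), 0.0),
     (Vectors.dense([0.6]), 1.0),
     (Vectors.dense([1.0]), 1.0)] * 10,
    ["features", "label"])

lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()

# trainRatio=0.75 (the default) trains each candidate on ~75% of the rows
# and scores it on the remaining ~25% with the evaluator.
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid,
                           evaluator=evaluator, trainRatio=0.75)
tvsModel = tvs.fit(dataset)
print(evaluator.evaluate(tvsModel.transform(dataset)))  # 0.8333... in the doctest
```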
## How was this patch tested?

This patch was tested through the unit tests in python/pyspark/ml/tests.py,
which add a `TrainValidationSplitTests` suite covering both a metric to
minimize (rmse) and a metric to maximize (r2).
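The suite can also be exercised directly with the standard `unittest` machinery. A hypothetical local invocation (it assumes a Spark checkout with `python/` and the bundled py4j on `PYTHONPATH` so `pyspark` imports cleanly; Spark's `python/run-tests` script remains the usual entry point):

```python
import unittest

from pyspark.ml.tests import TrainValidationSplitTests

# PySparkTestCase.setUp/tearDown manage the SparkContext for each test,
# so loading the test case is all that is needed here.
suite = unittest.TestLoader().loadTestsFromTestCase(TrainValidationSplitTests)
unittest.TextTestRunner(verbosity=2).run(suite)
```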
+ """ + return self.getOrDefault(self.evaluator) + + @since("2.0.0") + def setTrainRatio(self, value): + """ + Sets the value of :py:attr:`trainRatio`. + """ + self._paramMap[self.trainRatio] = value + return self + + @since("2.0.0") + def getTrainRatio(self): + """ + Gets the value of trainRatio or its default value. + """ + return self.getOrDefault(self.trainRatio) + + def _fit(self, dataset): + est = self.getOrDefault(self.estimator) + epm = self.getOrDefault(self.estimatorParamMaps) + numModels = len(epm) + eva = self.getOrDefault(self.evaluator) + tRatio = self.getOrDefault(self.trainRatio) + seed = self.getOrDefault(self.seed) + randCol = self.uid + "_rand" + df = dataset.select("*", rand(seed).alias(randCol)) + metrics = np.zeros(numModels) + condition = (df[randCol] >= tRatio) + validation = df.filter(condition) + train = df.filter(~condition) + for j in range(numModels): + model = est.fit(train, epm[j]) + metric = eva.evaluate(model.transform(validation, epm[j])) + metrics[j] += metric + if eva.isLargerBetter(): + bestIndex = np.argmax(metrics) + else: + bestIndex = np.argmin(metrics) + bestModel = est.fit(dataset, epm[bestIndex]) + return TrainValidationSplitModel(bestModel) + + @since("2.0.0") + def copy(self, extra=None): + """ + Creates a copy of this instance with a randomly generated uid + and some extra params. This copies creates a deep copy of + the embedded paramMap, and copies the embedded and extra parameters over. + + :param extra: Extra parameters to copy to the new instance + :return: Copy of this instance + """ + if extra is None: + extra = dict() + newTVS = Params.copy(self, extra) + if self.isSet(self.estimator): + newTVS.setEstimator(self.getEstimator().copy(extra)) + # estimatorParamMaps remain the same + if self.isSet(self.evaluator): + newTVS.setEvaluator(self.getEvaluator().copy(extra)) + return newTVS + + +class TrainValidationSplitModel(Model): + """ + Model from train validation split. + """ + + def __init__(self, bestModel): + super(TrainValidationSplitModel, self).__init__() + #: best model from cross validation + self.bestModel = bestModel + + def _transform(self, dataset): + return self.bestModel.transform(dataset) + + @since("2.0.0") + def copy(self, extra=None): + """ + Creates a copy of this instance with a randomly generated uid + and some extra params. This copies the underlying bestModel, + creates a deep copy of the embedded paramMap, and + copies the embedded and extra parameters over. + + :param extra: Extra parameters to copy to the new instance + :return: Copy of this instance + """ + if extra is None: + extra = dict() + return TrainValidationSplitModel(self.bestModel.copy(extra)) + if __name__ == "__main__": import doctest from pyspark.context import SparkContext -- cgit v1.2.3