author     noelsmith <mail@noelsmith.com>              2015-08-27 23:59:30 -0700
committer  Joseph K. Bradley <joseph@databricks.com>   2015-08-27 23:59:40 -0700
commit     bcb8fa849e7684685e0761153daf976ff79e726f (patch)
tree       752b5ea59ea5cb81a2f874155785ca8a2ea8d678
parent     c77cf867258caede0639445798e2cd3288e246a7 (diff)
[SPARK-10188] [PYSPARK] PySpark CrossValidator with RMSE selects incorrect model
* Added an isLargerBetter() method to the PySpark Evaluator to match the Scala version.
* JavaEvaluator delegates isLargerBetter() to the underlying Scala object.
* Added a check of isLargerBetter() in CrossValidator to determine whether to use argmin or argmax.
* Added test cases for a metric where smaller is better (RMSE) and one where larger is better (R-squared).

(This contribution is my original work and I license the work to the project under Spark's open source license.)

Author: noelsmith <mail@noelsmith.com>

Closes #8399 from noel-smith/pyspark-rmse-xval-fix.

(cherry picked from commit 7583681e6b0824d7eed471dc4d8fa0b2addf9ffc)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
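For illustration (not part of the patch): before this change, CrossValidator always took np.argmax over the per-param-map metrics, which selects the *worst* model whenever the evaluator's metric is an error such as RMSE. A minimal sketch of the failure mode:

    import numpy as np

    # Average cross-validation RMSE per param map (lower is better):
    metrics = np.array([100.0, 0.0, 10000.0])

    np.argmax(metrics)  # 2 -- old behavior: picks the model with the largest error
    np.argmin(metrics)  # 1 -- fixed behavior when isLargerBetter() is False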
-rw-r--r--  python/pyspark/ml/evaluation.py   12
-rw-r--r--  python/pyspark/ml/tests.py        87
-rw-r--r--  python/pyspark/ml/tuning.py        6
3 files changed, 104 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py
index 6b0a9ffde9..cb3b07947e 100644
--- a/python/pyspark/ml/evaluation.py
+++ b/python/pyspark/ml/evaluation.py
@@ -66,6 +66,14 @@ class Evaluator(Params):
         else:
             raise ValueError("Params must be a param map but got %s." % type(params))

+    def isLargerBetter(self):
+        """
+        Indicates whether the metric returned by :py:meth:`evaluate` should be maximized
+        (True, default) or minimized (False).
+        A given evaluator may support multiple metrics which may be maximized or minimized.
+        """
+        return True
+

 @inherit_doc
 class JavaEvaluator(Evaluator, JavaWrapper):
@@ -85,6 +93,10 @@ class JavaEvaluator(Evaluator, JavaWrapper):
         self._transfer_params_to_java()
         return self._java_obj.evaluate(dataset._jdf)

+    def isLargerBetter(self):
+        self._transfer_params_to_java()
+        return self._java_obj.isLargerBetter()
+

 @inherit_doc
 class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol):
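A quick sketch of the new API in use (illustrative; assumes a live SparkContext, since JavaEvaluator delegates to the underlying Scala object):

    from pyspark.ml.evaluation import RegressionEvaluator

    # Error metrics should be minimized:
    RegressionEvaluator(metricName="rmse").isLargerBetter()  # False
    # Goodness-of-fit metrics should be maximized:
    RegressionEvaluator(metricName="r2").isLargerBetter()    # True

The base Evaluator defaults to True, so pure-Python evaluators keep the old maximize behavior unless they override the method.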
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index c151d21fd6..60e4237293 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -32,11 +32,14 @@ else:
 from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
 from pyspark.sql import DataFrame, SQLContext
+from pyspark.sql.functions import rand
+from pyspark.ml.evaluation import RegressionEvaluator
 from pyspark.ml.param import Param, Params
 from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
 from pyspark.ml.util import keyword_only
 from pyspark.ml import Estimator, Model, Pipeline, Transformer
 from pyspark.ml.feature import *
+from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
 from pyspark.mllib.linalg import DenseVector
@@ -264,5 +267,89 @@ class FeatureTests(PySparkTestCase):
         self.assertEquals(transformedDF.head().output, ["a b c d", "b c d e"])


+class HasInducedError(Params):
+
+    def __init__(self):
+        super(HasInducedError, self).__init__()
+        self.inducedError = Param(self, "inducedError",
+                                  "Uniformly-distributed error added to feature")
+
+    def getInducedError(self):
+        return self.getOrDefault(self.inducedError)
+
+
+class InducedErrorModel(Model, HasInducedError):
+
+    def __init__(self):
+        super(InducedErrorModel, self).__init__()
+
+    def _transform(self, dataset):
+        return dataset.withColumn("prediction",
+                                  dataset.feature + (rand(0) * self.getInducedError()))
+
+
+class InducedErrorEstimator(Estimator, HasInducedError):
+
+    def __init__(self, inducedError=1.0):
+        super(InducedErrorEstimator, self).__init__()
+        self._set(inducedError=inducedError)
+
+    def _fit(self, dataset):
+        model = InducedErrorModel()
+        self._copyValues(model)
+        return model
+
+
+class CrossValidatorTests(PySparkTestCase):
+
+    def test_fit_minimize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="rmse")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+                .build())
+        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        cvModel = cv.fit(dataset)
+        bestModel = cvModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
+
+    def test_fit_maximize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="r2")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+                .build())
+        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        cvModel = cv.fit(dataset)
+        bestModel = cvModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
+
+
 if __name__ == "__main__":
     unittest.main()
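Why the fixture works (a sketch restating the test logic above): InducedErrorModel copies the feature column into the prediction column plus uniform noise scaled by inducedError, and the dataset sets label equal to feature, so:

    # With inducedError == 0.0: prediction == feature == label, hence
    #   RMSE = sqrt(mean((prediction - label)**2)) == 0.0  (the minimum)
    #   R^2  = 1 - SS_res / SS_tot                 == 1.0  (the maximum)
    # Any larger inducedError adds rand(0) * inducedError noise, so RMSE grows
    # and R^2 shrinks -- a correct CrossValidator must pick inducedError=0.0
    # in both tests.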
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index dcfee6a317..cae778869e 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -223,7 +223,11 @@ class CrossValidator(Estimator):
                 # TODO: duplicate evaluator to take extra params from input
                 metric = eva.evaluate(model.transform(validation, epm[j]))
                 metrics[j] += metric
-        bestIndex = np.argmax(metrics)
+
+        if eva.isLargerBetter():
+            bestIndex = np.argmax(metrics)
+        else:
+            bestIndex = np.argmin(metrics)
         bestModel = est.fit(dataset, epm[bestIndex])
         return CrossValidatorModel(bestModel)
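With the fix applied, cross-validation over an error metric now selects the lowest-error model. An illustrative end-to-end sketch (assumes an existing DataFrame df with "features" and "label" columns; LinearRegression and its regParam are standard pyspark.ml names, used here only as an example, not taken from this patch):

    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.regression import LinearRegression
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

    lr = LinearRegression()
    grid = ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.1, 1.0]).build()
    evaluator = RegressionEvaluator(metricName="rmse")  # isLargerBetter() -> False

    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
    cvModel = cv.fit(df)  # now chooses the param map with the lowest average RMSE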