aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/ml/tests.py
diff options
context:
space:
mode:
authornoelsmith <mail@noelsmith.com>2015-08-27 23:59:30 -0700
committerJoseph K. Bradley <joseph@databricks.com>2015-08-27 23:59:30 -0700
commit7583681e6b0824d7eed471dc4d8fa0b2addf9ffc (patch)
treed8cfb483586f9aa024fc328aaf515048426b664e /python/pyspark/ml/tests.py
parent89b943438512fcfb239c268b43431397de46cbcf (diff)
downloadspark-7583681e6b0824d7eed471dc4d8fa0b2addf9ffc.tar.gz
spark-7583681e6b0824d7eed471dc4d8fa0b2addf9ffc.tar.bz2
spark-7583681e6b0824d7eed471dc4d8fa0b2addf9ffc.zip
[SPARK-10188] [PYSPARK] Pyspark CrossValidator with RMSE selects incorrect model
* Added isLargerBetter() method to Pyspark Evaluator to match the Scala version. * JavaEvaluator delegates isLargerBetter() to underlying Scala object. * Added check for isLargerBetter() in CrossValidator to determine whether to use argmin or argmax. * Added test cases for where smaller is better (RMSE) and larger is better (R-Squared). (This contribution is my original work and that I license the work to the project under Sparks' open source license) Author: noelsmith <mail@noelsmith.com> Closes #8399 from noel-smith/pyspark-rmse-xval-fix.
Diffstat (limited to 'python/pyspark/ml/tests.py')
-rw-r--r--python/pyspark/ml/tests.py87
1 files changed, 87 insertions, 0 deletions
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index c151d21fd6..60e4237293 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -32,11 +32,14 @@ else:
from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
from pyspark.sql import DataFrame, SQLContext
+from pyspark.sql.functions import rand
+from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
from pyspark.ml.util import keyword_only
from pyspark.ml import Estimator, Model, Pipeline, Transformer
from pyspark.ml.feature import *
+from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.mllib.linalg import DenseVector
@@ -264,5 +267,89 @@ class FeatureTests(PySparkTestCase):
self.assertEquals(transformedDF.head().output, ["a b c d", "b c d e"])
+class HasInducedError(Params):
+
+ def __init__(self):
+ super(HasInducedError, self).__init__()
+ self.inducedError = Param(self, "inducedError",
+ "Uniformly-distributed error added to feature")
+
+ def getInducedError(self):
+ return self.getOrDefault(self.inducedError)
+
+
+class InducedErrorModel(Model, HasInducedError):
+
+ def __init__(self):
+ super(InducedErrorModel, self).__init__()
+
+ def _transform(self, dataset):
+ return dataset.withColumn("prediction",
+ dataset.feature + (rand(0) * self.getInducedError()))
+
+
+class InducedErrorEstimator(Estimator, HasInducedError):
+
+ def __init__(self, inducedError=1.0):
+ super(InducedErrorEstimator, self).__init__()
+ self._set(inducedError=inducedError)
+
+ def _fit(self, dataset):
+ model = InducedErrorModel()
+ self._copyValues(model)
+ return model
+
+
+class CrossValidatorTests(PySparkTestCase):
+
+ def test_fit_minimize_metric(self):
+ sqlContext = SQLContext(self.sc)
+ dataset = sqlContext.createDataFrame([
+ (10, 10.0),
+ (50, 50.0),
+ (100, 100.0),
+ (500, 500.0)] * 10,
+ ["feature", "label"])
+
+ iee = InducedErrorEstimator()
+ evaluator = RegressionEvaluator(metricName="rmse")
+
+ grid = (ParamGridBuilder()
+ .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+ .build())
+ cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+ cvModel = cv.fit(dataset)
+ bestModel = cvModel.bestModel
+ bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+ self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+ "Best model should have zero induced error")
+ self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
+
+ def test_fit_maximize_metric(self):
+ sqlContext = SQLContext(self.sc)
+ dataset = sqlContext.createDataFrame([
+ (10, 10.0),
+ (50, 50.0),
+ (100, 100.0),
+ (500, 500.0)] * 10,
+ ["feature", "label"])
+
+ iee = InducedErrorEstimator()
+ evaluator = RegressionEvaluator(metricName="r2")
+
+ grid = (ParamGridBuilder()
+ .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
+ .build())
+ cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+ cvModel = cv.fit(dataset)
+ bestModel = cvModel.bestModel
+ bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+ self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+ "Best model should have zero induced error")
+ self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
+
+
if __name__ == "__main__":
unittest.main()