From 9c6556c5f8ab013b36312db4bf02c4c6d965a535 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 6 Apr 2016 12:07:47 -0700 Subject: [SPARK-13430][PYSPARK][ML] Python API for training summaries of linear and logistic regression ## What changes were proposed in this pull request? Adding Python API for training summaries of LogisticRegression and LinearRegression in PySpark ML. ## How was this patch tested? Added unit tests to exercise the api calls for the summary classes. Also, manually verified values are expected and match those from Scala directly. Author: Bryan Cutler Closes #11621 from BryanCutler/pyspark-ml-summary-SPARK-13430. --- python/pyspark/ml/classification.py | 218 +++++++++++++++++++++++++++++++- python/pyspark/ml/regression.py | 245 +++++++++++++++++++++++++++++++++++- python/pyspark/ml/tests.py | 87 +++++++++++-- python/pyspark/ml/wrapper.py | 30 +++-- 4 files changed, 562 insertions(+), 18 deletions(-) (limited to 'python/pyspark') diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 067009559b..be7f9ea9ef 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -19,15 +19,18 @@ import warnings from pyspark import since from pyspark.ml.util import * -from pyspark.ml.wrapper import JavaEstimator, JavaModel +from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaCallable from pyspark.ml.param import TypeConverters from pyspark.ml.param.shared import * from pyspark.ml.regression import ( RandomForestParams, TreeEnsembleParams, DecisionTreeModel, TreeEnsembleModels) from pyspark.mllib.common import inherit_doc +from pyspark.sql import DataFrame __all__ = ['LogisticRegression', 'LogisticRegressionModel', + 'LogisticRegressionSummary', 'LogisticRegressionTrainingSummary', + 'BinaryLogisticRegressionSummary', 'BinaryLogisticRegressionTrainingSummary', 'DecisionTreeClassifier', 'DecisionTreeClassificationModel', 'GBTClassifier', 'GBTClassificationModel', 'RandomForestClassifier', 'RandomForestClassificationModel', @@ -233,6 +236,219 @@ class LogisticRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable): """ return self._call_java("intercept") + @property + @since("2.0.0") + def summary(self): + """ + Gets summary (e.g. residuals, mse, r-squared ) of model on + training set. An exception is thrown if + `trainingSummary is None`. + """ + java_blrt_summary = self._call_java("summary") + # Note: Once multiclass is added, update this to return correct summary + return BinaryLogisticRegressionTrainingSummary(java_blrt_summary) + + @property + @since("2.0.0") + def hasSummary(self): + """ + Indicates whether a training summary exists for this model + instance. + """ + return self._call_java("hasSummary") + + @since("2.0.0") + def evaluate(self, dataset): + """ + Evaluates the model on a test dataset. + + :param dataset: + Test dataset to evaluate model on, where dataset is an + instance of :py:class:`pyspark.sql.DataFrame` + """ + if not isinstance(dataset, DataFrame): + raise ValueError("dataset must be a DataFrame but got %s." % type(dataset)) + java_blr_summary = self._call_java("evaluate", dataset) + return BinaryLogisticRegressionSummary(java_blr_summary) + + +class LogisticRegressionSummary(JavaCallable): + """ + Abstraction for Logistic Regression Results for a given model. + + .. versionadded:: 2.0.0 + """ + + @property + @since("2.0.0") + def predictions(self): + """ + Dataframe outputted by the model's `transform` method. + """ + return self._call_java("predictions") + + @property + @since("2.0.0") + def probabilityCol(self): + """ + Field in "predictions" which gives the calibrated probability + of each class as a vector. + """ + return self._call_java("probabilityCol") + + @property + @since("2.0.0") + def labelCol(self): + """ + Field in "predictions" which gives the true label of each + instance. + """ + return self._call_java("labelCol") + + @property + @since("2.0.0") + def featuresCol(self): + """ + Field in "predictions" which gives the features of each instance + as a vector. + """ + return self._call_java("featuresCol") + + +@inherit_doc +class LogisticRegressionTrainingSummary(LogisticRegressionSummary): + """ + Abstraction for multinomial Logistic Regression Training results. + Currently, the training summary ignores the training weights except + for the objective trace. + + .. versionadded:: 2.0.0 + """ + + @property + @since("2.0.0") + def objectiveHistory(self): + """ + Objective function (scaled loss + regularization) at each + iteration. + """ + return self._call_java("objectiveHistory") + + @property + @since("2.0.0") + def totalIterations(self): + """ + Number of training iterations until termination. + """ + return self._call_java("totalIterations") + + +@inherit_doc +class BinaryLogisticRegressionSummary(LogisticRegressionSummary): + """ + .. note:: Experimental + + Binary Logistic regression results for a given model. + + .. versionadded:: 2.0.0 + """ + + @property + @since("2.0.0") + def roc(self): + """ + Returns the receiver operating characteristic (ROC) curve, + which is an Dataframe having two fields (FPR, TPR) with + (0.0, 0.0) prepended and (1.0, 1.0) appended to it. + Reference: http://en.wikipedia.org/wiki/Receiver_operating_characteristic + + Note: This ignores instance weights (setting all to 1.0) from + `LogisticRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("roc") + + @property + @since("2.0.0") + def areaUnderROC(self): + """ + Computes the area under the receiver operating characteristic + (ROC) curve. + + Note: This ignores instance weights (setting all to 1.0) from + `LogisticRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("areaUnderROC") + + @property + @since("2.0.0") + def pr(self): + """ + Returns the precision-recall curve, which is an Dataframe + containing two fields recall, precision with (0.0, 1.0) prepended + to it. + + Note: This ignores instance weights (setting all to 1.0) from + `LogisticRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("pr") + + @property + @since("2.0.0") + def fMeasureByThreshold(self): + """ + Returns a dataframe with two fields (threshold, F-Measure) curve + with beta = 1.0. + + Note: This ignores instance weights (setting all to 1.0) from + `LogisticRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("fMeasureByThreshold") + + @property + @since("2.0.0") + def precisionByThreshold(self): + """ + Returns a dataframe with two fields (threshold, precision) curve. + Every possible probability obtained in transforming the dataset + are used as thresholds used in calculating the precision. + + Note: This ignores instance weights (setting all to 1.0) from + `LogisticRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("precisionByThreshold") + + @property + @since("2.0.0") + def recallByThreshold(self): + """ + Returns a dataframe with two fields (threshold, recall) curve. + Every possible probability obtained in transforming the dataset + are used as thresholds used in calculating the recall. + + Note: This ignores instance weights (setting all to 1.0) from + `LogisticRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("recallByThreshold") + + +@inherit_doc +class BinaryLogisticRegressionTrainingSummary(BinaryLogisticRegressionSummary, + LogisticRegressionTrainingSummary): + """ + .. note:: Experimental + + Binary Logistic regression training results for a given model. + + .. versionadded:: 2.0.0 + """ + pass + class TreeClassifierParams(object): """ diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index de8a5e4bed..6cd1b4bf3a 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -20,8 +20,9 @@ import warnings from pyspark import since from pyspark.ml.param.shared import * from pyspark.ml.util import * -from pyspark.ml.wrapper import JavaEstimator, JavaModel +from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaCallable from pyspark.mllib.common import inherit_doc +from pyspark.sql import DataFrame __all__ = ['AFTSurvivalRegression', 'AFTSurvivalRegressionModel', @@ -29,6 +30,7 @@ __all__ = ['AFTSurvivalRegression', 'AFTSurvivalRegressionModel', 'GBTRegressor', 'GBTRegressionModel', 'IsotonicRegression', 'IsotonicRegressionModel', 'LinearRegression', 'LinearRegressionModel', + 'LinearRegressionSummary', 'LinearRegressionTrainingSummary', 'RandomForestRegressor', 'RandomForestRegressionModel'] @@ -131,7 +133,6 @@ class LinearRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable): """ Model weights. """ - warnings.warn("weights is deprecated. Use coefficients instead.") return self._call_java("weights") @@ -151,6 +152,246 @@ class LinearRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable): """ return self._call_java("intercept") + @property + @since("2.0.0") + def summary(self): + """ + Gets summary (e.g. residuals, mse, r-squared ) of model on + training set. An exception is thrown if + `trainingSummary is None`. + """ + java_lrt_summary = self._call_java("summary") + return LinearRegressionTrainingSummary(java_lrt_summary) + + @property + @since("2.0.0") + def hasSummary(self): + """ + Indicates whether a training summary exists for this model + instance. + """ + return self._call_java("hasSummary") + + @since("2.0.0") + def evaluate(self, dataset): + """ + Evaluates the model on a test dataset. + + :param dataset: + Test dataset to evaluate model on, where dataset is an + instance of :py:class:`pyspark.sql.DataFrame` + """ + if not isinstance(dataset, DataFrame): + raise ValueError("dataset must be a DataFrame but got %s." % type(dataset)) + java_lr_summary = self._call_java("evaluate", dataset) + return LinearRegressionSummary(java_lr_summary) + + +class LinearRegressionSummary(JavaCallable): + """ + .. note:: Experimental + + Linear regression results evaluated on a dataset. + + .. versionadded:: 2.0.0 + """ + + @property + @since("2.0.0") + def predictions(self): + """ + Dataframe outputted by the model's `transform` method. + """ + return self._call_java("predictions") + + @property + @since("2.0.0") + def predictionCol(self): + """ + Field in "predictions" which gives the predicted value of + the label at each instance. + """ + return self._call_java("predictionCol") + + @property + @since("2.0.0") + def labelCol(self): + """ + Field in "predictions" which gives the true label of each + instance. + """ + return self._call_java("labelCol") + + @property + @since("2.0.0") + def featuresCol(self): + """ + Field in "predictions" which gives the features of each instance + as a vector. + """ + return self._call_java("featuresCol") + + @property + @since("2.0.0") + def explainedVariance(self): + """ + Returns the explained variance regression score. + explainedVariance = 1 - variance(y - \hat{y}) / variance(y) + Reference: http://en.wikipedia.org/wiki/Explained_variation + + Note: This ignores instance weights (setting all to 1.0) from + `LinearRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("explainedVariance") + + @property + @since("2.0.0") + def meanAbsoluteError(self): + """ + Returns the mean absolute error, which is a risk function + corresponding to the expected value of the absolute error + loss or l1-norm loss. + + Note: This ignores instance weights (setting all to 1.0) from + `LinearRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("meanAbsoluteError") + + @property + @since("2.0.0") + def meanSquaredError(self): + """ + Returns the mean squared error, which is a risk function + corresponding to the expected value of the squared error + loss or quadratic loss. + + Note: This ignores instance weights (setting all to 1.0) from + `LinearRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("meanSquaredError") + + @property + @since("2.0.0") + def rootMeanSquaredError(self): + """ + Returns the root mean squared error, which is defined as the + square root of the mean squared error. + + Note: This ignores instance weights (setting all to 1.0) from + `LinearRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("rootMeanSquaredError") + + @property + @since("2.0.0") + def r2(self): + """ + Returns R^2^, the coefficient of determination. + Reference: http://en.wikipedia.org/wiki/Coefficient_of_determination + + Note: This ignores instance weights (setting all to 1.0) from + `LinearRegression.weightCol`. This will change in later Spark + versions. + """ + return self._call_java("r2") + + @property + @since("2.0.0") + def residuals(self): + """ + Residuals (label - predicted value) + """ + return self._call_java("residuals") + + @property + @since("2.0.0") + def numInstances(self): + """ + Number of instances in DataFrame predictions + """ + return self._call_java("numInstances") + + @property + @since("2.0.0") + def devianceResiduals(self): + """ + The weighted residuals, the usual residuals rescaled by the + square root of the instance weights. + """ + return self._call_java("devianceResiduals") + + @property + @since("2.0.0") + def coefficientStandardErrors(self): + """ + Standard error of estimated coefficients and intercept. + This value is only available when using the "normal" solver. + + .. seealso:: :py:attr:`LinearRegression.solver` + """ + return self._call_java("coefficientStandardErrors") + + @property + @since("2.0.0") + def tValues(self): + """ + T-statistic of estimated coefficients and intercept. + This value is only available when using the "normal" solver. + + .. seealso:: :py:attr:`LinearRegression.solver` + """ + return self._call_java("tValues") + + @property + @since("2.0.0") + def pValues(self): + """ + Two-sided p-value of estimated coefficients and intercept. + This value is only available when using the "normal" solver. + + .. seealso:: :py:attr:`LinearRegression.solver` + """ + return self._call_java("pValues") + + +@inherit_doc +class LinearRegressionTrainingSummary(LinearRegressionSummary): + """ + .. note:: Experimental + + Linear regression training results. Currently, the training summary ignores the + training weights except for the objective trace. + + .. versionadded:: 2.0.0 + """ + + @property + @since("2.0.0") + def objectiveHistory(self): + """ + Objective function (scaled loss + regularization) at each + iteration. + This value is only available when using the "l-bfgs" solver. + + .. seealso:: :py:attr:`LinearRegression.solver` + """ + return self._call_java("objectiveHistory") + + @property + @since("2.0.0") + def totalIterations(self): + """ + Number of training iterations until termination. + This value is only available when using the "l-bfgs" solver. + + .. seealso:: :py:attr:`LinearRegression.solver` + """ + return self._call_java("totalIterations") + @inherit_doc class IsotonicRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index e3f873e3a7..2dcd5eeb52 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -239,6 +239,17 @@ class OtherTestParams(HasMaxIter, HasInputCol, HasSeed): return self._set(**kwargs) +class HasThrowableProperty(Params): + + def __init__(self): + super(HasThrowableProperty, self).__init__() + self.p = Param(self, "none", "empty param") + + @property + def test_property(self): + raise RuntimeError("Test property to raise error when invoked") + + class ParamTests(PySparkTestCase): def test_copy_new_parent(self): @@ -749,15 +760,75 @@ class PersistenceTest(PySparkTestCase): pass -class HasThrowableProperty(Params): - - def __init__(self): - super(HasThrowableProperty, self).__init__() - self.p = Param(self, "none", "empty param") +class TrainingSummaryTest(PySparkTestCase): - @property - def test_property(self): - raise RuntimeError("Test property to raise error when invoked") + def test_linear_regression_summary(self): + from pyspark.mllib.linalg import Vectors + sqlContext = SQLContext(self.sc) + df = sqlContext.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + (0.0, 2.0, Vectors.sparse(1, [], []))], + ["label", "weight", "features"]) + lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight", + fitIntercept=False) + model = lr.fit(df) + self.assertTrue(model.hasSummary) + s = model.summary + # test that api is callable and returns expected types + self.assertGreater(s.totalIterations, 0) + self.assertTrue(isinstance(s.predictions, DataFrame)) + self.assertEqual(s.predictionCol, "prediction") + self.assertEqual(s.labelCol, "label") + self.assertEqual(s.featuresCol, "features") + objHist = s.objectiveHistory + self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float)) + self.assertAlmostEqual(s.explainedVariance, 0.25, 2) + self.assertAlmostEqual(s.meanAbsoluteError, 0.0) + self.assertAlmostEqual(s.meanSquaredError, 0.0) + self.assertAlmostEqual(s.rootMeanSquaredError, 0.0) + self.assertAlmostEqual(s.r2, 1.0, 2) + self.assertTrue(isinstance(s.residuals, DataFrame)) + self.assertEqual(s.numInstances, 2) + devResiduals = s.devianceResiduals + self.assertTrue(isinstance(devResiduals, list) and isinstance(devResiduals[0], float)) + coefStdErr = s.coefficientStandardErrors + self.assertTrue(isinstance(coefStdErr, list) and isinstance(coefStdErr[0], float)) + tValues = s.tValues + self.assertTrue(isinstance(tValues, list) and isinstance(tValues[0], float)) + pValues = s.pValues + self.assertTrue(isinstance(pValues, list) and isinstance(pValues[0], float)) + # test evaluation (with training dataset) produces a summary with same values + # one check is enough to verify a summary is returned, Scala version runs full test + sameSummary = model.evaluate(df) + self.assertAlmostEqual(sameSummary.explainedVariance, s.explainedVariance) + + def test_logistic_regression_summary(self): + from pyspark.mllib.linalg import Vectors + sqlContext = SQLContext(self.sc) + df = sqlContext.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + (0.0, 2.0, Vectors.sparse(1, [], []))], + ["label", "weight", "features"]) + lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False) + model = lr.fit(df) + self.assertTrue(model.hasSummary) + s = model.summary + # test that api is callable and returns expected types + self.assertTrue(isinstance(s.predictions, DataFrame)) + self.assertEqual(s.probabilityCol, "probability") + self.assertEqual(s.labelCol, "label") + self.assertEqual(s.featuresCol, "features") + objHist = s.objectiveHistory + self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float)) + self.assertGreater(s.totalIterations, 0) + self.assertTrue(isinstance(s.roc, DataFrame)) + self.assertAlmostEqual(s.areaUnderROC, 1.0, 2) + self.assertTrue(isinstance(s.pr, DataFrame)) + self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame)) + self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) + self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) + # test evaluation (with training dataset) produces a summary with same values + # one check is enough to verify a summary is returned, Scala version runs full test + sameSummary = model.evaluate(df) + self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC) if __name__ == "__main__": diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index ca93bf7d7d..a2cf2296fb 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -213,8 +213,30 @@ class JavaTransformer(Transformer, JavaWrapper): return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx) +class JavaCallable(object): + """ + Wrapper for a plain object in JVM to make Java calls, can be used + as a mixin to another class that defines a _java_obj wrapper + """ + def __init__(self, java_obj=None, sc=None): + super(JavaCallable, self).__init__() + self._sc = sc if sc is not None else SparkContext._active_spark_context + # if this class is a mixin and _java_obj is already defined then don't initialize + if java_obj is not None or not hasattr(self, "_java_obj"): + self._java_obj = java_obj + + def __del__(self): + if self._java_obj is not None: + self._sc._gateway.detach(self._java_obj) + + def _call_java(self, name, *args): + m = getattr(self._java_obj, name) + java_args = [_py2java(self._sc, arg) for arg in args] + return _java2py(self._sc, m(*java_args)) + + @inherit_doc -class JavaModel(Model, JavaTransformer): +class JavaModel(Model, JavaCallable, JavaTransformer): """ Base class for :py:class:`Model`s that wrap Java/Scala implementations. Subclasses should inherit this class before @@ -259,9 +281,3 @@ class JavaModel(Model, JavaTransformer): that._java_obj = self._java_obj.copy(self._empty_java_param_map()) that._transfer_params_to_java() return that - - def _call_java(self, name, *args): - m = getattr(self._java_obj, name) - sc = SparkContext._active_spark_context - java_args = [_py2java(sc, arg) for arg in args] - return _java2py(sc, m(*java_args)) -- cgit v1.2.3