|  |  |  |
|---|---|---|
| author | Xusen Yin <yinxusen@gmail.com> | 2016-05-01 12:29:01 -0700 |
| committer | Joseph K. Bradley <joseph@databricks.com> | 2016-05-01 12:29:01 -0700 |
| commit | a6428292f78fd594f41a4a7bf254d40268f46305 (patch) | |
| tree | 4abbc07b299f0b05e563e21bcfdcc42afdfc4b2b /mllib/src/main | |
| parent | cdf9e9753df4e7f2fa4e972d1bfded4e22943c27 (diff) | |
[SPARK-14931][ML][PYTHON] Mismatched default values between pipelines in Spark and PySpark - update
## What changes were proposed in this pull request?
This PR is an update for [https://github.com/apache/spark/pull/12738] which:
* Adds a generic unit test for JavaParams wrappers in pyspark.ml that checks default Param values against the defaults on the Scala side
* Various fixes for bugs found
  * This includes changing classes that take a weightCol so that unset and empty String Param values are treated the same way.
Defaults changed (a short Scala sketch of the new behavior follows this list):
* Scala
  * LogisticRegression: weightCol defaults to not set (instead of empty string)
  * StringIndexer: labels default to not set (instead of empty array)
  * GeneralizedLinearRegression:
    * maxIter always defaults to 25 (simpler than defaulting to 25 for a particular solver)
    * weightCol defaults to not set (instead of empty string)
  * LinearRegression: weightCol defaults to not set (instead of empty string)
* Python
  * MultilayerPerceptron: layers default to not set (instead of [1,1])
  * ChiSqSelector: numTopFeatures defaults to 50 (instead of not set)
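To make the weightCol change concrete, here is a rough sketch (not part of the patch) of the Scala API after these defaults change; the SparkSession, the `training` DataFrame, and its column names are illustrative assumptions, not taken from the diff.

```scala
// Rough sketch of the user-visible effect on the Scala side (assumes a running
// SparkSession `spark`; the data and column names below are illustrative).
// Note: at this commit, spark.ml still uses the mllib.linalg Vector classes.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (0.0, 1.0, Vectors.dense(0.0, 1.1)),
  (1.0, 2.0, Vectors.dense(2.0, 1.0)),
  (1.0, 0.5, Vectors.dense(2.0, 1.3)),
  (0.0, 1.0, Vectors.dense(0.0, 1.2))
)).toDF("label", "weight", "features")

// weightCol no longer defaults to "": leaving it unset now means every instance
// gets weight 1.0, exactly as the old empty-string default did.
val unweighted = new LogisticRegression().fit(training)

// Setting an explicit weight column behaves as before.
val weighted = new LogisticRegression().setWeightCol("weight").fit(training)
```

The unset-or-empty rule is what the `!isDefined(weightCol) || $(weightCol).isEmpty` checks in the diff below implement.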
## How was this patch tested?
Generic unit test. The test itself was verified manually by changing defaults and confirming that doing so broke the test.
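The generic test lives on the pyspark.ml side and is therefore outside this 'mllib/src/main' diffstat. As a rough Scala-only illustration (a sketch of the defaults it compares against, not the PR's test code), the new Scala-side defaults can be read directly through the public Params API:

```scala
// Sketch: inspecting the new Scala-side defaults via the public Params API.
// These assertions restate the defaults listed above; they are not the PR's test.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.GeneralizedLinearRegression

val lr = new LogisticRegression()
assert(!lr.hasDefault(lr.weightCol))         // weightCol: no longer defaults to ""

val glr = new GeneralizedLinearRegression()
assert(!glr.hasDefault(glr.weightCol))       // weightCol: no longer defaults to ""
assert(glr.hasDefault(glr.maxIter))
assert(glr.getOrDefault(glr.maxIter) == 25)  // maxIter: now always defaults to 25
```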
Author: Joseph K. Bradley <joseph@databricks.com>
Author: yinxusen <yinxusen@gmail.com>
Closes #12816 from jkbradley/yinxusen-SPARK-14931.
Diffstat (limited to 'mllib/src/main')
4 files changed, 32 insertions, 26 deletions
```diff
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 717e93c058..d2d4e249b4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -235,13 +235,12 @@ class LogisticRegression @Since("1.2.0") (
 
   /**
    * Whether to over-/under-sample training instances according to the given weights in weightCol.
-   * If empty, all instances are treated equally (weight 1.0).
-   * Default is empty, so all instances have weight one.
+   * If not set or empty String, all instances are treated equally (weight 1.0).
+   * Default is not set, so all instances have weight one.
    * @group setParam
    */
   @Since("1.6.0")
   def setWeightCol(value: String): this.type = set(weightCol, value)
-  setDefault(weightCol -> "")
 
   @Since("1.5.0")
   override def setThresholds(value: Array[Double]): this.type = super.setThresholds(value)
@@ -264,7 +263,7 @@ class LogisticRegression @Since("1.2.0") (
 
   protected[spark] def train(dataset: Dataset[_],
       handlePersistence: Boolean): LogisticRegressionModel = {
-    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
     val instances: RDD[Instance] =
       dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd.map {
         case Row(label: Double, weight: Double, features: Vector) =>
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 7e0d374f02..cc0571fd7e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -263,13 +263,12 @@ class IndexToString private[ml] (override val uid: String)
   /**
    * Optional param for array of labels specifying index-string mapping.
    *
-   * Default: Empty array, in which case [[inputCol]] metadata is used for labels.
+   * Default: Not specified, in which case [[inputCol]] metadata is used for labels.
    * @group param
    */
   final val labels: StringArrayParam = new StringArrayParam(this, "labels",
     "Optional array of labels specifying index-string mapping." +
       " If not provided or if empty, then metadata from inputCol is used instead.")
-  setDefault(labels, Array.empty[String])
 
   /** @group getParam */
   final def getLabels: Array[String] = $(labels)
@@ -292,7 +291,7 @@ class IndexToString private[ml] (override val uid: String)
   override def transform(dataset: Dataset[_]): DataFrame = {
     val inputColSchema = dataset.schema($(inputCol))
     // If the labels array is empty use column metadata
-    val values = if ($(labels).isEmpty) {
+    val values = if (!isDefined(labels) || $(labels).isEmpty) {
       Attribute.fromStructField(inputColSchema)
         .asInstanceOf[NominalAttribute].values.get
     } else {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index bf9d3ff30c..c294ef31f9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -31,7 +31,7 @@ import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.mllib.linalg.{BLAS, Vector}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataType, DoubleType, StructType}
 
@@ -101,9 +101,6 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam
       schema: StructType,
       fitting: Boolean,
       featuresDataType: DataType): StructType = {
-    if ($(solver) == "irls") {
-      setDefault(maxIter -> 25)
-    }
     if (isDefined(link)) {
       require(supportedFamilyAndLinkPairs.contains(
         Family.fromName($(family)) -> Link.fromName($(link))), "Generalized Linear Regression " +
@@ -171,13 +168,14 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
   def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value)
 
   /**
-   * Sets the maximum number of iterations.
-   * Default is 25 if the solver algorithm is "irls".
+   * Sets the maximum number of iterations (applicable for solver "irls").
+   * Default is 25.
    *
    * @group setParam
    */
   @Since("2.0.0")
   def setMaxIter(value: Int): this.type = set(maxIter, value)
+  setDefault(maxIter -> 25)
 
   /**
    * Sets the convergence tolerance of iterations.
@@ -213,7 +211,6 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
    */
   @Since("2.0.0")
   def setWeightCol(value: String): this.type = set(weightCol, value)
-  setDefault(weightCol -> "")
 
   /**
    * Sets the solver algorithm used for optimization.
@@ -252,7 +249,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
       throw new SparkException(msg)
     }
 
-    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
     val instances: RDD[Instance] =
       dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd.map {
         case Row(label: Double, weight: Double, features: Vector) =>
@@ -912,19 +909,27 @@ class GeneralizedLinearRegressionSummary private[regression] (
     numInstances
   }
 
+  private def weightCol: Column = {
+    if (!model.isDefined(model.weightCol) || model.getWeightCol.isEmpty) {
+      lit(1.0)
+    } else {
+      col(model.getWeightCol)
+    }
+  }
+
   private[regression] lazy val devianceResiduals: DataFrame = {
     val drUDF = udf { (y: Double, mu: Double, weight: Double) =>
       val r = math.sqrt(math.max(family.deviance(y, mu, weight), 0.0))
       if (y > mu) r else -1.0 * r
     }
-    val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol)
+    val w = weightCol
     predictions.select(
       drUDF(col(model.getLabelCol), col(predictionCol), w).as("devianceResiduals"))
   }
 
   private[regression] lazy val pearsonResiduals: DataFrame = {
     val prUDF = udf { mu: Double => family.variance(mu) }
-    val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol)
+    val w = weightCol
     predictions.select(col(model.getLabelCol).minus(col(predictionCol))
       .multiply(sqrt(w)).divide(sqrt(prUDF(col(predictionCol)))).as("pearsonResiduals"))
   }
@@ -967,7 +972,7 @@ class GeneralizedLinearRegressionSummary private[regression] (
    */
   @Since("2.0.0")
   lazy val nullDeviance: Double = {
-    val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol)
+    val w = weightCol
     val wtdmu: Double = if (model.getFitIntercept) {
       val agg = predictions.agg(sum(w.multiply(col(model.getLabelCol))), sum(w)).first()
       agg.getDouble(0) / agg.getDouble(1)
@@ -985,7 +990,7 @@ class GeneralizedLinearRegressionSummary private[regression] (
    */
   @Since("2.0.0")
   lazy val deviance: Double = {
-    val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol)
+    val w = weightCol
     predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map {
       case Row(label: Double, pred: Double, weight: Double) =>
         family.deviance(label, pred, weight)
@@ -1010,7 +1015,7 @@ class GeneralizedLinearRegressionSummary private[regression] (
   /** Akaike's "An Information Criterion"(AIC) for the fitted model. */
   @Since("2.0.0")
   lazy val aic: Double = {
-    val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol)
+    val w = weightCol
     val weightSum = predictions.select(w).agg(sum(w)).first().getDouble(0)
     val t = predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map {
       case Row(label: Double, pred: Double, weight: Double) =>
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 5117ee115c..d13b15fd82 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -136,13 +136,12 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
 
   /**
    * Whether to over-/under-sample training instances according to the given weights in weightCol.
-   * If empty, all instances are treated equally (weight 1.0).
-   * Default is empty, so all instances have weight one.
+   * If not set or empty, all instances are treated equally (weight 1.0).
+   * Default is not set, so all instances have weight one.
    * @group setParam
    */
   @Since("1.6.0")
   def setWeightCol(value: String): this.type = set(weightCol, value)
-  setDefault(weightCol -> "")
 
   /**
    * Set the solver algorithm used for optimization.
@@ -163,7 +162,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     val numFeatures = dataset.select(col($(featuresCol))).limit(1).rdd.map {
       case Row(features: Vector) => features.size
     }.first()
-    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
 
     if (($(solver) == "auto" && $(elasticNetParam) == 0.0 &&
       numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == "normal") {
@@ -643,7 +642,11 @@ class LinearRegressionSummary private[regression] (
    * the square root of the instance weights.
    */
   lazy val devianceResiduals: Array[Double] = {
-    val weighted = if (model.getWeightCol.isEmpty) lit(1.0) else sqrt(col(model.getWeightCol))
+    val weighted = if (!model.isDefined(model.weightCol) || model.getWeightCol.isEmpty) {
+      lit(1.0)
+    } else {
+      sqrt(col(model.getWeightCol))
+    }
     val dr = predictions.select(col(model.getLabelCol).minus(col(model.getPredictionCol))
       .multiply(weighted).as("weightedResiduals"))
       .select(min(col("weightedResiduals")).as("min"), max(col("weightedResiduals")).as("max"))
@@ -665,7 +668,7 @@ class LinearRegressionSummary private[regression] (
       throw new UnsupportedOperationException(
         "No Std. Error of coefficients available for this LinearRegressionModel")
     } else {
-      val rss = if (model.getWeightCol.isEmpty) {
+      val rss = if (!model.isDefined(model.weightCol) || model.getWeightCol.isEmpty) {
         meanSquaredError * numInstances
       } else {
         val t = udf { (pred: Double, label: Double, weight: Double) =>
```