/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark.ml.regression import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} import breeze.stats.distributions.StudentsT import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.linalg.BLAS._ import org.apache.spark.ml.optim.WeightedLeastSquares import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.mllib.evaluation.RegressionMetrics import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.DoubleType import org.apache.spark.storage.StorageLevel /** * Params for linear regression. */ private[regression] trait LinearRegressionParams extends PredictorParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver with HasAggregationDepth /** * Linear regression. * * The learning objective is to minimize the squared error, with regularization. * The specific squared error loss function used is: * *

* $$ * L = 1/2n ||A coefficients - y||^2^ * $$ *
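 *
 * A minimal usage sketch (the parameter values, the column names and the `training` DataFrame
 * are illustrative and not defined in this file):
 *
 * {{{
 *   val lr = new LinearRegression()
 *     .setMaxIter(10)
 *     .setRegParam(0.3)
 *     .setElasticNetParam(0.8)
 *   // `training` is assumed to be a DataFrame with "label" and "features" columns.
 *   val lrModel = lr.fit(training)
 *   println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
 * }}}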

* * This supports multiple types of regularization: * - none (a.k.a. ordinary least squares) * - L2 (ridge regression) * - L1 (Lasso) * - L2 + L1 (elastic net) */ @Since("1.3.0") class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String) extends Regressor[Vector, LinearRegression, LinearRegressionModel] with LinearRegressionParams with DefaultParamsWritable with Logging { @Since("1.4.0") def this() = this(Identifiable.randomUID("linReg")) /** * Set the regularization parameter. * Default is 0.0. * * @group setParam */ @Since("1.3.0") def setRegParam(value: Double): this.type = set(regParam, value) setDefault(regParam -> 0.0) /** * Set if we should fit the intercept. * Default is true. * * @group setParam */ @Since("1.5.0") def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value) setDefault(fitIntercept -> true) /** * Whether to standardize the training features before fitting the model. * The coefficients of models will always be returned on the original scale, * so it will be transparent for users. Note that with/without standardization, * the models should always converge to the same solution when no regularization * is applied. In R's GLMNET package, the default behavior is true as well. * Default is true. * * @group setParam */ @Since("1.5.0") def setStandardization(value: Boolean): this.type = set(standardization, value) setDefault(standardization -> true) /** * Set the ElasticNet mixing parameter. * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. * For 0 < alpha < 1, the penalty is a combination of L1 and L2. * Default is 0.0, which is an L2 penalty. * * @group setParam */ @Since("1.4.0") def setElasticNetParam(value: Double): this.type = set(elasticNetParam, value) setDefault(elasticNetParam -> 0.0) /** * Set the maximum number of iterations. * Default is 100. * * @group setParam */ @Since("1.3.0") def setMaxIter(value: Int): this.type = set(maxIter, value) setDefault(maxIter -> 100) /** * Set the convergence tolerance of iterations. * A smaller value will lead to higher accuracy at the cost of more iterations. * Default is 1E-6. * * @group setParam */ @Since("1.4.0") def setTol(value: Double): this.type = set(tol, value) setDefault(tol -> 1E-6) /** * Whether to over-/under-sample training instances according to the given weights in weightCol. * If not set or empty, all instances are treated equally (weight 1.0). * Default is not set, so all instances have weight one. * * @group setParam */ @Since("1.6.0") def setWeightCol(value: String): this.type = set(weightCol, value) /** * Set the solver algorithm used for optimization. * In the case of linear regression, this can be "l-bfgs", "normal" or "auto". * "l-bfgs" denotes Limited-memory BFGS, which is a limited-memory quasi-Newton * optimization method. "normal" denotes using the Normal Equation as an analytical * solution to the linear regression problem. * The default value is "auto", which means that the solver algorithm is * selected automatically. * * @group setParam */ @Since("1.6.0") def setSolver(value: String): this.type = set(solver, value) setDefault(solver -> "auto") /** * Suggested depth for treeAggregate (>= 2). * If the dimensions of features or the number of partitions are large, * this param could be adjusted to a larger size. * Default is 2.
* @group expertSetParam */ @Since("2.1.0") def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) override protected def train(dataset: Dataset[_]): LinearRegressionModel = { // Extract the number of features before deciding on the optimization solver. val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) if (($(solver) == "auto" && $(elasticNetParam) == 0.0 && numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == "normal") { require($(elasticNetParam) == 0.0, "Only L2 regularization can be used when the normal " + "solver is used.") // For low dimensional data, WeightedLeastSquares is more efficient since the // training algorithm only requires one pass through the data. (SPARK-10668) val instances: RDD[Instance] = dataset.select( col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd.map { case Row(label: Double, weight: Double, features: Vector) => Instance(label, weight, features) } val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), $(standardization), true) val model = optimizer.fit(instances) // When the model is trained by WeightedLeastSquares, the training summary is not // attached to the returned model. val lrModel = copyValues(new LinearRegressionModel(uid, model.coefficients, model.intercept)) // WeightedLeastSquares does not run through iterations, so it does not generate // an objective history. val (summaryModel, predictionColName) = lrModel.findSummaryModelAndPredictionCol() val trainingSummary = new LinearRegressionTrainingSummary( summaryModel.transform(dataset), predictionColName, $(labelCol), $(featuresCol), summaryModel, model.diagInvAtWA.toArray, Array(0D)) return lrModel.setSummary(trainingSummary) } val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { case Row(label: Double, weight: Double, features: Vector) => Instance(label, weight, features) } val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) val (featuresSummarizer, ySummarizer) = { val seqOp = (c: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer), instance: Instance) => (c._1.add(instance.features, instance.weight), c._2.add(Vectors.dense(instance.label), instance.weight)) val combOp = (c1: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer), c2: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer)) => (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.treeAggregate( new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer )(seqOp, combOp, $(aggregationDepth)) } val yMean = ySummarizer.mean(0) val rawYStd = math.sqrt(ySummarizer.variance(0)) if (rawYStd == 0.0) { if ($(fitIntercept) || yMean == 0.0) { // If rawYStd is zero and fitIntercept=true, then the intercept is yMean with // zero coefficients; as a result, training is not needed.
// Also, if yMean==0 and rawYStd==0, all the coefficients are zero regardless of // fitIntercept. if (yMean == 0.0) { logWarning(s"Mean and standard deviation of the label are zero, so the coefficients " + s"and the intercept will all be zero; as a result, training is not needed.") } else { logWarning(s"The standard deviation of the label is zero, so the coefficients will be " + s"zeros and the intercept will be the mean of the label; as a result, " + s"training is not needed.") } if (handlePersistence) instances.unpersist() val coefficients = Vectors.sparse(numFeatures, Seq()) val intercept = yMean val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept)) // Handle possible missing or invalid prediction columns val (summaryModel, predictionColName) = model.findSummaryModelAndPredictionCol() val trainingSummary = new LinearRegressionTrainingSummary( summaryModel.transform(dataset), predictionColName, $(labelCol), $(featuresCol), model, Array(0D), Array(0D)) return model.setSummary(trainingSummary) } else { require($(regParam) == 0.0, "The standard deviation of the label is zero. " + "Model cannot be regularized.") logWarning(s"The standard deviation of the label is zero. " + "Consider setting fitIntercept=true.") } } // If y is constant (rawYStd is zero), then y cannot be scaled. In this case, // setting yStd=abs(yMean) ensures that y is not scaled anymore in the l-bfgs algorithm. val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean) val featuresMean = featuresSummarizer.mean.toArray val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) val bcFeaturesMean = instances.context.broadcast(featuresMean) val bcFeaturesStd = instances.context.broadcast(featuresStd) if (!$(fitIntercept) && (0 until numFeatures).exists { i => featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) { logWarning("Fitting LinearRegressionModel without intercept on dataset with " + "constant nonzero columns. Spark MLlib outputs zero coefficients for constant nonzero " + "columns. This behavior is the same as R glmnet but different from LIBSVM.") } // Since we implicitly do the feature scaling when we compute the cost function // to improve the convergence, the effective regParam will be changed. val effectiveRegParam = $(regParam) / yStd val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept), $(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam, $(aggregationDepth)) val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) { new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) } else { val standardizationParam = $(standardization) def effectiveL1RegFun = (index: Int) => { if (standardizationParam) { effectiveL1RegParam } else { // If `standardization` is false, we still standardize the data // to improve the rate of convergence; as a result, we have to // perform this reverse standardization by penalizing each component // differently to get effectively the same objective function when // the training dataset is not standardized.
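// Illustrative numbers (not from this code): with regParam = 0.1, elasticNetParam = 0.5 and
// yStd = 2.0, effectiveL1RegParam = 0.5 * (0.1 / 2.0) = 0.025; for a feature with
// featuresStd(index) = 4.0, the per-component penalty applied below is 0.025 / 4.0 = 0.00625.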
if (featuresStd(index) != 0.0) effectiveL1RegParam / featuresStd(index) else 0.0 } } new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, effectiveL1RegFun, $(tol)) } val initialCoefficients = Vectors.zeros(numFeatures) val states = optimizer.iterations(new CachedDiffFunction(costFun), initialCoefficients.asBreeze.toDenseVector) val (coefficients, objectiveHistory) = { /* Note that in linear regression, the objective history (loss + regularization) returned from the optimizer is computed in the scaled space given by the following formula.

$$ L = 1/2n||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2 + regTerms $$

*/ val arrayBuilder = mutable.ArrayBuilder.make[Double] var state: optimizer.State = null while (states.hasNext) { state = states.next() arrayBuilder += state.adjustedValue } if (state == null) { val msg = s"${optimizer.getClass.getName} failed." logError(msg) throw new SparkException(msg) } bcFeaturesMean.destroy(blocking = false) bcFeaturesStd.destroy(blocking = false) /* The coefficients are trained in the scaled space; we're converting them back to the original space. */ val rawCoefficients = state.x.toArray.clone() var i = 0 val len = rawCoefficients.length while (i < len) { rawCoefficients(i) *= { if (featuresStd(i) != 0.0) yStd / featuresStd(i) else 0.0 } i += 1 } (Vectors.dense(rawCoefficients).compressed, arrayBuilder.result()) } /* The intercept in R's GLMNET is computed using closed form after the coefficients are converged. See the following discussion for detail. http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet */ val intercept = if ($(fitIntercept)) { yMean - dot(coefficients, Vectors.dense(featuresMean)) } else { 0.0 } if (handlePersistence) instances.unpersist() val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept)) // Handle possible missing or invalid prediction columns val (summaryModel, predictionColName) = model.findSummaryModelAndPredictionCol() val trainingSummary = new LinearRegressionTrainingSummary( summaryModel.transform(dataset), predictionColName, $(labelCol), $(featuresCol), model, Array(0D), objectiveHistory) model.setSummary(trainingSummary) } @Since("1.4.0") override def copy(extra: ParamMap): LinearRegression = defaultCopy(extra) } @Since("1.6.0") object LinearRegression extends DefaultParamsReadable[LinearRegression] { @Since("1.6.0") override def load(path: String): LinearRegression = super.load(path) } /** * Model produced by [[LinearRegression]]. */ @Since("1.3.0") class LinearRegressionModel private[ml] ( @Since("1.4.0") override val uid: String, @Since("2.0.0") val coefficients: Vector, @Since("1.3.0") val intercept: Double) extends RegressionModel[Vector, LinearRegressionModel] with LinearRegressionParams with MLWritable { private var trainingSummary: Option[LinearRegressionTrainingSummary] = None override val numFeatures: Int = coefficients.size /** * Gets summary (e.g. residuals, mse, r-squared ) of model on training set. An exception is * thrown if `trainingSummary == None`. */ @Since("1.5.0") def summary: LinearRegressionTrainingSummary = trainingSummary.getOrElse { throw new SparkException("No training summary available for this LinearRegressionModel") } private[regression] def setSummary(summary: LinearRegressionTrainingSummary): this.type = { this.trainingSummary = Some(summary) this } /** Indicates whether a training summary exists for this model instance. */ @Since("1.5.0") def hasSummary: Boolean = trainingSummary.isDefined /** * Evaluates the model on a test dataset. * * @param dataset Test dataset to evaluate model on. */ @Since("2.0.0") def evaluate(dataset: Dataset[_]): LinearRegressionSummary = { // Handle possible missing or invalid prediction columns val (summaryModel, predictionColName) = findSummaryModelAndPredictionCol() new LinearRegressionSummary(summaryModel.transform(dataset), predictionColName, $(labelCol), $(featuresCol), summaryModel, Array(0D)) } /** * If the prediction column is set returns the current model and prediction column, * otherwise generates a new column and sets it as the prediction column on a new copy * of the current model. 
*/ private[regression] def findSummaryModelAndPredictionCol(): (LinearRegressionModel, String) = { $(predictionCol) match { case "" => val predictionColName = "prediction_" + java.util.UUID.randomUUID.toString (copy(ParamMap.empty).setPredictionCol(predictionColName), predictionColName) case p => (this, p) } } override protected def predict(features: Vector): Double = { dot(features, coefficients) + intercept } @Since("1.4.0") override def copy(extra: ParamMap): LinearRegressionModel = { val newModel = copyValues(new LinearRegressionModel(uid, coefficients, intercept), extra) if (trainingSummary.isDefined) newModel.setSummary(trainingSummary.get) newModel.setParent(parent) } /** * Returns a [[org.apache.spark.ml.util.MLWriter]] instance for this ML instance. * * For [[LinearRegressionModel]], this does NOT currently save the training [[summary]]. * An option to save [[summary]] may be added in the future. * * This also does not save the [[parent]] currently. */ @Since("1.6.0") override def write: MLWriter = new LinearRegressionModel.LinearRegressionModelWriter(this) } @Since("1.6.0") object LinearRegressionModel extends MLReadable[LinearRegressionModel] { @Since("1.6.0") override def read: MLReader[LinearRegressionModel] = new LinearRegressionModelReader @Since("1.6.0") override def load(path: String): LinearRegressionModel = super.load(path) /** [[MLWriter]] instance for [[LinearRegressionModel]] */ private[LinearRegressionModel] class LinearRegressionModelWriter(instance: LinearRegressionModel) extends MLWriter with Logging { private case class Data(intercept: Double, coefficients: Vector) override protected def saveImpl(path: String): Unit = { // Save metadata and Params DefaultParamsWriter.saveMetadata(instance, path, sc) // Save model data: intercept, coefficients val data = Data(instance.intercept, instance.coefficients) val dataPath = new Path(path, "data").toString sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) } } private class LinearRegressionModelReader extends MLReader[LinearRegressionModel] { /** Checked against metadata when loading model */ private val className = classOf[LinearRegressionModel].getName override def load(path: String): LinearRegressionModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString val data = sparkSession.read.format("parquet").load(dataPath) val Row(intercept: Double, coefficients: Vector) = MLUtils.convertVectorColumnsToML(data, "coefficients") .select("intercept", "coefficients") .head() val model = new LinearRegressionModel(metadata.uid, coefficients, intercept) DefaultParamsReader.getAndSetParams(model, metadata) model } } } /** * :: Experimental :: * Linear regression training results. Currently, the training summary ignores the * training weights except for the objective trace. * * @param predictions predictions output by the model's `transform` method. * @param objectiveHistory objective function (scaled loss + regularization) at each iteration. */ @Since("1.5.0") @Experimental class LinearRegressionTrainingSummary private[regression] ( predictions: DataFrame, predictionCol: String, labelCol: String, featuresCol: String, model: LinearRegressionModel, diagInvAtWA: Array[Double], val objectiveHistory: Array[Double]) extends LinearRegressionSummary( predictions, predictionCol, labelCol, featuresCol, model, diagInvAtWA) { /** * Number of training iterations until termination * * This value is only available when using the "l-bfgs" solver. 
* * @see [[LinearRegression.solver]] */ @Since("1.5.0") val totalIterations = objectiveHistory.length } /** * :: Experimental :: * Linear regression results evaluated on a dataset. * * @param predictions predictions output by the model's `transform` method. * @param predictionCol Field in "predictions" which gives the predicted value of the label at * each instance. * @param labelCol Field in "predictions" which gives the true label of each instance. * @param featuresCol Field in "predictions" which gives the features of each instance as a vector. */ @Since("1.5.0") @Experimental class LinearRegressionSummary private[regression] ( @transient val predictions: DataFrame, val predictionCol: String, val labelCol: String, val featuresCol: String, private val privateModel: LinearRegressionModel, private val diagInvAtWA: Array[Double]) extends Serializable { @deprecated("The model field is deprecated and will be removed in 2.1.0.", "2.0.0") val model: LinearRegressionModel = privateModel @transient private val metrics = new RegressionMetrics( predictions .select(col(predictionCol), col(labelCol).cast(DoubleType)) .rdd .map { case Row(pred: Double, label: Double) => (pred, label) }, !privateModel.getFitIntercept) /** * Returns the explained variance regression score. * explainedVariance = 1 - variance(y - \hat{y}) / variance(y) * Reference: [[http://en.wikipedia.org/wiki/Explained_variation]] * * Note: This ignores instance weights (setting all to 1.0) from [[LinearRegression.weightCol]]. * This will change in later Spark versions. */ @Since("1.5.0") val explainedVariance: Double = metrics.explainedVariance /** * Returns the mean absolute error, which is a risk function corresponding to the * expected value of the absolute error loss or l1-norm loss. * * Note: This ignores instance weights (setting all to 1.0) from [[LinearRegression.weightCol]]. * This will change in later Spark versions. */ @Since("1.5.0") val meanAbsoluteError: Double = metrics.meanAbsoluteError /** * Returns the mean squared error, which is a risk function corresponding to the * expected value of the squared error loss or quadratic loss. * * Note: This ignores instance weights (setting all to 1.0) from [[LinearRegression.weightCol]]. * This will change in later Spark versions. */ @Since("1.5.0") val meanSquaredError: Double = metrics.meanSquaredError /** * Returns the root mean squared error, which is defined as the square root of * the mean squared error. * * Note: This ignores instance weights (setting all to 1.0) from [[LinearRegression.weightCol]]. * This will change in later Spark versions. */ @Since("1.5.0") val rootMeanSquaredError: Double = metrics.rootMeanSquaredError /** * Returns R^2^, the coefficient of determination. * Reference: [[http://en.wikipedia.org/wiki/Coefficient_of_determination]] * * Note: This ignores instance weights (setting all to 1.0) from [[LinearRegression.weightCol]]. * This will change in later Spark versions. 
*/ @Since("1.5.0") val r2: Double = metrics.r2 /** Residuals (label - predicted value) */ @Since("1.5.0") @transient lazy val residuals: DataFrame = { val t = udf { (pred: Double, label: Double) => label - pred } predictions.select(t(col(predictionCol), col(labelCol)).as("residuals")) } /** Number of instances in DataFrame predictions */ lazy val numInstances: Long = predictions.count() /** Degrees of freedom */ private val degreesOfFreedom: Long = if (privateModel.getFitIntercept) { numInstances - privateModel.coefficients.size - 1 } else { numInstances - privateModel.coefficients.size } /** * The weighted residuals, the usual residuals rescaled by * the square root of the instance weights. */ lazy val devianceResiduals: Array[Double] = { val weighted = if (!privateModel.isDefined(privateModel.weightCol) || privateModel.getWeightCol.isEmpty) { lit(1.0) } else { sqrt(col(privateModel.getWeightCol)) } val dr = predictions .select(col(privateModel.getLabelCol).minus(col(privateModel.getPredictionCol)) .multiply(weighted).as("weightedResiduals")) .select(min(col("weightedResiduals")).as("min"), max(col("weightedResiduals")).as("max")) .first() Array(dr.getDouble(0), dr.getDouble(1)) } /** * Standard error of estimated coefficients and intercept. * This value is only available when using the "normal" solver. * * If [[LinearRegression.fitIntercept]] is set to true, * then the last element returned corresponds to the intercept. * * @see [[LinearRegression.solver]] */ lazy val coefficientStandardErrors: Array[Double] = { if (diagInvAtWA.length == 1 && diagInvAtWA(0) == 0) { throw new UnsupportedOperationException( "No Std. Error of coefficients available for this LinearRegressionModel") } else { val rss = if (!privateModel.isDefined(privateModel.weightCol) || privateModel.getWeightCol.isEmpty) { meanSquaredError * numInstances } else { val t = udf { (pred: Double, label: Double, weight: Double) => math.pow(label - pred, 2.0) * weight } predictions.select(t(col(privateModel.getPredictionCol), col(privateModel.getLabelCol), col(privateModel.getWeightCol)).as("wse")).agg(sum(col("wse"))).first().getDouble(0) } val sigma2 = rss / degreesOfFreedom diagInvAtWA.map(_ * sigma2).map(math.sqrt) } } /** * T-statistic of estimated coefficients and intercept. * This value is only available when using the "normal" solver. * * If [[LinearRegression.fitIntercept]] is set to true, * then the last element returned corresponds to the intercept. * * @see [[LinearRegression.solver]] */ lazy val tValues: Array[Double] = { if (diagInvAtWA.length == 1 && diagInvAtWA(0) == 0) { throw new UnsupportedOperationException( "No t-statistic available for this LinearRegressionModel") } else { val estimate = if (privateModel.getFitIntercept) { Array.concat(privateModel.coefficients.toArray, Array(privateModel.intercept)) } else { privateModel.coefficients.toArray } estimate.zip(coefficientStandardErrors).map { x => x._1 / x._2 } } } /** * Two-sided p-value of estimated coefficients and intercept. * This value is only available when using the "normal" solver. * * If [[LinearRegression.fitIntercept]] is set to true, * then the last element returned corresponds to the intercept. 
* * @see [[LinearRegression.solver]] */ lazy val pValues: Array[Double] = { if (diagInvAtWA.length == 1 && diagInvAtWA(0) == 0) { throw new UnsupportedOperationException( "No p-value available for this LinearRegressionModel") } else { tValues.map { x => 2.0 * (1.0 - StudentsT(degreesOfFreedom.toDouble).cdf(math.abs(x))) } } } } /** * LeastSquaresAggregator computes the gradient and loss for a least-squares loss function, * as used in linear regression for samples in sparse or dense vectors, in an online fashion. * * Two LeastSquaresAggregators can be merged together to have a summary of the loss and gradient of * the corresponding joint dataset. * * To improve the convergence rate during the optimization process, and also to prevent * features with very large variances from exerting an overly large influence during model training, * packages like R's GLMNET perform scaling to unit variance and remove the mean to reduce * the condition number, and then train the model in the scaled space but return the coefficients in * the original scale. See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf * * However, we don't want to apply the `StandardScaler` to the training dataset and then cache * the standardized dataset, since that would create a lot of overhead. As a result, we perform the * scaling implicitly when we compute the objective function. The following is the mathematical * derivation. * * Note that we don't deal with the intercept by adding a bias term here, because the intercept * can be computed using the closed form after the coefficients converge. * See this discussion for details. * http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet * * When training with the intercept enabled, * the objective function in the scaled space is given by * *

* $$ * L = 1/2n ||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2, * $$ *

* * where $\bar{x_i}$ is the mean of $x_i$, $\hat{x_i}$ is the standard deviation of $x_i$, * $\bar{y}$ is the mean of the label, and $\hat{y}$ is the standard deviation of the label. * * If we fit with the intercept disabled (that is, forced through 0.0), * we can use the same equation except we set $\bar{y}$ and $\bar{x_i}$ to 0 instead * of the respective means. * * This can be rewritten as * *

* $$ * \begin{align} * L &= 1/2n ||\sum_i (w_i/\hat{x_i})x_i - \sum_i (w_i/\hat{x_i})\bar{x_i} - y / \hat{y} * + \bar{y} / \hat{y}||^2 \\ * &= 1/2n ||\sum_i w_i^\prime x_i - y / \hat{y} + offset||^2 = 1/2n diff^2 * \end{align} * $$ *

* * where $w_i^\prime$ is the effective coefficient defined by $w_i/\hat{x_i}$, and the offset is * *

* $$ * - \sum_i (w_i/\hat{x_i})\bar{x_i} + \bar{y} / \hat{y}. * $$ *

* * and diff is * *

* $$ * \sum_i w_i^\prime x_i - y / \hat{y} + offset * $$ *

* * Note that the effective coefficients and offset don't depend on the training dataset, * so they can be precomputed. * * Now, the first derivative of the objective function in the scaled space is * *

* $$ * \frac{\partial L}{\partial w_i} = diff/N (x_i - \bar{x_i}) / \hat{x_i} * $$ *

* * However, $(x_i - \bar{x_i})$ will densify the computation, so it's not * an ideal formula when the training dataset is in sparse format. * * This can be addressed by adding the dense $\bar{x_i} / \hat{x_i}$ terms * at the end, by keeping the sum of diff. The first derivative of the total * objective function over all the samples is * *

* $$ * \begin{align} * \frac{\partial L}{\partial w_i} &= * 1/N \sum_j diff_j (x_{ij} - \bar{x_i}) / \hat{x_i} \\ * &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) - diffSum \bar{x_i} / \hat{x_i}) \\ * &= 1/N ((\sum_j diff_j x_{ij} / \hat{x_i}) + correction_i) * \end{align} * $$ *

* * where $correction_i = - diffSum \bar{x_i} / \hat{x_i}$ * * Simple math shows that diffSum is actually zero, so we don't even * need to add the correction terms at the end. From the definition of diff, * *

* $$ * \begin{align} * diffSum &= \sum_j (\sum_i w_i(x_{ij} - \bar{x_i}) * / \hat{x_i} - (y_j - \bar{y}) / \hat{y}) \\ * &= N * (\sum_i w_i(\bar{x_i} - \bar{x_i}) / \hat{x_i} - (\bar{y} - \bar{y}) / \hat{y}) \\ * &= 0 * \end{align} * $$ *
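 *
 * For instance (an illustrative check with one feature and two samples): with $x = \{1, 3\}$,
 * so $\bar{x} = 2$, and $y = \{10, 20\}$, so $\bar{y} = 15$, the two diffs are
 * $w(1 - 2)/\hat{x} - (10 - 15)/\hat{y}$ and $w(3 - 2)/\hat{x} - (20 - 15)/\hat{y}$,
 * which cancel exactly, giving $diffSum = 0$.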

* * As a result, the first derivative of the total objective function only depends on * the training dataset, can easily be computed in a distributed fashion, and is * sparse-format friendly. * *

* $$ * \frac{\partial L}{\partial w_i} = 1/N (\sum_j diff_j x_{ij} / \hat{x_i}) * $$ *
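 *
 * A minimal sketch of how this aggregator is typically driven (it mirrors
 * [[LeastSquaresCostFun]] below; the `instances` RDD and the broadcast variables are assumed
 * to be prepared by the caller):
 *
 * {{{
 *   val agg = instances.treeAggregate(
 *     new LeastSquaresAggregator(bcCoefficients, labelStd, labelMean, fitIntercept,
 *       bcFeaturesStd, bcFeaturesMean))(
 *     seqOp = (c, instance) => c.add(instance),
 *     combOp = (c1, c2) => c1.merge(c2))
 *   val (loss, gradient) = (agg.loss, agg.gradient)
 * }}}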

* * @param bcCoefficients The broadcast coefficients corresponding to the features. * @param labelStd The standard deviation value of the label. * @param labelMean The mean value of the label. * @param fitIntercept Whether to fit an intercept term. * @param bcFeaturesStd The broadcast standard deviation values of the features. * @param bcFeaturesMean The broadcast mean values of the features. */ private class LeastSquaresAggregator( bcCoefficients: Broadcast[Vector], labelStd: Double, labelMean: Double, fitIntercept: Boolean, bcFeaturesStd: Broadcast[Array[Double]], bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable { private var totalCnt: Long = 0L private var weightSum: Double = 0.0 private var lossSum = 0.0 private val dim = bcCoefficients.value.size // make transient so we do not serialize between aggregation stages @transient private lazy val featuresStd = bcFeaturesStd.value @transient private lazy val effectiveCoefAndOffset = { val coefficientsArray = bcCoefficients.value.toArray.clone() val featuresMean = bcFeaturesMean.value var sum = 0.0 var i = 0 val len = coefficientsArray.length while (i < len) { if (featuresStd(i) != 0.0) { coefficientsArray(i) /= featuresStd(i) sum += coefficientsArray(i) * featuresMean(i) } else { coefficientsArray(i) = 0.0 } i += 1 } val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0 (Vectors.dense(coefficientsArray), offset) } // do not use tuple assignment above because it will circumvent the @transient tag @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1 @transient private lazy val offset = effectiveCoefAndOffset._2 private val gradientSumArray = Array.ofDim[Double](dim) /** * Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient * of the objective function. * * @param instance The instance of data point to be added. * @return This LeastSquaresAggregator object. */ def add(instance: Instance): this.type = { instance match { case Instance(label, weight, features) => require(dim == features.size, s"Dimensions mismatch when adding new sample." + s" Expecting $dim but got ${features.size}.") require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") if (weight == 0.0) return this val diff = dot(features, effectiveCoefficientsVector) - label / labelStd + offset if (diff != 0) { val localGradientSumArray = gradientSumArray val localFeaturesStd = featuresStd features.foreachActive { (index, value) => if (localFeaturesStd(index) != 0.0 && value != 0.0) { localGradientSumArray(index) += weight * diff * value / localFeaturesStd(index) } } lossSum += weight * diff * diff / 2.0 } totalCnt += 1 weightSum += weight this } } /** * Merge another LeastSquaresAggregator, and update the loss and gradient * of the objective function. * (Note that it's in place merging; as a result, `this` object will be modified.) * * @param other The other LeastSquaresAggregator to be merged. * @return This LeastSquaresAggregator object. */ def merge(other: LeastSquaresAggregator): this.type = { require(dim == other.dim, s"Dimensions mismatch when merging with another " + s"LeastSquaresAggregator. 
Expecting $dim but got ${other.dim}.") if (other.weightSum != 0) { totalCnt += other.totalCnt weightSum += other.weightSum lossSum += other.lossSum var i = 0 val localThisGradientSumArray = this.gradientSumArray val localOtherGradientSumArray = other.gradientSumArray while (i < dim) { localThisGradientSumArray(i) += localOtherGradientSumArray(i) i += 1 } } this } def count: Long = totalCnt def loss: Double = { require(weightSum > 0.0, s"The effective number of instances should be " + s"greater than 0.0, but got $weightSum.") lossSum / weightSum } def gradient: Vector = { require(weightSum > 0.0, s"The effective number of instances should be " + s"greater than 0.0, but got $weightSum.") val result = Vectors.dense(gradientSumArray.clone()) scal(1.0 / weightSum, result) result } } /** * LeastSquaresCostFun implements Breeze's DiffFunction[T] for the least squares cost. * It returns the loss and gradient with L2 regularization at a particular point (coefficients). * It's used in Breeze's convex optimization routines. */ private class LeastSquaresCostFun( instances: RDD[Instance], labelStd: Double, labelMean: Double, fitIntercept: Boolean, standardization: Boolean, bcFeaturesStd: Broadcast[Array[Double]], bcFeaturesMean: Broadcast[Array[Double]], effectiveL2regParam: Double, aggregationDepth: Int) extends DiffFunction[BDV[Double]] { override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { val coeffs = Vectors.fromBreeze(coefficients) val bcCoeffs = instances.context.broadcast(coeffs) val localFeaturesStd = bcFeaturesStd.value val leastSquaresAggregator = { val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance) val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2) instances.treeAggregate( new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd, bcFeaturesMean))(seqOp, combOp, aggregationDepth) } val totalGradientArray = leastSquaresAggregator.gradient.toArray bcCoeffs.destroy(blocking = false) val regVal = if (effectiveL2regParam == 0.0) { 0.0 } else { var sum = 0.0 coeffs.foreachActive { (index, value) => // The following code computes the loss and the gradient of the L2 regularization, and // adds the gradient contribution back to totalGradientArray. sum += { if (standardization) { totalGradientArray(index) += effectiveL2regParam * value value * value } else { if (localFeaturesStd(index) != 0.0) { // If `standardization` is false, we still standardize the data // to improve the rate of convergence; as a result, we have to // perform this reverse standardization by penalizing each component // differently to get effectively the same objective function when // the training dataset is not standardized. val temp = value / (localFeaturesStd(index) * localFeaturesStd(index)) totalGradientArray(index) += effectiveL2regParam * temp value * temp } else { 0.0 } } } } 0.5 * effectiveL2regParam * sum } (leastSquaresAggregator.loss + regVal, new BDV(totalGradientArray)) } }