Diffstat (limited to 'mllib')
 mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala | 65 +++++++++++++++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 45 insertions(+), 20 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 6d5e398dfe..76be4204e9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{Vector, Vectors}
@@ -82,6 +83,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
/**
* Set the regularization parameter.
* Default is 0.0.
+ *
* @group setParam
*/
@Since("1.3.0")
@@ -91,6 +93,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
/**
* Set whether we should fit the intercept.
* Default is true.
+ *
* @group setParam
*/
@Since("1.5.0")
@@ -104,6 +107,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* the models should always converge to the same solution when no regularization
* is applied. In R's GLMNET package, the default behavior is true as well.
* Default is true.
+ *
* @group setParam
*/
@Since("1.5.0")
@@ -115,6 +119,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
* For 0 < alpha < 1, the penalty is a combination of L1 and L2.
* Default is 0.0 which is an L2 penalty.
+ *
* @group setParam
*/
@Since("1.4.0")
@@ -124,6 +129,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
/**
* Set the maximum number of iterations.
* Default is 100.
+ *
* @group setParam
*/
@Since("1.3.0")
@@ -134,6 +140,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* Set the convergence tolerance of iterations.
* A smaller value will lead to higher accuracy at the cost of more iterations.
* Default is 1E-6.
+ *
* @group setParam
*/
@Since("1.4.0")
@@ -144,6 +151,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* Whether to over-/under-sample training instances according to the given weights in weightCol.
* If not set or empty, all instances are treated equally (weight 1.0).
* Default is not set, so all instances have weight one.
+ *
* @group setParam
*/
@Since("1.6.0")
@@ -157,6 +165,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* solution to the linear regression problem.
* The default value is "auto" which means that the solver algorithm is
* selected automatically.
+ *
* @group setParam
*/
@Since("1.6.0")
@@ -270,6 +279,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean)
val featuresMean = featuresSummarizer.mean.toArray
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+ val bcFeaturesMean = instances.context.broadcast(featuresMean)
+ val bcFeaturesStd = instances.context.broadcast(featuresStd)
if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
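
This is the core of the patch: `featuresMean` and `featuresStd` were previously captured directly in task closures, so every task serialized its own copy; broadcasting ships each array to each executor once, and tasks dereference it via `.value`. A minimal sketch of the pattern outside this class (names are illustrative):

    import org.apache.spark.SparkContext

    // Sketch: a large driver-side array is shipped once per executor via a
    // broadcast, instead of once per task inside each closure.
    def scaleByStd(sc: SparkContext, featuresStd: Array[Double]): Array[Double] = {
      val bcFeaturesStd = sc.broadcast(featuresStd)
      sc.parallelize(featuresStd.indices)
        .map(i => bcFeaturesStd.value(i) * 2.0)  // tasks read through .value
        .collect()
    }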
@@ -285,7 +296,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
- $(standardization), featuresStd, featuresMean, effectiveL2RegParam)
+ $(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam)
val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
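
For context, `LeastSquaresCostFun` plugs into Breeze through the `DiffFunction` contract: `calculate` returns the loss and its gradient at a point, and LBFGS (or OWLQN when an L1 penalty is active) iterates on it. A toy sketch of that contract on a convex function, independent of Spark:

    import breeze.linalg.{DenseVector => BDV}
    import breeze.optimize.{DiffFunction, LBFGS}

    // f(x) = ||x - 3||^2 with gradient 2(x - 3); LBFGS should converge to x = 3.
    val f = new DiffFunction[BDV[Double]] {
      override def calculate(x: BDV[Double]): (Double, BDV[Double]) = {
        val d = x - 3.0
        (d dot d, d * 2.0)
      }
    }
    val optimizer = new LBFGS[BDV[Double]](maxIter = 100, m = 10, tolerance = 1e-6)
    val solution = optimizer.minimize(f, BDV.zeros[Double](5))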
@@ -330,6 +341,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
throw new SparkException(msg)
}
+ bcFeaturesMean.destroy(blocking = false)
+ bcFeaturesStd.destroy(blocking = false)
+
/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.
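
Destroying the broadcasts once training leaves the optimizer loop matters because, unlike `unpersist()` (which only drops executor copies and allows re-broadcast on next use), `destroy()` releases all resources for good. The `blocking = false` overload used here is internal to Spark; user code calls the public `destroy()`. Roughly, the lifecycle this patch follows (sketch, assuming a SparkContext `sc`):

    // Lifecycle sketch: create on the driver, use inside jobs, tear down.
    val bc = sc.broadcast(Array.fill(1 << 20)(1.0))
    val n = sc.parallelize(1 to 8).map(_ => bc.value.length).reduce(_ + _)
    // unpersist() would drop executor copies but keep the driver copy;
    // destroy() releases everything, so use it only once the value is dead.
    bc.destroy()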
@@ -419,6 +433,7 @@ class LinearRegressionModel private[ml] (
/**
* Evaluates the model on a test dataset.
+ *
* @param dataset Test dataset to evaluate model on.
*/
@Since("2.0.0")
@@ -544,6 +559,7 @@ class LinearRegressionTrainingSummary private[regression] (
* Number of training iterations until termination.
*
* This value is only available when using the "l-bfgs" solver.
+ *
* @see [[LinearRegression.solver]]
*/
@Since("1.5.0")
@@ -862,27 +878,31 @@ class LinearRegressionSummary private[regression] (
* $$
* </blockquote></p>
*
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
* @param labelStd The standard deviation value of the label.
* @param labelMean The mean value of the label.
* @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
*/
private class LeastSquaresAggregator(
- coefficients: Vector,
+ bcCoefficients: Broadcast[Vector],
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
- featuresStd: Array[Double],
- featuresMean: Array[Double]) extends Serializable {
+ bcFeaturesStd: Broadcast[Array[Double]],
+ bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {
private var totalCnt: Long = 0L
private var weightSum: Double = 0.0
private var lossSum = 0.0
- private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
- val coefficientsArray = coefficients.toArray.clone()
+ private val dim = bcCoefficients.value.size
+ // make these transient so they are not serialized between aggregation stages
+ @transient private lazy val featuresStd = bcFeaturesStd.value
+ @transient private lazy val effectiveCoefAndOffset = {
+ val coefficientsArray = bcCoefficients.value.toArray.clone()
+ val featuresMean = bcFeaturesMean.value
var sum = 0.0
var i = 0
val len = coefficientsArray.length
@@ -896,10 +916,11 @@ private class LeastSquaresAggregator(
i += 1
}
val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
- (coefficientsArray, offset, coefficientsArray.length)
+ (Vectors.dense(coefficientsArray), offset)
}
-
- private val effectiveCoefficientsVector = Vectors.dense(effectiveCoefficientsArray)
+ // do not use tuple assignment above because it will circumvent the @transient tag
+ @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
+ @transient private lazy val offset = effectiveCoefAndOffset._2
private val gradientSumArray = Array.ofDim[Double](dim)
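
The `@transient lazy val` idiom is what keeps the broadcast change cheap: when `treeAggregate` serializes partially merged aggregators between levels of the tree, transient fields are skipped, and each executor lazily recomputes them from the broadcast on first access. The comment about tuple assignment points at a Scala pitfall: destructuring like `private val (a, b) = ...` compiles to a hidden tuple field that would not carry the `@transient` tag, so the pair is stored whole and unpacked through separate lazy vals. A distilled sketch of the idiom (class and fields are illustrative):

    import org.apache.spark.broadcast.Broadcast

    // Derived state is marked @transient lazy: only the broadcast handle is
    // written to the wire; the arrays are rebuilt from it after deserialization.
    class ScalerAggregator(bcStd: Broadcast[Array[Double]]) extends Serializable {
      @transient private lazy val std: Array[Double] = bcStd.value
      @transient private lazy val invStd: Array[Double] =
        std.map(s => if (s != 0.0) 1.0 / s else 0.0)

      def scaled(i: Int, value: Double): Double = value * invStd(i)
    }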
@@ -922,9 +943,10 @@ private class LeastSquaresAggregator(
if (diff != 0) {
val localGradientSumArray = gradientSumArray
+ val localFeaturesStd = featuresStd
features.foreachActive { (index, value) =>
- if (featuresStd(index) != 0.0 && value != 0.0) {
- localGradientSumArray(index) += weight * diff * value / featuresStd(index)
+ if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+ localGradientSumArray(index) += weight * diff * value / localFeaturesStd(index)
}
}
lossSum += weight * diff * diff / 2.0
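
Copying `featuresStd` into a local `val` before the loop is a small but deliberate hot-path tweak: inside `foreachActive` the closure then reads a plain array reference instead of going through the enclosing object's lazy-val accessor (and `this`) on every active element. The same idiom in isolation (names are illustrative):

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.ml.linalg.Vector

    class GradAgg(bcStd: Broadcast[Array[Double]], dim: Int) extends Serializable {
      @transient private lazy val featuresStd = bcStd.value
      private val gradientSumArray = Array.ofDim[Double](dim)

      def add(features: Vector, weight: Double, diff: Double): Unit = {
        // Hoist fields into locals: the closure now captures array references
        // directly rather than dereferencing `this` once per active element.
        val localStd = featuresStd
        val localGrad = gradientSumArray
        features.foreachActive { (index, value) =>
          if (localStd(index) != 0.0 && value != 0.0) {
            localGrad(index) += weight * diff * value / localStd(index)
          }
        }
      }
    }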
@@ -992,23 +1014,26 @@ private class LeastSquaresCostFun(
labelMean: Double,
fitIntercept: Boolean,
standardization: Boolean,
- featuresStd: Array[Double],
- featuresMean: Array[Double],
+ bcFeaturesStd: Broadcast[Array[Double]],
+ bcFeaturesMean: Broadcast[Array[Double]],
effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
+ val bcCoeffs = instances.context.broadcast(coeffs)
+ val localFeaturesStd = bcFeaturesStd.value
val leastSquaresAggregator = {
val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)
instances.treeAggregate(
- new LeastSquaresAggregator(coeffs, labelStd, labelMean, fitIntercept, featuresStd,
- featuresMean))(seqOp, combOp)
+ new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
+ bcFeaturesMean))(seqOp, combOp)
}
val totalGradientArray = leastSquaresAggregator.gradient.toArray
+ bcCoeffs.destroy(blocking = false)
val regVal = if (effectiveL2regParam == 0.0) {
0.0
@@ -1022,13 +1047,13 @@ private class LeastSquaresCostFun(
totalGradientArray(index) += effectiveL2regParam * value
value * value
} else {
- if (featuresStd(index) != 0.0) {
+ if (localFeaturesStd(index) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
- val temp = value / (featuresStd(index) * featuresStd(index))
+ val temp = value / (localFeaturesStd(index) * localFeaturesStd(index))
totalGradientArray(index) += effectiveL2regParam * temp
value * temp
} else {
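
Putting the `LeastSquaresCostFun` changes together: every optimizer iteration broadcasts the current coefficients, tree-aggregates one aggregator over the data, and destroys that per-iteration broadcast immediately, while the feature-statistics broadcasts live for the whole fit. A condensed, self-contained sketch of that loop body (the `Agg` class here is a stand-in, not Spark code):

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD

    // Stand-in aggregator: sums a dot product against broadcast coefficients.
    class Agg(bcCoeffs: Broadcast[Array[Double]]) extends Serializable {
      var loss = 0.0
      def add(x: Array[Double]): Agg = {
        loss += x.zip(bcCoeffs.value).map { case (a, b) => a * b }.sum
        this
      }
      def merge(other: Agg): Agg = { loss += other.loss; this }
    }

    // One cost-function evaluation: broadcast, treeAggregate, destroy.
    def calculateLoss(data: RDD[Array[Double]], coeffs: Array[Double]): Double = {
      val bcCoeffs = data.context.broadcast(coeffs)   // fresh each iteration
      val agg = data.treeAggregate(new Agg(bcCoeffs))(
        (a, x) => a.add(x),
        (a1, a2) => a1.merge(a2))
      bcCoeffs.destroy()                              // released right away
      agg.loss
    }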