aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorsethah <seth.hendrickson16@gmail.com>2016-08-08 00:00:15 -0700
committerDB Tsai <dbt@netflix.com>2016-08-08 00:00:15 -0700
commit1db1c6567bae0c80fdc522f2cbb65557cd62263f (patch)
tree493f86413f3e7fe5248b95fb270aee5a7739be32 /mllib
parente076fb05ac83a3ed6995e29bb03ea07ea05e39db (diff)
downloadspark-1db1c6567bae0c80fdc522f2cbb65557cd62263f.tar.gz
spark-1db1c6567bae0c80fdc522f2cbb65557cd62263f.tar.bz2
spark-1db1c6567bae0c80fdc522f2cbb65557cd62263f.zip
[SPARK-16404][ML] LeastSquaresAggregators serializes unnecessary data
## What changes were proposed in this pull request? Similar to `LogisticAggregator`, `LeastSquaresAggregator` used for linear regression ends up serializing the coefficients and the features standard deviations, which is not necessary and can cause performance issues for high dimensional data. This patch removes this serialization. In https://github.com/apache/spark/pull/13729 the approach was to pass these values directly to the add method. The approach used here, initially, is to mark these fields as transient instead which gives the benefit of keeping the signature of the add method simple and interpretable. The downside is that it requires the use of `transient lazy val`s which are difficult to reason about if one is not quite familiar with serialization in Scala/Spark. ## How was this patch tested? **MLlib** ![image](https://cloud.githubusercontent.com/assets/7275795/16703660/436f79fa-4524-11e6-9022-ef00058ec718.png) **ML without patch** ![image](https://cloud.githubusercontent.com/assets/7275795/16703831/c4d50b9e-4525-11e6-80cb-9b58c850cd41.png) **ML with patch** ![image](https://cloud.githubusercontent.com/assets/7275795/16703675/63e0cf40-4524-11e6-9120-1f512a70e083.png) Author: sethah <seth.hendrickson16@gmail.com> Closes #14109 from sethah/LIR_serialize.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala65
1 files changed, 45 insertions, 20 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 6d5e398dfe..76be4204e9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{Vector, Vectors}
@@ -82,6 +83,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
/**
* Set the regularization parameter.
* Default is 0.0.
+ *
* @group setParam
*/
@Since("1.3.0")
@@ -91,6 +93,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
/**
* Set if we should fit the intercept
* Default is true.
+ *
* @group setParam
*/
@Since("1.5.0")
@@ -104,6 +107,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* the models should be always converged to the same solution when no regularization
* is applied. In R's GLMNET package, the default behavior is true as well.
* Default is true.
+ *
* @group setParam
*/
@Since("1.5.0")
@@ -115,6 +119,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
* For 0 < alpha < 1, the penalty is a combination of L1 and L2.
* Default is 0.0 which is an L2 penalty.
+ *
* @group setParam
*/
@Since("1.4.0")
@@ -124,6 +129,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
/**
* Set the maximum number of iterations.
* Default is 100.
+ *
* @group setParam
*/
@Since("1.3.0")
@@ -134,6 +140,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* Set the convergence tolerance of iterations.
* Smaller value will lead to higher accuracy with the cost of more iterations.
* Default is 1E-6.
+ *
* @group setParam
*/
@Since("1.4.0")
@@ -144,6 +151,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* Whether to over-/under-sample training instances according to the given weights in weightCol.
* If not set or empty, all instances are treated equally (weight 1.0).
* Default is not set, so all instances have weight one.
+ *
* @group setParam
*/
@Since("1.6.0")
@@ -157,6 +165,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* solution to the linear regression problem.
* The default value is "auto" which means that the solver algorithm is
* selected automatically.
+ *
* @group setParam
*/
@Since("1.6.0")
@@ -270,6 +279,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean)
val featuresMean = featuresSummarizer.mean.toArray
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+ val bcFeaturesMean = instances.context.broadcast(featuresMean)
+ val bcFeaturesStd = instances.context.broadcast(featuresStd)
if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@@ -285,7 +296,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
- $(standardization), featuresStd, featuresMean, effectiveL2RegParam)
+ $(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam)
val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -330,6 +341,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
throw new SparkException(msg)
}
+ bcFeaturesMean.destroy(blocking = false)
+ bcFeaturesStd.destroy(blocking = false)
+
/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.
@@ -419,6 +433,7 @@ class LinearRegressionModel private[ml] (
/**
* Evaluates the model on a test dataset.
+ *
* @param dataset Test dataset to evaluate model on.
*/
@Since("2.0.0")
@@ -544,6 +559,7 @@ class LinearRegressionTrainingSummary private[regression] (
* Number of training iterations until termination
*
* This value is only available when using the "l-bfgs" solver.
+ *
* @see [[LinearRegression.solver]]
*/
@Since("1.5.0")
@@ -862,27 +878,31 @@ class LinearRegressionSummary private[regression] (
* $$
* </blockquote></p>
*
- * @param coefficients The coefficients corresponding to the features.
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
* @param labelStd The standard deviation value of the label.
* @param labelMean The mean value of the label.
* @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
*/
private class LeastSquaresAggregator(
- coefficients: Vector,
+ bcCoefficients: Broadcast[Vector],
labelStd: Double,
labelMean: Double,
fitIntercept: Boolean,
- featuresStd: Array[Double],
- featuresMean: Array[Double]) extends Serializable {
+ bcFeaturesStd: Broadcast[Array[Double]],
+ bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {
private var totalCnt: Long = 0L
private var weightSum: Double = 0.0
private var lossSum = 0.0
- private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
- val coefficientsArray = coefficients.toArray.clone()
+ private val dim = bcCoefficients.value.size
+ // make transient so we do not serialize between aggregation stages
+ @transient private lazy val featuresStd = bcFeaturesStd.value
+ @transient private lazy val effectiveCoefAndOffset = {
+ val coefficientsArray = bcCoefficients.value.toArray.clone()
+ val featuresMean = bcFeaturesMean.value
var sum = 0.0
var i = 0
val len = coefficientsArray.length
@@ -896,10 +916,11 @@ private class LeastSquaresAggregator(
i += 1
}
val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
- (coefficientsArray, offset, coefficientsArray.length)
+ (Vectors.dense(coefficientsArray), offset)
}
-
- private val effectiveCoefficientsVector = Vectors.dense(effectiveCoefficientsArray)
+ // do not use tuple assignment above because it will circumvent the @transient tag
+ @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
+ @transient private lazy val offset = effectiveCoefAndOffset._2
private val gradientSumArray = Array.ofDim[Double](dim)
@@ -922,9 +943,10 @@ private class LeastSquaresAggregator(
if (diff != 0) {
val localGradientSumArray = gradientSumArray
+ val localFeaturesStd = featuresStd
features.foreachActive { (index, value) =>
- if (featuresStd(index) != 0.0 && value != 0.0) {
- localGradientSumArray(index) += weight * diff * value / featuresStd(index)
+ if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+ localGradientSumArray(index) += weight * diff * value / localFeaturesStd(index)
}
}
lossSum += weight * diff * diff / 2.0
@@ -992,23 +1014,26 @@ private class LeastSquaresCostFun(
labelMean: Double,
fitIntercept: Boolean,
standardization: Boolean,
- featuresStd: Array[Double],
- featuresMean: Array[Double],
+ bcFeaturesStd: Broadcast[Array[Double]],
+ bcFeaturesMean: Broadcast[Array[Double]],
effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
+ val bcCoeffs = instances.context.broadcast(coeffs)
+ val localFeaturesStd = bcFeaturesStd.value
val leastSquaresAggregator = {
val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)
instances.treeAggregate(
- new LeastSquaresAggregator(coeffs, labelStd, labelMean, fitIntercept, featuresStd,
- featuresMean))(seqOp, combOp)
+ new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
+ bcFeaturesMean))(seqOp, combOp)
}
val totalGradientArray = leastSquaresAggregator.gradient.toArray
+ bcCoeffs.destroy(blocking = false)
val regVal = if (effectiveL2regParam == 0.0) {
0.0
@@ -1022,13 +1047,13 @@ private class LeastSquaresCostFun(
totalGradientArray(index) += effectiveL2regParam * value
value * value
} else {
- if (featuresStd(index) != 0.0) {
+ if (localFeaturesStd(index) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
- val temp = value / (featuresStd(index) * featuresStd(index))
+ val temp = value / (localFeaturesStd(index) * localFeaturesStd(index))
totalGradientArray(index) += effectiveL2regParam * temp
value * temp
} else {