aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorsethah <seth.hendrickson16@gmail.com>2017-03-02 19:38:25 -0800
committerYanbo Liang <ybliang8@gmail.com>2017-03-02 19:38:25 -0800
commit93ae176e8943d6b346c80deea778bffd188366a1 (patch)
treedf31b5a793a12aa9b4840a3a246f839cddf09e11 /mllib
parent8417a7ae6c0ea3fb8dc41bc492fc9513d1ad24af (diff)
downloadspark-93ae176e8943d6b346c80deea778bffd188366a1.tar.gz
spark-93ae176e8943d6b346c80deea778bffd188366a1.tar.bz2
spark-93ae176e8943d6b346c80deea778bffd188366a1.zip
[SPARK-19745][ML] SVCAggregator captures coefficients in its closure
## What changes were proposed in this pull request? JIRA: [SPARK-19745](https://issues.apache.org/jira/browse/SPARK-19745) Reorganize SVCAggregator to avoid serializing coefficients. This patch also makes the gradient array a `lazy val` which will avoid materializing a large array on the driver before shipping the class to the executors. This improvement stems from https://github.com/apache/spark/pull/16037. Actually, probably all ML aggregators can benefit from this. We can either: a.) separate the gradient improvement into another patch b.) keep what's here _plus_ add the lazy evaluation to all other aggregators in this patch or c.) keep it as is. ## How was this patch tested? This is an interesting question! I don't know of a reasonable way to test this right now. Ideally, we could perform an optimization and look at the shuffle write data for each task, and we could compare the size to what it we know it should be: `numCoefficients * 8 bytes`. Not sure if there is a good way to do that right now? We could discuss this here or in another JIRA, but I suspect it would be a significant undertaking. Author: sethah <seth.hendrickson16@gmail.com> Closes #17076 from sethah/svc_agg.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala29
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala2
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala17
6 files changed, 34 insertions, 24 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index bf6e76d7ac..f76b14eeeb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -440,19 +440,14 @@ private class LinearSVCAggregator(
private val numFeatures: Int = bcFeaturesStd.value.length
private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
- private val coefficients: Vector = bcCoefficients.value
private var weightSum: Double = 0.0
private var lossSum: Double = 0.0
- require(numFeaturesPlusIntercept == coefficients.size, s"Dimension mismatch. Coefficients " +
- s"length ${coefficients.size}, FeaturesStd length ${numFeatures}, fitIntercept: $fitIntercept")
-
- private val coefficientsArray = coefficients match {
- case dv: DenseVector => dv.values
- case _ =>
- throw new IllegalArgumentException(
- s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
+ @transient private lazy val coefficientsArray = bcCoefficients.value match {
+ case DenseVector(values) => values
+ case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
+ s" but got type ${bcCoefficients.value.getClass}.")
}
- private val gradientSumArray = Array.fill[Double](coefficientsArray.length)(0)
+ private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept)
/**
* Add a new training instance to this LinearSVCAggregator, and update the loss and gradient
@@ -463,6 +458,9 @@ private class LinearSVCAggregator(
*/
def add(instance: Instance): this.type = {
instance match { case Instance(label, weight, features) =>
+ require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
+ require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
+ s" Expecting $numFeatures but got ${features.size}.")
if (weight == 0.0) return this
val localFeaturesStd = bcFeaturesStd.value
val localCoefficients = coefficientsArray
@@ -530,18 +528,15 @@ private class LinearSVCAggregator(
this
}
- def loss: Double = {
- if (weightSum != 0) {
- lossSum / weightSum
- } else 0.0
- }
+ def loss: Double = if (weightSum != 0) lossSum / weightSum else 0.0
def gradient: Vector = {
if (weightSum != 0) {
val result = Vectors.dense(gradientSumArray.clone())
scal(1.0 / weightSum, result)
result
- } else Vectors.dense(Array.fill[Double](coefficientsArray.length)(0))
+ } else {
+ Vectors.dense(new Array[Double](numFeaturesPlusIntercept))
+ }
}
-
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 738b35135f..1a78187d4f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -1436,7 +1436,7 @@ private class LogisticAggregator(
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
s"got type ${bcCoefficients.value.getClass}.)")
}
- private val gradientSumArray = new Array[Double](coefficientSize)
+ private lazy val gradientSumArray = new Array[Double](coefficientSize)
if (multinomial && numClasses <= 2) {
logInfo(s"Multinomial logistic regression for binary classification yields separate " +
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index ea2dc6cfd8..a9c1a7ba0b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -580,10 +580,10 @@ private class ExpectationAggregator(
private val k: Int = bcWeights.value.length
private var totalCnt: Long = 0L
private var newLogLikelihood: Double = 0.0
- private val newWeights: Array[Double] = new Array[Double](k)
- private val newMeans: Array[DenseVector] = Array.fill(k)(
+ private lazy val newWeights: Array[Double] = new Array[Double](k)
+ private lazy val newMeans: Array[DenseVector] = Array.fill(k)(
new DenseVector(Array.fill[Double](numFeatures)(0.0)))
- private val newCovs: Array[DenseVector] = Array.fill(k)(
+ private lazy val newCovs: Array[DenseVector] = Array.fill(k)(
new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0)))
@transient private lazy val oldGaussians = {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 4b3608330c..094853b6f4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -526,7 +526,7 @@ private class AFTAggregator(
private var totalCnt: Long = 0L
private var lossSum = 0.0
// Here we optimize loss function over log(sigma), intercept and coefficients
- private val gradientSumArray = Array.ofDim[Double](length)
+ private lazy val gradientSumArray = Array.ofDim[Double](length)
def count: Long = totalCnt
def loss: Double = {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 2de7e81d8d..45df1d9be6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -959,7 +959,7 @@ private class LeastSquaresAggregator(
@transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
@transient private lazy val offset = effectiveCoefAndOffset._2
- private val gradientSumArray = Array.ofDim[Double](dim)
+ private lazy val gradientSumArray = Array.ofDim[Double](dim)
/**
* Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
index ee2aefee7a..a165d8a934 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -23,7 +23,7 @@ import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.classification.LinearSVCSuite._
-import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.feature.{Instance, LabeledPoint}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
@@ -123,6 +123,21 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
assert(model2.intercept !== 0.0)
}
+ test("sparse coefficients in SVCAggregator") {
+ val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
+ val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
+ val agg = new LinearSVCAggregator(bcCoefficients, bcFeaturesStd, true)
+ val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") {
+ intercept[IllegalArgumentException] {
+ agg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
+ }
+ }
+ assert(thrown.getMessage.contains("coefficients only supports dense"))
+
+ bcCoefficients.destroy(blocking = false)
+ bcFeaturesStd.destroy(blocking = false)
+ }
+
test("linearSVC with sample weights") {
def modelEquals(m1: LinearSVCModel, m2: LinearSVCModel): Unit = {
assert(m1.coefficients ~== m2.coefficients absTol 0.05)