diff options
Diffstat (limited to 'mllib')
6 files changed, 34 insertions, 24 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index bf6e76d7ac..f76b14eeeb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -440,19 +440,14 @@ private class LinearSVCAggregator( private val numFeatures: Int = bcFeaturesStd.value.length private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures - private val coefficients: Vector = bcCoefficients.value private var weightSum: Double = 0.0 private var lossSum: Double = 0.0 - require(numFeaturesPlusIntercept == coefficients.size, s"Dimension mismatch. Coefficients " + - s"length ${coefficients.size}, FeaturesStd length ${numFeatures}, fitIntercept: $fitIntercept") - - private val coefficientsArray = coefficients match { - case dv: DenseVector => dv.values - case _ => - throw new IllegalArgumentException( - s"coefficients only supports dense vector but got type ${coefficients.getClass}.") + @transient private lazy val coefficientsArray = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + + s" but got type ${bcCoefficients.value.getClass}.") } - private val gradientSumArray = Array.fill[Double](coefficientsArray.length)(0) + private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept) /** * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient @@ -463,6 +458,9 @@ private class LinearSVCAggregator( */ def add(instance: Instance): this.type = { instance match { case Instance(label, weight, features) => + require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") + require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." + + s" Expecting $numFeatures but got ${features.size}.") if (weight == 0.0) return this val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray @@ -530,18 +528,15 @@ private class LinearSVCAggregator( this } - def loss: Double = { - if (weightSum != 0) { - lossSum / weightSum - } else 0.0 - } + def loss: Double = if (weightSum != 0) lossSum / weightSum else 0.0 def gradient: Vector = { if (weightSum != 0) { val result = Vectors.dense(gradientSumArray.clone()) scal(1.0 / weightSum, result) result - } else Vectors.dense(Array.fill[Double](coefficientsArray.length)(0)) + } else { + Vectors.dense(new Array[Double](numFeaturesPlusIntercept)) + } } - } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 738b35135f..1a78187d4f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -1436,7 +1436,7 @@ private class LogisticAggregator( case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + s"got type ${bcCoefficients.value.getClass}.)") } - private val gradientSumArray = new Array[Double](coefficientSize) + private lazy val gradientSumArray = new Array[Double](coefficientSize) if (multinomial && numClasses <= 2) { logInfo(s"Multinomial logistic regression for binary classification yields separate " + diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index ea2dc6cfd8..a9c1a7ba0b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -580,10 +580,10 @@ private class ExpectationAggregator( private val k: Int = bcWeights.value.length private var totalCnt: Long = 0L private var newLogLikelihood: Double = 0.0 - private val newWeights: Array[Double] = new Array[Double](k) - private val newMeans: Array[DenseVector] = Array.fill(k)( + private lazy val newWeights: Array[Double] = new Array[Double](k) + private lazy val newMeans: Array[DenseVector] = Array.fill(k)( new DenseVector(Array.fill[Double](numFeatures)(0.0))) - private val newCovs: Array[DenseVector] = Array.fill(k)( + private lazy val newCovs: Array[DenseVector] = Array.fill(k)( new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0))) @transient private lazy val oldGaussians = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 4b3608330c..094853b6f4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -526,7 +526,7 @@ private class AFTAggregator( private var totalCnt: Long = 0L private var lossSum = 0.0 // Here we optimize loss function over log(sigma), intercept and coefficients - private val gradientSumArray = Array.ofDim[Double](length) + private lazy val gradientSumArray = Array.ofDim[Double](length) def count: Long = totalCnt def loss: Double = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 2de7e81d8d..45df1d9be6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -959,7 +959,7 @@ private class LeastSquaresAggregator( @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1 @transient private lazy val offset = effectiveCoefAndOffset._2 - private val gradientSumArray = Array.ofDim[Double](dim) + private lazy val gradientSumArray = Array.ofDim[Double](dim) /** * Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index ee2aefee7a..a165d8a934 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -23,7 +23,7 @@ import breeze.linalg.{DenseVector => BDV} import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.LinearSVCSuite._ -import org.apache.spark.ml.feature.LabeledPoint +import org.apache.spark.ml.feature.{Instance, LabeledPoint} import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} @@ -123,6 +123,21 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model2.intercept !== 0.0) } + test("sparse coefficients in SVCAggregator") { + val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) + val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) + val agg = new LinearSVCAggregator(bcCoefficients, bcFeaturesStd, true) + val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) + } + } + assert(thrown.getMessage.contains("coefficients only supports dense")) + + bcCoefficients.destroy(blocking = false) + bcFeaturesStd.destroy(blocking = false) + } + test("linearSVC with sample weights") { def modelEquals(m1: LinearSVCModel, m2: LinearSVCModel): Unit = { assert(m1.coefficients ~== m2.coefficients absTol 0.05) |