diff options
author | sethah <seth.hendrickson16@gmail.com> | 2016-11-20 01:42:37 +0000 |
---|---|---|
committer | DB Tsai <dbtsai@dbtsai.com> | 2016-11-20 01:42:37 +0000 |
commit | 856e0042007c789dda4539fb19a5d4580999fbf4 (patch) | |
tree | 25c67679bce2bec591dd0f739ba265660a29c5af /mllib/src/main | |
parent | ea77c81ec0db27ea4709f71dc080d00167505a7d (diff) | |
download | spark-856e0042007c789dda4539fb19a5d4580999fbf4.tar.gz spark-856e0042007c789dda4539fb19a5d4580999fbf4.tar.bz2 spark-856e0042007c789dda4539fb19a5d4580999fbf4.zip |
[SPARK-18456][ML][FOLLOWUP] Use matrix abstraction for coefficients in LogisticRegression training
## What changes were proposed in this pull request?
This is a follow up to some of the discussion [here](https://github.com/apache/spark/pull/15593). During LogisticRegression training, we store the coefficients combined with intercepts as a flat vector, but a more natural abstraction is a matrix. Here, we refactor the code to use matrix where possible, which makes the code more readable and greatly simplifies the indexing.
Note: We do not use a Breeze matrix for the cost function as was mentioned in the linked PR. This is because LBFGS/OWLQN require an implicit `MutableInnerProductModule[DenseMatrix[Double], Double]` which is not natively defined in Breeze. We would need to extend Breeze in Spark to define it ourselves. Also, we do not modify the `regParamL1Fun` because OWLQN in Breeze requires a `MutableEnumeratedCoordinateField[(Int, Int), DenseVector[Double]]` (since we still use a dense vector for coefficients). Here again we would have to extend Breeze inside Spark.
## How was this patch tested?
This is internal code refactoring - the current unit tests passing show us that the change did not break anything. No added functionality in this patch.
Author: sethah <seth.hendrickson16@gmail.com>
Closes #15893 from sethah/logreg_refactor.
Diffstat (limited to 'mllib/src/main')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala | 115 |
1 files changed, 53 insertions, 62 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 71a7fe53c1..f58efd36a1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -463,16 +463,11 @@ class LogisticRegression @Since("1.2.0") ( } /* - The coefficients are laid out in column major order during training. e.g. for - `numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is: - - Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2, - intercept_3) - - where beta_jk corresponds to the coefficient for class `j` and feature `k`. + The coefficients are laid out in column major order during training. Here we initialize + a column major matrix of initial coefficients. */ - val initialCoefficientsWithIntercept = - Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept) + val initialCoefWithInterceptMatrix = + Matrices.zeros(numCoefficientSets, numFeaturesPlusIntercept) val initialModelIsValid = optInitialModel match { case Some(_initialModel) => @@ -491,18 +486,15 @@ class LogisticRegression @Since("1.2.0") ( } if (initialModelIsValid) { - val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray val providedCoef = optInitialModel.get.coefficientMatrix - providedCoef.foreachActive { (row, col, value) => - // convert matrix to column major for training - val flatIndex = col * numCoefficientSets + row + providedCoef.foreachActive { (classIndex, featureIndex, value) => // We need to scale the coefficients since they will be trained in the scaled space - initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col) + initialCoefWithInterceptMatrix.update(classIndex, featureIndex, + value * featuresStd(featureIndex)) } if ($(fitIntercept)) { - optInitialModel.get.interceptVector.foreachActive { (index, value) => - val coefIndex = numCoefficientSets * numFeatures + index - initialCoefWithInterceptArray(coefIndex) = value + optInitialModel.get.interceptVector.foreachActive { (classIndex, value) => + initialCoefWithInterceptMatrix.update(classIndex, numFeatures, value) } } } else if ($(fitIntercept) && isMultinomial) { @@ -532,8 +524,7 @@ class LogisticRegression @Since("1.2.0") ( val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing val rawMean = rawIntercepts.sum / rawIntercepts.length rawIntercepts.indices.foreach { i => - initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) = - rawIntercepts(i) - rawMean + initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean) } } else if ($(fitIntercept)) { /* @@ -549,12 +540,12 @@ class LogisticRegression @Since("1.2.0") ( b = \log{P(1) / P(0)} = \log{count_1 / count_0} }}} */ - initialCoefficientsWithIntercept.toArray(numFeatures) = math.log( - histogram(1) / histogram(0)) + initialCoefWithInterceptMatrix.update(0, numFeatures, + math.log(histogram(1) / histogram(0))) } val states = optimizer.iterations(new CachedDiffFunction(costFun), - initialCoefficientsWithIntercept.asBreeze.toDenseVector) + new BDV[Double](initialCoefWithInterceptMatrix.toArray)) /* Note that in Logistic Regression, the objective history (loss + regularization) @@ -586,15 +577,24 @@ class LogisticRegression @Since("1.2.0") ( Note that the intercept in scaled space and original space is the same; as a result, no scaling is needed. */ - val rawCoefficients = state.x.toArray.clone() - val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i => - val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures - val featureIndex = i % numFeatures - if (featuresStd(featureIndex) != 0.0) { - rawCoefficients(colMajorIndex) / featuresStd(featureIndex) - } else { - 0.0 + val allCoefficients = state.x.toArray.clone() + val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, + allCoefficients) + val denseCoefficientMatrix = new DenseMatrix(numCoefficientSets, numFeatures, + new Array[Double](numCoefficientSets * numFeatures), isTransposed = true) + val interceptVec = if ($(fitIntercept) || !isMultinomial) { + Vectors.zeros(numCoefficientSets) + } else { + Vectors.sparse(numCoefficientSets, Seq()) + } + // separate intercepts and coefficients from the combined matrix + allCoefMatrix.foreachActive { (classIndex, featureIndex, value) => + val isIntercept = $(fitIntercept) && (featureIndex == numFeatures) + if (!isIntercept && featuresStd(featureIndex) != 0.0) { + denseCoefficientMatrix.update(classIndex, featureIndex, + value / featuresStd(featureIndex)) } + if (isIntercept) interceptVec.toArray(classIndex) = value } if ($(regParam) == 0.0 && isMultinomial) { @@ -607,17 +607,16 @@ class LogisticRegression @Since("1.2.0") ( Friedman, et al. "Regularization Paths for Generalized Linear Models via Coordinate Descent," https://core.ac.uk/download/files/153/6287975.pdf */ - val coefficientMean = coefficientArray.sum / coefficientArray.length - coefficientArray.indices.foreach { i => coefficientArray(i) -= coefficientMean} + val denseValues = denseCoefficientMatrix.values + val coefficientMean = denseValues.sum / denseValues.length + denseCoefficientMatrix.update(_ - coefficientMean) } - val denseCoefficientMatrix = - new DenseMatrix(numCoefficientSets, numFeatures, coefficientArray, isTransposed = true) // TODO: use `denseCoefficientMatrix.compressed` after SPARK-17471 val compressedCoefficientMatrix = if (isMultinomial) { denseCoefficientMatrix } else { - val compressedVector = Vectors.dense(coefficientArray).compressed + val compressedVector = Vectors.dense(denseCoefficientMatrix.values).compressed compressedVector match { case dv: DenseVector => denseCoefficientMatrix case sv: SparseVector => @@ -626,25 +625,13 @@ class LogisticRegression @Since("1.2.0") ( } } - val interceptsArray: Array[Double] = if ($(fitIntercept)) { - Array.tabulate(numCoefficientSets) { i => - val coefIndex = numFeatures * numCoefficientSets + i - rawCoefficients(coefIndex) - } - } else { - Array.empty[Double] - } - val interceptVector = if (interceptsArray.nonEmpty && isMultinomial) { - // The intercepts are never regularized, so we always center the mean. - val interceptMean = interceptsArray.sum / numClasses - interceptsArray.indices.foreach { i => interceptsArray(i) -= interceptMean } - Vectors.dense(interceptsArray) - } else if (interceptsArray.length == 1) { - Vectors.dense(interceptsArray) - } else { - Vectors.sparse(numCoefficientSets, Seq()) + // center the intercepts when using multinomial algorithm + if ($(fitIntercept) && isMultinomial) { + val interceptArray = interceptVec.toArray + val interceptMean = interceptArray.sum / interceptArray.length + (0 until interceptVec.size).foreach { i => interceptArray(i) -= interceptMean } } - (compressedCoefficientMatrix, interceptVector.compressed, arrayBuilder.result()) + (compressedCoefficientMatrix, interceptVec.compressed, arrayBuilder.result()) } } @@ -1424,6 +1411,7 @@ private class LogisticAggregator( private val numFeatures = bcFeaturesStd.value.length private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures private val coefficientSize = bcCoefficients.value.size + private val numCoefficientSets = if (multinomial) numClasses else 1 if (multinomial) { require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " + s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize") @@ -1633,12 +1621,12 @@ private class LogisticAggregator( lossSum / weightSum } - def gradient: Vector = { + def gradient: Matrix = { require(weightSum > 0.0, s"The effective number of instances should be " + s"greater than 0.0, but $weightSum.") val result = Vectors.dense(gradientSumArray.clone()) scal(1.0 / weightSum, result) - result + new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray) } } @@ -1664,6 +1652,7 @@ private class LogisticCostFun( val featuresStd = bcFeaturesStd.value val numFeatures = featuresStd.length val numCoefficientSets = if (multinomial) numClasses else 1 + val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures val logisticAggregator = { val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance) @@ -1675,24 +1664,25 @@ private class LogisticCostFun( )(seqOp, combOp, aggregationDepth) } - val totalGradientArray = logisticAggregator.gradient.toArray + val totalGradientMatrix = logisticAggregator.gradient + val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray) // regVal is the sum of coefficients squares excluding intercept for L2 regularization. val regVal = if (regParamL2 == 0.0) { 0.0 } else { var sum = 0.0 - coeffs.foreachActive { case (index, value) => + coefMatrix.foreachActive { case (classIndex, featureIndex, value) => // We do not apply regularization to the intercepts - val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures + val isIntercept = fitIntercept && (featureIndex == numFeatures) if (!isIntercept) { // The following code will compute the loss of the regularization; also // the gradient of the regularization, and add back to totalGradientArray. sum += { if (standardization) { - totalGradientArray(index) += regParamL2 * value + val gradValue = totalGradientMatrix(classIndex, featureIndex) + totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value) value * value } else { - val featureIndex = index / numCoefficientSets if (featuresStd(featureIndex) != 0.0) { // If `standardization` is false, we still standardize the data // to improve the rate of convergence; as a result, we have to @@ -1700,7 +1690,8 @@ private class LogisticCostFun( // differently to get effectively the same objective function when // the training dataset is not standardized. val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex)) - totalGradientArray(index) += regParamL2 * temp + val gradValue = totalGradientMatrix(classIndex, featureIndex) + totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp) value * temp } else { 0.0 @@ -1713,6 +1704,6 @@ private class LogisticCostFun( } bcCoeffs.destroy(blocking = false) - (logisticAggregator.loss + regVal, new BDV(totalGradientArray)) + (logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray)) } } |