aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/main
diff options
context:
space:
mode:
authorsethah <seth.hendrickson16@gmail.com>2016-11-20 01:42:37 +0000
committerDB Tsai <dbtsai@dbtsai.com>2016-11-20 01:42:37 +0000
commit856e0042007c789dda4539fb19a5d4580999fbf4 (patch)
tree25c67679bce2bec591dd0f739ba265660a29c5af /mllib/src/main
parentea77c81ec0db27ea4709f71dc080d00167505a7d (diff)
downloadspark-856e0042007c789dda4539fb19a5d4580999fbf4.tar.gz
spark-856e0042007c789dda4539fb19a5d4580999fbf4.tar.bz2
spark-856e0042007c789dda4539fb19a5d4580999fbf4.zip
[SPARK-18456][ML][FOLLOWUP] Use matrix abstraction for coefficients in LogisticRegression training
## What changes were proposed in this pull request? This is a follow up to some of the discussion [here](https://github.com/apache/spark/pull/15593). During LogisticRegression training, we store the coefficients combined with intercepts as a flat vector, but a more natural abstraction is a matrix. Here, we refactor the code to use matrix where possible, which makes the code more readable and greatly simplifies the indexing. Note: We do not use a Breeze matrix for the cost function as was mentioned in the linked PR. This is because LBFGS/OWLQN require an implicit `MutableInnerProductModule[DenseMatrix[Double], Double]` which is not natively defined in Breeze. We would need to extend Breeze in Spark to define it ourselves. Also, we do not modify the `regParamL1Fun` because OWLQN in Breeze requires a `MutableEnumeratedCoordinateField[(Int, Int), DenseVector[Double]]` (since we still use a dense vector for coefficients). Here again we would have to extend Breeze inside Spark. ## How was this patch tested? This is internal code refactoring - the current unit tests passing show us that the change did not break anything. No added functionality in this patch. Author: sethah <seth.hendrickson16@gmail.com> Closes #15893 from sethah/logreg_refactor.
Diffstat (limited to 'mllib/src/main')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala115
1 files changed, 53 insertions, 62 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 71a7fe53c1..f58efd36a1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -463,16 +463,11 @@ class LogisticRegression @Since("1.2.0") (
}
/*
- The coefficients are laid out in column major order during training. e.g. for
- `numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is:
-
- Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2,
- intercept_3)
-
- where beta_jk corresponds to the coefficient for class `j` and feature `k`.
+ The coefficients are laid out in column major order during training. Here we initialize
+ a column major matrix of initial coefficients.
*/
- val initialCoefficientsWithIntercept =
- Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept)
+ val initialCoefWithInterceptMatrix =
+ Matrices.zeros(numCoefficientSets, numFeaturesPlusIntercept)
val initialModelIsValid = optInitialModel match {
case Some(_initialModel) =>
@@ -491,18 +486,15 @@ class LogisticRegression @Since("1.2.0") (
}
if (initialModelIsValid) {
- val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray
val providedCoef = optInitialModel.get.coefficientMatrix
- providedCoef.foreachActive { (row, col, value) =>
- // convert matrix to column major for training
- val flatIndex = col * numCoefficientSets + row
+ providedCoef.foreachActive { (classIndex, featureIndex, value) =>
// We need to scale the coefficients since they will be trained in the scaled space
- initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col)
+ initialCoefWithInterceptMatrix.update(classIndex, featureIndex,
+ value * featuresStd(featureIndex))
}
if ($(fitIntercept)) {
- optInitialModel.get.interceptVector.foreachActive { (index, value) =>
- val coefIndex = numCoefficientSets * numFeatures + index
- initialCoefWithInterceptArray(coefIndex) = value
+ optInitialModel.get.interceptVector.foreachActive { (classIndex, value) =>
+ initialCoefWithInterceptMatrix.update(classIndex, numFeatures, value)
}
}
} else if ($(fitIntercept) && isMultinomial) {
@@ -532,8 +524,7 @@ class LogisticRegression @Since("1.2.0") (
val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
val rawMean = rawIntercepts.sum / rawIntercepts.length
rawIntercepts.indices.foreach { i =>
- initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) =
- rawIntercepts(i) - rawMean
+ initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean)
}
} else if ($(fitIntercept)) {
/*
@@ -549,12 +540,12 @@ class LogisticRegression @Since("1.2.0") (
b = \log{P(1) / P(0)} = \log{count_1 / count_0}
}}}
*/
- initialCoefficientsWithIntercept.toArray(numFeatures) = math.log(
- histogram(1) / histogram(0))
+ initialCoefWithInterceptMatrix.update(0, numFeatures,
+ math.log(histogram(1) / histogram(0)))
}
val states = optimizer.iterations(new CachedDiffFunction(costFun),
- initialCoefficientsWithIntercept.asBreeze.toDenseVector)
+ new BDV[Double](initialCoefWithInterceptMatrix.toArray))
/*
Note that in Logistic Regression, the objective history (loss + regularization)
@@ -586,15 +577,24 @@ class LogisticRegression @Since("1.2.0") (
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
- val rawCoefficients = state.x.toArray.clone()
- val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i =>
- val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
- val featureIndex = i % numFeatures
- if (featuresStd(featureIndex) != 0.0) {
- rawCoefficients(colMajorIndex) / featuresStd(featureIndex)
- } else {
- 0.0
+ val allCoefficients = state.x.toArray.clone()
+ val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept,
+ allCoefficients)
+ val denseCoefficientMatrix = new DenseMatrix(numCoefficientSets, numFeatures,
+ new Array[Double](numCoefficientSets * numFeatures), isTransposed = true)
+ val interceptVec = if ($(fitIntercept) || !isMultinomial) {
+ Vectors.zeros(numCoefficientSets)
+ } else {
+ Vectors.sparse(numCoefficientSets, Seq())
+ }
+ // separate intercepts and coefficients from the combined matrix
+ allCoefMatrix.foreachActive { (classIndex, featureIndex, value) =>
+ val isIntercept = $(fitIntercept) && (featureIndex == numFeatures)
+ if (!isIntercept && featuresStd(featureIndex) != 0.0) {
+ denseCoefficientMatrix.update(classIndex, featureIndex,
+ value / featuresStd(featureIndex))
}
+ if (isIntercept) interceptVec.toArray(classIndex) = value
}
if ($(regParam) == 0.0 && isMultinomial) {
@@ -607,17 +607,16 @@ class LogisticRegression @Since("1.2.0") (
Friedman, et al. "Regularization Paths for Generalized Linear Models via
Coordinate Descent," https://core.ac.uk/download/files/153/6287975.pdf
*/
- val coefficientMean = coefficientArray.sum / coefficientArray.length
- coefficientArray.indices.foreach { i => coefficientArray(i) -= coefficientMean}
+ val denseValues = denseCoefficientMatrix.values
+ val coefficientMean = denseValues.sum / denseValues.length
+ denseCoefficientMatrix.update(_ - coefficientMean)
}
- val denseCoefficientMatrix =
- new DenseMatrix(numCoefficientSets, numFeatures, coefficientArray, isTransposed = true)
// TODO: use `denseCoefficientMatrix.compressed` after SPARK-17471
val compressedCoefficientMatrix = if (isMultinomial) {
denseCoefficientMatrix
} else {
- val compressedVector = Vectors.dense(coefficientArray).compressed
+ val compressedVector = Vectors.dense(denseCoefficientMatrix.values).compressed
compressedVector match {
case dv: DenseVector => denseCoefficientMatrix
case sv: SparseVector =>
@@ -626,25 +625,13 @@ class LogisticRegression @Since("1.2.0") (
}
}
- val interceptsArray: Array[Double] = if ($(fitIntercept)) {
- Array.tabulate(numCoefficientSets) { i =>
- val coefIndex = numFeatures * numCoefficientSets + i
- rawCoefficients(coefIndex)
- }
- } else {
- Array.empty[Double]
- }
- val interceptVector = if (interceptsArray.nonEmpty && isMultinomial) {
- // The intercepts are never regularized, so we always center the mean.
- val interceptMean = interceptsArray.sum / numClasses
- interceptsArray.indices.foreach { i => interceptsArray(i) -= interceptMean }
- Vectors.dense(interceptsArray)
- } else if (interceptsArray.length == 1) {
- Vectors.dense(interceptsArray)
- } else {
- Vectors.sparse(numCoefficientSets, Seq())
+ // center the intercepts when using multinomial algorithm
+ if ($(fitIntercept) && isMultinomial) {
+ val interceptArray = interceptVec.toArray
+ val interceptMean = interceptArray.sum / interceptArray.length
+ (0 until interceptVec.size).foreach { i => interceptArray(i) -= interceptMean }
}
- (compressedCoefficientMatrix, interceptVector.compressed, arrayBuilder.result())
+ (compressedCoefficientMatrix, interceptVec.compressed, arrayBuilder.result())
}
}
@@ -1424,6 +1411,7 @@ private class LogisticAggregator(
private val numFeatures = bcFeaturesStd.value.length
private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
private val coefficientSize = bcCoefficients.value.size
+ private val numCoefficientSets = if (multinomial) numClasses else 1
if (multinomial) {
require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " +
s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize")
@@ -1633,12 +1621,12 @@ private class LogisticAggregator(
lossSum / weightSum
}
- def gradient: Vector = {
+ def gradient: Matrix = {
require(weightSum > 0.0, s"The effective number of instances should be " +
s"greater than 0.0, but $weightSum.")
val result = Vectors.dense(gradientSumArray.clone())
scal(1.0 / weightSum, result)
- result
+ new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray)
}
}
@@ -1664,6 +1652,7 @@ private class LogisticCostFun(
val featuresStd = bcFeaturesStd.value
val numFeatures = featuresStd.length
val numCoefficientSets = if (multinomial) numClasses else 1
+ val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
@@ -1675,24 +1664,25 @@ private class LogisticCostFun(
)(seqOp, combOp, aggregationDepth)
}
- val totalGradientArray = logisticAggregator.gradient.toArray
+ val totalGradientMatrix = logisticAggregator.gradient
+ val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray)
// regVal is the sum of coefficients squares excluding intercept for L2 regularization.
val regVal = if (regParamL2 == 0.0) {
0.0
} else {
var sum = 0.0
- coeffs.foreachActive { case (index, value) =>
+ coefMatrix.foreachActive { case (classIndex, featureIndex, value) =>
// We do not apply regularization to the intercepts
- val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures
+ val isIntercept = fitIntercept && (featureIndex == numFeatures)
if (!isIntercept) {
// The following code will compute the loss of the regularization; also
// the gradient of the regularization, and add back to totalGradientArray.
sum += {
if (standardization) {
- totalGradientArray(index) += regParamL2 * value
+ val gradValue = totalGradientMatrix(classIndex, featureIndex)
+ totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value)
value * value
} else {
- val featureIndex = index / numCoefficientSets
if (featuresStd(featureIndex) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
@@ -1700,7 +1690,8 @@ private class LogisticCostFun(
// differently to get effectively the same objective function when
// the training dataset is not standardized.
val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex))
- totalGradientArray(index) += regParamL2 * temp
+ val gradValue = totalGradientMatrix(classIndex, featureIndex)
+ totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp)
value * temp
} else {
0.0
@@ -1713,6 +1704,6 @@ private class LogisticCostFun(
}
bcCoeffs.destroy(blocking = false)
- (logisticAggregator.loss + regVal, new BDV(totalGradientArray))
+ (logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray))
}
}