aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/main
diff options
context:
space:
mode:
authorsethah <seth.hendrickson16@gmail.com>2016-11-12 01:38:26 +0000
committerDB Tsai <dbtsai@dbtsai.com>2016-11-12 01:38:26 +0000
commit46b2550bcd3690a260b995fd4d024a73b92a0299 (patch)
treedd2cb601f4e6b436e94c39cdea621c7c3fda0a66 /mllib/src/main
parentba23f768f7419039df85530b84258ec31f0c22b4 (diff)
downloadspark-46b2550bcd3690a260b995fd4d024a73b92a0299.tar.gz
spark-46b2550bcd3690a260b995fd4d024a73b92a0299.tar.bz2
spark-46b2550bcd3690a260b995fd4d024a73b92a0299.zip
[SPARK-18060][ML] Avoid unnecessary computation for MLOR
## What changes were proposed in this pull request? Before this patch, the gradient updates for multinomial logistic regression were computed by an outer loop over the number of classes and an inner loop over the number of features. Inside the inner loop, we standardized the feature value (`value / featuresStd(index)`), which means we performed the computation `numFeatures * numClasses` times. We only need to perform that computation `numFeatures` times, however. If we re-order the inner and outer loop, we can avoid this, but then we lose sequential memory access. In this patch, we instead lay out the coefficients in column major order while we train, so that we can avoid the extra computation and retain sequential memory access. We convert back to row-major order when we create the model. ## How was this patch tested? This is an implementation detail only, so the original behavior should be maintained. All tests pass. I ran some performance tests to verify speedups. The results are below, and show significant speedups. ## Performance Tests **Setup** 3 node bare-metal cluster 120 cores total 384 gb RAM total **Results** NOTE: The `currentMasterTime` and `thisPatchTime` are times in seconds for a single iteration of L-BFGS or OWL-QN. | | numPoints | numFeatures | numClasses | regParam | elasticNetParam | currentMasterTime (sec) | thisPatchTime (sec) | pctSpeedup | |----|-------------|---------------|--------------|------------|-------------------|---------------------------|-----------------------|--------------| | 0 | 1e+07 | 100 | 500 | 0.5 | 0 | 90 | 18 | 80 | | 1 | 1e+08 | 100 | 50 | 0.5 | 0 | 90 | 19 | 78 | | 2 | 1e+08 | 100 | 50 | 0.05 | 1 | 72 | 19 | 73 | | 3 | 1e+06 | 100 | 5000 | 0.5 | 0 | 93 | 53 | 43 | | 4 | 1e+07 | 100 | 5000 | 0.5 | 0 | 900 | 390 | 56 | | 5 | 1e+08 | 100 | 500 | 0.5 | 0 | 840 | 174 | 79 | | 6 | 1e+08 | 100 | 200 | 0.5 | 0 | 360 | 72 | 80 | | 7 | 1e+08 | 1000 | 5 | 0.5 | 0 | 9 | 3 | 66 | Author: sethah <seth.hendrickson16@gmail.com> Closes #15593 from sethah/MLOR_PERF_COL_MAJOR_COEF.
Diffstat (limited to 'mllib/src/main')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala125
1 files changed, 74 insertions, 51 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index c4651054fd..18b9b3043d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -438,18 +438,14 @@ class LogisticRegression @Since("1.2.0") (
val standardizationParam = $(standardization)
def regParamL1Fun = (index: Int) => {
// Remove the L1 penalization on the intercept
- val isIntercept = $(fitIntercept) && ((index + 1) % numFeaturesPlusIntercept == 0)
+ val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets
if (isIntercept) {
0.0
} else {
if (standardizationParam) {
regParamL1
} else {
- val featureIndex = if ($(fitIntercept)) {
- index % numFeaturesPlusIntercept
- } else {
- index % numFeatures
- }
+ val featureIndex = index / numCoefficientSets
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
@@ -466,6 +462,15 @@ class LogisticRegression @Since("1.2.0") (
new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
}
+ /*
+ The coefficients are laid out in column major order during training. e.g. for
+ `numClasses = 3` and `numFeatures = 2` and `fitIntercept = true` the layout is:
+
+ Array(beta_11, beta_21, beta_31, beta_12, beta_22, beta_32, intercept_1, intercept_2,
+ intercept_3)
+
+ where beta_jk corresponds to the coefficient for class `j` and feature `k`.
+ */
val initialCoefficientsWithIntercept =
Vectors.zeros(numCoefficientSets * numFeaturesPlusIntercept)
@@ -489,13 +494,14 @@ class LogisticRegression @Since("1.2.0") (
val initialCoefWithInterceptArray = initialCoefficientsWithIntercept.toArray
val providedCoef = optInitialModel.get.coefficientMatrix
providedCoef.foreachActive { (row, col, value) =>
- val flatIndex = row * numFeaturesPlusIntercept + col
+ // convert matrix to column major for training
+ val flatIndex = col * numCoefficientSets + row
// We need to scale the coefficients since they will be trained in the scaled space
initialCoefWithInterceptArray(flatIndex) = value * featuresStd(col)
}
if ($(fitIntercept)) {
optInitialModel.get.interceptVector.foreachActive { (index, value) =>
- val coefIndex = (index + 1) * numFeaturesPlusIntercept - 1
+ val coefIndex = numCoefficientSets * numFeatures + index
initialCoefWithInterceptArray(coefIndex) = value
}
}
@@ -526,7 +532,7 @@ class LogisticRegression @Since("1.2.0") (
val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
val rawMean = rawIntercepts.sum / rawIntercepts.length
rawIntercepts.indices.foreach { i =>
- initialCoefficientsWithIntercept.toArray(i * numFeaturesPlusIntercept + numFeatures) =
+ initialCoefficientsWithIntercept.toArray(numClasses * numFeatures + i) =
rawIntercepts(i) - rawMean
}
} else if ($(fitIntercept)) {
@@ -572,16 +578,20 @@ class LogisticRegression @Since("1.2.0") (
/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.
+
+ Additionally, since the coefficients were laid out in column major order during training
+ to avoid extra computation, we convert them back to row major before passing them to the
+ model.
+
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
val rawCoefficients = state.x.toArray.clone()
val coefficientArray = Array.tabulate(numCoefficientSets * numFeatures) { i =>
- // flatIndex will loop though rawCoefficients, and skip the intercept terms.
- val flatIndex = if ($(fitIntercept)) i + i / numFeatures else i
+ val colMajorIndex = (i % numFeatures) * numCoefficientSets + i / numFeatures
val featureIndex = i % numFeatures
if (featuresStd(featureIndex) != 0.0) {
- rawCoefficients(flatIndex) / featuresStd(featureIndex)
+ rawCoefficients(colMajorIndex) / featuresStd(featureIndex)
} else {
0.0
}
@@ -618,7 +628,7 @@ class LogisticRegression @Since("1.2.0") (
val interceptsArray: Array[Double] = if ($(fitIntercept)) {
Array.tabulate(numCoefficientSets) { i =>
- val coefIndex = (i + 1) * numFeaturesPlusIntercept - 1
+ val coefIndex = numFeatures * numCoefficientSets + i
rawCoefficients(coefIndex)
}
} else {
@@ -697,6 +707,7 @@ class LogisticRegressionModel private[spark] (
/**
* A vector of model coefficients for "binomial" logistic regression. If this model was trained
* using the "multinomial" family then an exception is thrown.
+ *
* @return Vector
*/
@Since("2.0.0")
@@ -720,6 +731,7 @@ class LogisticRegressionModel private[spark] (
/**
* The model intercept for "binomial" logistic regression. If this model was fit with the
* "multinomial" family then an exception is thrown.
+ *
* @return Double
*/
@Since("1.3.0")
@@ -1389,6 +1401,12 @@ class BinaryLogisticRegressionSummary private[classification] (
* $$
* </blockquote></p>
*
+ * @note In order to avoid unnecessary computation during calculation of the gradient updates
+ * we lay out the coefficients in column major order during training. This allows us to
+ * perform feature standardization once, while still retaining sequential memory access
+ * for speed. We convert back to row major order when we create the model,
+ * since this form is optimal for the matrix operations used for prediction.
+ *
* @param bcCoefficients The broadcast coefficients corresponding to the features.
* @param bcFeaturesStd The broadcast standard deviation values of the features.
* @param numClasses the number of possible outcomes for k classes classification problem in
@@ -1486,23 +1504,25 @@ private class LogisticAggregator(
var marginOfLabel = 0.0
var maxMargin = Double.NegativeInfinity
- val margins = Array.tabulate(numClasses) { i =>
- var margin = 0.0
- features.foreachActive { (index, value) =>
- if (localFeaturesStd(index) != 0.0 && value != 0.0) {
- margin += localCoefficients(i * numFeaturesPlusIntercept + index) *
- value / localFeaturesStd(index)
- }
+ val margins = new Array[Double](numClasses)
+ features.foreachActive { (index, value) =>
+ val stdValue = value / localFeaturesStd(index)
+ var j = 0
+ while (j < numClasses) {
+ margins(j) += localCoefficients(index * numClasses + j) * stdValue
+ j += 1
}
-
+ }
+ var i = 0
+ while (i < numClasses) {
if (fitIntercept) {
- margin += localCoefficients(i * numFeaturesPlusIntercept + numFeatures)
+ margins(i) += localCoefficients(numClasses * numFeatures + i)
}
- if (i == label.toInt) marginOfLabel = margin
- if (margin > maxMargin) {
- maxMargin = margin
+ if (i == label.toInt) marginOfLabel = margins(i)
+ if (margins(i) > maxMargin) {
+ maxMargin = margins(i)
}
- margin
+ i += 1
}
/**
@@ -1510,33 +1530,39 @@ private class LogisticAggregator(
* We address this by subtracting maxMargin from all the margins, so it's guaranteed
* that all of the new margins will be smaller than zero to prevent arithmetic overflow.
*/
+ val multipliers = new Array[Double](numClasses)
val sum = {
var temp = 0.0
- if (maxMargin > 0) {
- for (i <- 0 until numClasses) {
- margins(i) -= maxMargin
- temp += math.exp(margins(i))
- }
- } else {
- for (i <- 0 until numClasses) {
- temp += math.exp(margins(i))
- }
+ var i = 0
+ while (i < numClasses) {
+ if (maxMargin > 0) margins(i) -= maxMargin
+ val exp = math.exp(margins(i))
+ temp += exp
+ multipliers(i) = exp
+ i += 1
}
temp
}
- for (i <- 0 until numClasses) {
- val multiplier = math.exp(margins(i)) / sum - {
- if (label == i) 1.0 else 0.0
- }
- features.foreachActive { (index, value) =>
- if (localFeaturesStd(index) != 0.0 && value != 0.0) {
- localGradientArray(i * numFeaturesPlusIntercept + index) +=
- weight * multiplier * value / localFeaturesStd(index)
+ margins.indices.foreach { i =>
+ multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0)
+ }
+ features.foreachActive { (index, value) =>
+ if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+ val stdValue = value / localFeaturesStd(index)
+ var j = 0
+ while (j < numClasses) {
+ localGradientArray(index * numClasses + j) +=
+ weight * multipliers(j) * stdValue
+ j += 1
}
}
- if (fitIntercept) {
- localGradientArray(i * numFeaturesPlusIntercept + numFeatures) += weight * multiplier
+ }
+ if (fitIntercept) {
+ var i = 0
+ while (i < numClasses) {
+ localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i)
+ i += 1
}
}
@@ -1637,6 +1663,7 @@ private class LogisticCostFun(
val bcCoeffs = instances.context.broadcast(coeffs)
val featuresStd = bcFeaturesStd.value
val numFeatures = featuresStd.length
+ val numCoefficientSets = if (multinomial) numClasses else 1
val logisticAggregator = {
val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
@@ -1656,7 +1683,7 @@ private class LogisticCostFun(
var sum = 0.0
coeffs.foreachActive { case (index, value) =>
// We do not apply regularization to the intercepts
- val isIntercept = fitIntercept && ((index + 1) % (numFeatures + 1) == 0)
+ val isIntercept = fitIntercept && index >= numCoefficientSets * numFeatures
if (!isIntercept) {
// The following code will compute the loss of the regularization; also
// the gradient of the regularization, and add back to totalGradientArray.
@@ -1665,11 +1692,7 @@ private class LogisticCostFun(
totalGradientArray(index) += regParamL2 * value
value * value
} else {
- val featureIndex = if (fitIntercept) {
- index % (numFeatures + 1)
- } else {
- index % numFeatures
- }
+ val featureIndex = index / numCoefficientSets
if (featuresStd(featureIndex) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to