aboutsummaryrefslogtreecommitdiff
path: root/mllib/src
diff options
context:
space:
mode:
authorsethah <seth.hendrickson16@gmail.com>2017-02-28 00:34:38 +0000
committerDB Tsai <dbtsai@dbtsai.com>2017-02-28 00:34:38 +0000
commit16d8472f74313b0d1932b69a9807fe0f2ad7057c (patch)
tree516afb4c6e09b62f6d05a252ca84c30554d75c85 /mllib/src
parent8a5a58506c35f35f41cd1366ee693abec2916153 (diff)
downloadspark-16d8472f74313b0d1932b69a9807fe0f2ad7057c.tar.gz
spark-16d8472f74313b0d1932b69a9807fe0f2ad7057c.tar.bz2
spark-16d8472f74313b0d1932b69a9807fe0f2ad7057c.zip
[SPARK-19746][ML] Faster indexing for logistic aggregator
## What changes were proposed in this pull request? JIRA: [SPARK-19746](https://issues.apache.org/jira/browse/SPARK-19746) The following code is inefficient: ````scala val localCoefficients: Vector = bcCoefficients.value features.foreachActive { (index, value) => val stdValue = value / localFeaturesStd(index) var j = 0 while (j < numClasses) { margins(j) += localCoefficients(index * numClasses + j) * stdValue j += 1 } } ```` `localCoefficients(index * numClasses + j)` calls `Vector.apply` which creates a new Breeze vector and indexes that. Even if it is not that slow to create the object, we will generate a lot of extra garbage that may result in longer GC pauses. This is a hot inner loop, so we should optimize wherever possible. ## How was this patch tested? I don't think there's a great way to test this patch. It's purely performance related, so unit tests should guarantee that we haven't made any unwanted changes. Empirically I observed between 10-40% speedups just running short local tests. I suspect the big differences will be seen when large data/coefficient sizes have to pause for GC more often. I welcome other ideas for testing. Author: sethah <seth.hendrickson16@gmail.com> Closes #17078 from sethah/logistic_agg_indexing.
Diffstat (limited to 'mllib/src')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala11
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala26
2 files changed, 34 insertions, 3 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 892e00fa60..738b35135f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -1431,7 +1431,12 @@ private class LogisticAggregator(
private var weightSum = 0.0
private var lossSum = 0.0
- private val gradientSumArray = Array.fill[Double](coefficientSize)(0.0D)
+ @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value match {
+ case DenseVector(values) => values
+ case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
+ s"got type ${bcCoefficients.value.getClass}.)")
+ }
+ private val gradientSumArray = new Array[Double](coefficientSize)
if (multinomial && numClasses <= 2) {
logInfo(s"Multinomial logistic regression for binary classification yields separate " +
@@ -1447,7 +1452,7 @@ private class LogisticAggregator(
label: Double): Unit = {
val localFeaturesStd = bcFeaturesStd.value
- val localCoefficients = bcCoefficients.value
+ val localCoefficients = coefficientsArray
val localGradientArray = gradientSumArray
val margin = - {
var sum = 0.0
@@ -1491,7 +1496,7 @@ private class LogisticAggregator(
logistic regression without pivoting.
*/
val localFeaturesStd = bcFeaturesStd.value
- val localCoefficients = bcCoefficients.value
+ val localCoefficients = coefficientsArray
val localGradientArray = gradientSumArray
// marginOfLabel is margins(label) in the formula
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 43547a4aaf..d89a958eed 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -456,6 +456,32 @@ class LogisticRegressionSuite
assert(blrModel.intercept !== 0.0)
}
+ test("sparse coefficients in LogisticAggregator") {
+ val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
+ val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
+ val binaryAgg = new LogisticAggregator(bcCoefficientsBinary, bcFeaturesStd, 2,
+ fitIntercept = true, multinomial = false)
+ val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") {
+ intercept[IllegalArgumentException] {
+ binaryAgg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
+ }
+ }
+ assert(thrownBinary.getMessage.contains("coefficients only supports dense"))
+
+ val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0)))
+ val multinomialAgg = new LogisticAggregator(bcCoefficientsMulti, bcFeaturesStd, 3,
+ fitIntercept = true, multinomial = true)
+ val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") {
+ intercept[IllegalArgumentException] {
+ multinomialAgg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
+ }
+ }
+ assert(thrown.getMessage.contains("coefficients only supports dense"))
+ bcCoefficientsBinary.destroy(blocking = false)
+ bcFeaturesStd.destroy(blocking = false)
+ bcCoefficientsMulti.destroy(blocking = false)
+ }
+
test("overflow prediction for multiclass") {
val model = new LogisticRegressionModel("mLogReg",
Matrices.dense(3, 2, Array(0.0, 0.0, 0.0, 1.0, 2.0, 3.0)),