diff options
author | Anthony Truchet <a.truchet@criteo.com> | 2016-12-13 21:30:57 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-12-13 21:30:57 +0000 |
commit | 9e8a9d7c6a847bc5e77f9a1004029ec27616da9d (patch) | |
tree | afc8444d2469a0538c79821359d5ce550b4b1f1f /mllib/src | |
parent | e57e3938c69fb1d91970341f027f2ab5000d2daa (diff) | |
download | spark-9e8a9d7c6a847bc5e77f9a1004029ec27616da9d.tar.gz spark-9e8a9d7c6a847bc5e77f9a1004029ec27616da9d.tar.bz2 spark-9e8a9d7c6a847bc5e77f9a1004029ec27616da9d.zip |
[SPARK-18471][MLLIB] In LBFGS, avoid sending huge vectors of 0
## What changes were proposed in this pull request?
CostFun used to send a dense vector of zeroes as a closure in a
treeAggregate call. To avoid that, we replace treeAggregate by
mapPartition + treeReduce, creating a zero vector inside the mapPartition
block in-place.
## How was this patch tested?
Unit test for module mllib run locally for correctness.
As for performance we run an heavy optimization on our production data (50 iterations on 128 MB weight vectors) and have seen significant decrease in terms both of runtime and container being killed by lack of off-heap memory.
Author: Anthony Truchet <a.truchet@criteo.com>
Author: sethah <seth.hendrickson16@gmail.com>
Author: Anthony Truchet <AnthonyTruchet@users.noreply.github.com>
Closes #16037 from AnthonyTruchet/ENG-17719-lbfgs-only.
Diffstat (limited to 'mllib/src')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala | 28 | ||||
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala | 19 |
2 files changed, 37 insertions, 10 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index e0e41f711b..7a714db853 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -241,16 +241,24 @@ object LBFGS extends Logging { val bcW = data.context.broadcast(w) val localGradient = gradient - val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))( - seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => - val l = localGradient.compute( - features, label, bcW.value, grad) - (grad, loss + l) - }, - combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => - axpy(1.0, grad2, grad1) - (grad1, loss1 + loss2) - }) + val seqOp = (c: (Vector, Double), v: (Double, Vector)) => + (c, v) match { + case ((grad, loss), (label, features)) => + val denseGrad = grad.toDense + val l = localGradient.compute(features, label, bcW.value, denseGrad) + (denseGrad, loss + l) + } + + val combOp = (c1: (Vector, Double), c2: (Vector, Double)) => + (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => + val denseGrad1 = grad1.toDense + val denseGrad2 = grad2.toDense + axpy(1.0, denseGrad2, denseGrad1) + (denseGrad1, loss1 + loss2) + } + + val zeroSparseVector = Vectors.sparse(n, Seq()) + val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp) // broadcasted model is not needed anymore bcW.destroy() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala index 75ae0eb32f..572959200f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala @@ -230,6 +230,25 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers (weightLBFGS(0) ~= weightGD(0) relTol 0.02) && (weightLBFGS(1) ~= weightGD(1) relTol 0.02), "The weight differences between LBFGS and GD should be within 2%.") } + + test("SPARK-18471: LBFGS aggregator on empty partitions") { + val regParam = 0 + + val initialWeightsWithIntercept = Vectors.dense(0.0) + val convergenceTol = 1e-12 + val numIterations = 1 + val dataWithEmptyPartitions = sc.parallelize(Seq((1.0, Vectors.dense(2.0))), 2) + + LBFGS.runLBFGS( + dataWithEmptyPartitions, + gradient, + simpleUpdater, + numCorrections, + convergenceTol, + numIterations, + regParam, + initialWeightsWithIntercept) + } } class LBFGSClusterSuite extends SparkFunSuite with LocalClusterSparkContext { |