-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala       63
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala  15
2 files changed, 30 insertions, 48 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index 969a0c5f7c..8f187c9df5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -42,7 +42,6 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
private var convergenceTol = 1E-4
private var maxNumIterations = 100
private var regParam = 0.0
- private var miniBatchFraction = 1.0
/**
* Set the number of corrections used in the LBFGS update. Default 10.
@@ -58,14 +57,6 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
}
/**
- * Set fraction of data to be used for each L-BFGS iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double): this.type = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
* Set the convergence tolerance of iterations for L-BFGS. Default 1E-4.
* Smaller value will lead to higher accuracy with the cost of more iterations.
*/
@@ -110,7 +101,7 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
}
override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
- val (weights, _) = LBFGS.runMiniBatchLBFGS(
+ val (weights, _) = LBFGS.runLBFGS(
data,
gradient,
updater,
@@ -118,7 +109,6 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
convergenceTol,
maxNumIterations,
regParam,
- miniBatchFraction,
initialWeights)
weights
}
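
For reference, a minimal sketch of driving the updated optimizer through the public optimize entry point after this change. The gradient, updater, and hyperparameter values are illustrative assumptions, and `data` (an RDD[(Double, Vector)] of (label, features) pairs) and `numFeatures` are assumed to be in scope:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}

    val optimizer = new LBFGS(new LogisticGradient(), new SquaredL2Updater())
      .setNumCorrections(10)
      .setConvergenceTol(1e-4)
      .setMaxNumIterations(100)
      .setRegParam(0.1)

    // There is no setMiniBatchFraction any more: each iteration computes
    // the gradient over the full data set.
    val weights = optimizer.optimize(data, Vectors.dense(new Array[Double](numFeatures)))
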
@@ -132,10 +122,8 @@ class LBFGS(private var gradient: Gradient, private var updater: Updater)
@DeveloperApi
object LBFGS extends Logging {
/**
- * Run Limited-memory BFGS (L-BFGS) in parallel using mini batches.
- * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
- * in order to compute a gradient estimate.
- * Sampling, and averaging the subgradients over this subset is performed using one standard
+ * Run Limited-memory BFGS (L-BFGS) in parallel.
+ * Averaging the subgradients over different partitions is performed using one standard
* spark map-reduce in each iteration.
*
* @param data - Input data for L-BFGS. RDD of the set of data examples, each of
@@ -147,14 +135,12 @@ object LBFGS extends Logging {
* @param convergenceTol - The convergence tolerance of iterations for L-BFGS
* @param maxNumIterations - Maximum number of iterations that L-BFGS can run.
* @param regParam - Regularization parameter
- * @param miniBatchFraction - Fraction of the input data set that should be used for
- * one iteration of L-BFGS. Default value 1.0.
*
* @return A tuple containing two elements. The first element is a column matrix containing
* weights for every feature, and the second element is an array containing the loss
* computed for every iteration.
*/
- def runMiniBatchLBFGS(
+ def runLBFGS(
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
@@ -162,23 +148,33 @@ object LBFGS extends Logging {
convergenceTol: Double,
maxNumIterations: Int,
regParam: Double,
- miniBatchFraction: Double,
initialWeights: Vector): (Vector, Array[Double]) = {
val lossHistory = new ArrayBuffer[Double](maxNumIterations)
val numExamples = data.count()
- val miniBatchSize = numExamples * miniBatchFraction
val costFun =
- new CostFun(data, gradient, updater, regParam, miniBatchFraction, lossHistory, miniBatchSize)
+ new CostFun(data, gradient, updater, regParam, numExamples)
val lbfgs = new BreezeLBFGS[BDV[Double]](maxNumIterations, numCorrections, convergenceTol)
- val weights = Vectors.fromBreeze(
- lbfgs.minimize(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector))
+ val states =
+ lbfgs.iterations(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector)
+
+ /**
+ * NOTE: lossSum and loss are computed using the weights from the previous iteration,
+ * and regVal is the regularization value computed in the previous iteration as well.
+ */
+ var state = states.next()
+ while (states.hasNext) {
+ lossHistory.append(state.value)
+ state = states.next()
+ }
+ lossHistory.append(state.value)
+ val weights = Vectors.fromBreeze(state.x)
- logInfo("LBFGS.runMiniBatchSGD finished. Last 10 losses %s".format(
+ logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(
lossHistory.takeRight(10).mkString(", ")))
(weights, lossHistory.toArray)
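
The switch from minimize to iterations lets the driver record each iteration's loss from Breeze's optimizer state instead of appending inside the cost function. A self-contained sketch of that iterator pattern, using an assumed toy quadratic objective in place of CostFun:

    import breeze.linalg.{DenseVector => BDV}
    import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS}

    // Toy objective, purely for illustration: f(w) = ||w - 1||^2,
    // with gradient 2 * (w - 1); the real code plugs in CostFun instead.
    val toyCost = new DiffFunction[BDV[Double]] {
      def calculate(w: BDV[Double]): (Double, BDV[Double]) = {
        val diff = w - 1.0
        (diff dot diff, diff * 2.0)
      }
    }

    // (maxIter, numCorrections, tolerance), matching the patch's constructor call.
    val lbfgs = new BreezeLBFGS[BDV[Double]](100, 10, 1e-6)
    val states = lbfgs.iterations(new CachedDiffFunction(toyCost), BDV.zeros[Double](5))

    // Drain the iterator exactly as runLBFGS does: record each state's loss,
    // then read the final weights off the last state.
    val losses = scala.collection.mutable.ArrayBuffer[Double]()
    var state = states.next()
    while (states.hasNext) {
      losses.append(state.value)
      state = states.next()
    }
    losses.append(state.value)
    val solution = state.x  // converges toward the all-ones vector
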
@@ -193,9 +189,7 @@ object LBFGS extends Logging {
gradient: Gradient,
updater: Updater,
regParam: Double,
- miniBatchFraction: Double,
- lossHistory: ArrayBuffer[Double],
- miniBatchSize: Double) extends DiffFunction[BDV[Double]] {
+ numExamples: Long) extends DiffFunction[BDV[Double]] {
private var i = 0
@@ -204,8 +198,7 @@ object LBFGS extends Logging {
val localData = data
val localGradient = gradient
- val (gradientSum, lossSum) = localData.sample(false, miniBatchFraction, 42 + i)
- .aggregate((BDV.zeros[Double](weights.size), 0.0))(
+ val (gradientSum, lossSum) = localData.aggregate((BDV.zeros[Double](weights.size), 0.0))(
seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
val l = localGradient.compute(
features, label, Vectors.fromBreeze(weights), Vectors.fromBreeze(grad))
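
The full-data pass above follows the standard seqOp/combOp contract of RDD.aggregate, now without the sampling step. A self-contained sketch of the same one-pass map-reduce; the sumGradients helper and the squared-loss model are assumed stand-ins, not part of this patch:

    import breeze.linalg.{DenseVector => BDV}
    import org.apache.spark.rdd.RDD

    // Sum per-example gradients and losses across all partitions in one pass.
    def sumGradients(data: RDD[(Double, BDV[Double])],
                     w: BDV[Double]): (BDV[Double], Double) =
      data.aggregate((BDV.zeros[Double](w.length), 0.0))(
        // seqOp: fold one (label, features) example into a partition's accumulator
        seqOp = (acc, example) => (acc, example) match {
          case ((grad, loss), (label, x)) =>
            val err = (x dot w) - label
            (grad += x * err, loss + 0.5 * err * err)
        },
        // combOp: merge the per-partition (gradient sum, loss sum) pairs
        combOp = (c1, c2) => (c1, c2) match {
          case ((g1, l1), (g2, l2)) => (g1 += g2, l1 + l2)
        })
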
@@ -223,7 +216,7 @@ object LBFGS extends Logging {
Vectors.fromBreeze(weights),
Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
- val loss = lossSum / miniBatchSize + regVal
+ val loss = lossSum / numExamples + regVal
/**
* It will return the gradient part of regularization using updater.
*
@@ -245,14 +238,8 @@ object LBFGS extends Logging {
Vectors.fromBreeze(weights),
Vectors.dense(new Array[Double](weights.size)), 1, 1, regParam)._1.toBreeze
- // gradientTotal = gradientSum / miniBatchSize + gradientTotal
- axpy(1.0 / miniBatchSize, gradientSum, gradientTotal)
-
- /**
- * NOTE: lossSum and loss is computed using the weights from the previous iteration
- * and regVal is the regularization value computed in the previous iteration as well.
- */
- lossHistory.append(loss)
+ // gradientTotal = gradientSum / numExamples + gradientTotal
+ axpy(1.0 / numExamples, gradientSum, gradientTotal)
i += 1
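
To make the axpy step concrete, a small worked example with toy values (all names here are assumed): breeze's axpy(a, x, y) computes y += a * x in place, so the call above adds the averaged data gradient onto the regularization gradient.

    import breeze.linalg.{axpy, DenseVector => BDV}

    val gradientSum = BDV(4.0, 8.0, 12.0)    // summed over numExamples examples
    val gradientTotal = BDV(0.1, 0.1, 0.1)   // regularization part of the gradient
    val numExamples = 4L

    // In place: gradientTotal becomes gradientSum / numExamples + gradientTotal.
    axpy(1.0 / numExamples, gradientSum, gradientTotal)
    // gradientTotal == DenseVector(1.1, 2.1, 3.1)
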
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
index f33770aed3..6af1b502eb 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala
@@ -59,7 +59,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
val convergenceTol = 1e-12
val maxNumIterations = 10
- val (_, loss) = LBFGS.runMiniBatchLBFGS(
+ val (_, loss) = LBFGS.runLBFGS(
dataRDD,
gradient,
simpleUpdater,
@@ -67,7 +67,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
convergenceTol,
maxNumIterations,
regParam,
- miniBatchFrac,
initialWeightsWithIntercept)
// Since the cost function is convex, the loss is guaranteed to be monotonically decreasing
@@ -104,7 +103,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
val convergenceTol = 1e-12
val maxNumIterations = 10
- val (weightLBFGS, lossLBFGS) = LBFGS.runMiniBatchLBFGS(
+ val (weightLBFGS, lossLBFGS) = LBFGS.runLBFGS(
dataRDD,
gradient,
squaredL2Updater,
@@ -112,7 +111,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
convergenceTol,
maxNumIterations,
regParam,
- miniBatchFrac,
initialWeightsWithIntercept)
val numGDIterations = 50
@@ -150,7 +148,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
val maxNumIterations = 8
var convergenceTol = 0.0
- val (_, lossLBFGS1) = LBFGS.runMiniBatchLBFGS(
+ val (_, lossLBFGS1) = LBFGS.runLBFGS(
dataRDD,
gradient,
squaredL2Updater,
@@ -158,7 +156,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
convergenceTol,
maxNumIterations,
regParam,
- miniBatchFrac,
initialWeightsWithIntercept)
// Note that the first loss is computed with initial weights,
@@ -166,7 +163,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
assert(lossLBFGS1.length == 9)
convergenceTol = 0.1
- val (_, lossLBFGS2) = LBFGS.runMiniBatchLBFGS(
+ val (_, lossLBFGS2) = LBFGS.runLBFGS(
dataRDD,
gradient,
squaredL2Updater,
@@ -174,7 +171,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
convergenceTol,
maxNumIterations,
regParam,
- miniBatchFrac,
initialWeightsWithIntercept)
// Based on observation, lossLBFGS2 runs 3 iterations; this is not theoretically guaranteed.
@@ -182,7 +178,7 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
assert((lossLBFGS2(2) - lossLBFGS2(3)) / lossLBFGS2(2) < convergenceTol)
convergenceTol = 0.01
- val (_, lossLBFGS3) = LBFGS.runMiniBatchLBFGS(
+ val (_, lossLBFGS3) = LBFGS.runLBFGS(
dataRDD,
gradient,
squaredL2Updater,
@@ -190,7 +186,6 @@ class LBFGSSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
convergenceTol,
maxNumIterations,
regParam,
- miniBatchFrac,
initialWeightsWithIntercept)
// With smaller convergenceTol, it takes more steps.
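
The assertions above encode a relative-decrease stopping rule: iteration stops once successive losses improve by less than convergenceTol relative to the current loss. A tiny sketch of that criterion (the converged helper is hypothetical, not part of the test suite):

    // (prev - curr) / prev is the relative improvement of one iteration.
    def converged(prev: Double, curr: Double, tol: Double): Boolean =
      (prev - curr) / prev < tol

    assert(converged(prev = 10.0, curr = 9.95, tol = 0.01))   // 0.5% < 1%
    assert(!converged(prev = 10.0, curr = 9.0, tol = 0.01))   // 10% >= 1%
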