author    Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2013-07-31 14:15:17 -0700
committer Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2013-07-31 14:15:17 -0700
commit    cef178873b04960c36647d9899fcd13715fef62c (patch)
tree      979bf66a085f316852789b5518be8a8f7bd5240a /mllib
parent    29b8cd3616b76c76def2232d14846da44a3d9839 (diff)
Refactor SGD options into a new class.
This refactoring pulls the SGD configuration code shared by SVM, Lasso, and LogisticRegression into a common GradientDescentOpts class. It also includes some style cleanup.
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala                      | 76
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala        | 48
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/SVM.scala                       | 58
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala             | 19
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/Lasso.scala                         | 58
-rw-r--r--  mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala   |  7
-rw-r--r--  mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala                  | 23
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala                    | 13
8 files changed, 143 insertions(+), 159 deletions(-)
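
The net effect visible in the diffstat: per-algorithm setter boilerplate is deleted and one shared options class is added. A minimal caller-side sketch of the new API, assuming an existing SparkContext and pre-built RDDs (classificationData and regressionData are hypothetical names):

    import spark.mllib.optimization.GradientDescentOpts
    import spark.mllib.classification.{LogisticRegression, SVM}
    import spark.mllib.regression.Lasso

    // One options object now drives every SGD-based learner.
    val opts = GradientDescentOpts()
      .setStepSize(1.0)
      .setNumIterations(100)
      .setRegParam(0.01)
      .setMiniBatchFraction(1.0)

    // classificationData: RDD[(Int, Array[Double])]; regressionData: RDD[(Double, Array[Double])]
    val lrModel    = new LogisticRegression(opts).train(classificationData)
    val svmModel   = new SVM(opts).train(classificationData)
    val lassoModel = new Lasso(opts).train(regressionData)
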
diff --git a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala
new file mode 100644
index 0000000000..d9c2be2a19
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+/**
+ * Class used to configure options used for GradientDescent based optimization
+ * algorithms.
+ */
+
+class GradientDescentOpts private (
+ var stepSize: Double,
+ var numIters: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double) {
+
+ def this() = this(1.0, 100, 0.0, 1.0)
+
+ /**
+ * Set the step size per-iteration of SGD. Default 1.0.
+ */
+ def setStepSize(step: Double) = {
+ this.stepSize = step
+ this
+ }
+
+ /**
+ * Set fraction of data to be used for each SGD iteration. Default 1.0.
+ */
+ def setMiniBatchFraction(fraction: Double) = {
+ this.miniBatchFraction = fraction
+ this
+ }
+
+ /**
+ * Set the number of iterations for SGD. Default 100.
+ */
+ def setNumIterations(iters: Int) = {
+ this.numIters = iters
+ this
+ }
+
+ /**
+ * Set the regularization parameter used for SGD. Default 0.0.
+ */
+ def setRegParam(regParam: Double) = {
+ this.regParam = regParam
+ this
+ }
+}
+
+object GradientDescentOpts {
+
+ def apply(stepSize: Double, numIters: Int, regParam: Double, miniBatchFraction: Double) = {
+ new GradientDescentOpts(stepSize, numIters, regParam, miniBatchFraction)
+ }
+
+ def apply() = {
+ new GradientDescentOpts()
+ }
+
+}
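
The companion object gives two equivalent ways to build the options, and each setter returns `this`, so calls chain. A short sketch:

    import spark.mllib.optimization.GradientDescentOpts

    // Explicit: apply(stepSize, numIters, regParam, miniBatchFraction).
    val a = GradientDescentOpts(1.0, 100, 0.0, 1.0)

    // Start from the defaults (1.0, 100, 0.0, 1.0) and override fluently.
    val b = GradientDescentOpts().setStepSize(1.0).setRegParam(0.0)

    assert(a.stepSize == b.stepSize && a.regParam == b.regParam)
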
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 203aa8fdd4..bc711fd2d8 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -55,38 +55,12 @@ class LogisticRegressionModel(
}
}
-class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double,
- var numIters: Int)
- extends Logging {
+class LogisticRegression(val opts: GradientDescentOpts) extends Logging {
/**
* Construct a LogisticRegression object with default parameters
*/
- def this() = this(1.0, 1.0, 100)
-
- /**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
- */
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
+ def this() = this(new GradientDescentOpts())
def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
val nfeatures: Int = input.take(1)(0)._2.length
@@ -109,11 +83,8 @@ class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBa
data,
new LogisticGradient(),
new SimpleUpdater(),
- stepSize,
- numIters,
- 0.0,
- initalWeightsWithIntercept,
- miniBatchFraction)
+ opts,
+ initalWeightsWithIntercept)
val intercept = weights(0)
val weightsScaled = weights.tail
@@ -132,7 +103,7 @@ class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBa
* NOTE(shivaram): We use multiple train methods instead of default arguments to support
* Java programs.
*/
-object LogisticRegressionLocalRandomSGD {
+object LogisticRegression {
/**
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
@@ -155,8 +126,8 @@ object LogisticRegressionLocalRandomSGD {
initialWeights: Array[Double])
: LogisticRegressionModel =
{
- new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(
- input, initialWeights)
+ val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction)
+ new LogisticRegression(sgdOpts).train(input, initialWeights)
}
/**
@@ -177,7 +148,8 @@ object LogisticRegressionLocalRandomSGD {
miniBatchFraction: Double)
: LogisticRegressionModel =
{
- new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input)
+ val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction)
+ new LogisticRegression(sgdOpts).train(input)
}
/**
@@ -225,7 +197,7 @@ object LogisticRegressionLocalRandomSGD {
}
val sc = new SparkContext(args(0), "LogisticRegression")
val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
- val model = LogisticRegressionLocalRandomSGD.train(
+ val model = LogisticRegression.train(
data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
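
The object-level entry points, kept for Java callers per the NOTE above, build the opts internally. A sketch mirroring the main() just shown; the master URL and data path are placeholders, and the MLUtils import path is assumed from the surrounding tree:

    import spark.SparkContext
    import spark.mllib.classification.LogisticRegression
    import spark.mllib.util.MLUtils

    val sc = new SparkContext("local", "LogisticRegression")
    // loadLabeledData yields (Double, Array[Double]) pairs; the classifier wants Int labels.
    val data = MLUtils.loadLabeledData(sc, "/path/to/data").map(yx => (yx._1.toInt, yx._2))
    // train(input, numIterations, stepSize, miniBatchFraction)
    val model = LogisticRegression.train(data, 20, 10.0, 1.0)
    sc.stop()
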
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index 3a6a12814a..1c137168b6 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -53,46 +53,12 @@ class SVMModel(
-class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double,
- var miniBatchFraction: Double, var numIters: Int)
- extends Logging {
+class SVM(val opts: GradientDescentOpts) extends Logging {
/**
* Construct a SVM object with default parameters
*/
- def this() = this(1.0, 1.0, 1.0, 100)
-
- /**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
-
- /**
- * Set the regularization parameter. Default 1.0.
- */
- def setRegParam(param: Double) = {
- this.regParam = param
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
- */
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
+ def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0))
def train(input: RDD[(Int, Array[Double])]): SVMModel = {
val nfeatures: Int = input.take(1)(0)._2.length
@@ -115,11 +81,8 @@ class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double,
data,
new HingeGradient(),
new SquaredL2Updater(),
- stepSize,
- numIters,
- regParam,
- initalWeightsWithIntercept,
- miniBatchFraction)
+ opts,
+ initalWeightsWithIntercept)
val intercept = weights(0)
val weightsScaled = weights.tail
@@ -135,10 +98,8 @@ class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double,
/**
* Top-level methods for calling SVM.
-
-
*/
-object SVMLocalRandomSGD {
+object SVM {
/**
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
@@ -163,8 +124,8 @@ object SVMLocalRandomSGD {
initialWeights: Array[Double])
: SVMModel =
{
- new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
- input, initialWeights)
+ val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
+ new SVM(sgdOpts).train(input, initialWeights)
}
/**
@@ -186,7 +147,8 @@ object SVMLocalRandomSGD {
miniBatchFraction: Double)
: SVMModel =
{
- new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+ val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
+ new SVM(sgdOpts).train(input)
}
/**
@@ -234,7 +196,7 @@ object SVMLocalRandomSGD {
}
val sc = new SparkContext(args(0), "SVM")
val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
- val model = SVMLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val model = SVM.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
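
One subtlety in this file: SVM's no-arg constructor pins regParam to 1.0 via GradientDescentOpts(1.0, 100, 1.0, 1.0), while a bare GradientDescentOpts() defaults regParam to 0.0, so `new SVM()` and `new SVM(GradientDescentOpts())` train different models. A sketch:

    import spark.mllib.classification.SVM
    import spark.mllib.optimization.GradientDescentOpts

    val regularized   = new SVM()                        // regParam = 1.0
    val unregularized = new SVM(GradientDescentOpts())   // regParam = 0.0
    assert(regularized.opts.regParam != unregularized.opts.regParam)
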
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index 19cda26446..67451ff053 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -24,7 +24,6 @@ import org.jblas.DoubleMatrix
import scala.collection.mutable.ArrayBuffer
-
object GradientDescent {
/**
@@ -48,23 +47,20 @@ object GradientDescent {
data: RDD[(Double, Array[Double])],
gradient: Gradient,
updater: Updater,
- stepSize: Double,
- numIters: Int,
- regParam: Double,
- initialWeights: Array[Double],
- miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = {
+ opts: GradientDescentOpts,
+ initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
- val stochasticLossHistory = new ArrayBuffer[Double](numIters)
+ val stochasticLossHistory = new ArrayBuffer[Double](opts.numIters)
val nexamples: Long = data.count()
- val miniBatchSize = nexamples * miniBatchFraction
+ val miniBatchSize = nexamples * opts.miniBatchFraction
// Initialize weights as a column vector
var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
var regVal = 0.0
- for (i <- 1 to numIters) {
- val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
+ for (i <- 1 to opts.numIters) {
+ val (gradientSum, lossSum) = data.sample(false, opts.miniBatchFraction, 42+i).map {
case (y, features) =>
val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
val (grad, loss) = gradient.compute(featuresRow, y, weights)
@@ -76,7 +72,8 @@ object GradientDescent {
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
- val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
+ val update = updater.compute(
+ weights, gradientSum.div(miniBatchSize), opts.stepSize, i, opts.regParam)
weights = update._1
regVal = update._2
}
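
The optimizer itself now takes the opts object in place of four loose parameters and returns the final weights plus the per-iteration loss history. A sketch of a direct call with the new signature; the enclosing method's name is elided by the hunk header, so runMiniBatchSGD is an assumption here:

    import spark.RDD
    import spark.mllib.optimization._

    def fitSquaredLoss(data: RDD[(Double, Array[Double])],
                       initialWeights: Array[Double]): Array[Double] = {
      val opts = GradientDescentOpts().setStepSize(1.0).setNumIterations(50)
      // Returns (final weights, stochastic loss per iteration).
      val (weights, lossHistory) = GradientDescent.runMiniBatchSGD(
        data, new SquaredGradient(), new SimpleUpdater(), opts, initialWeights)
      weights
    }
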
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index e8b1ed8a48..7f6fa8025c 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -53,46 +53,12 @@ class LassoModel(
}
-class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double,
- var miniBatchFraction: Double, var numIters: Int)
- extends Logging {
+class Lasso(val opts: GradientDescentOpts) extends Logging {
/**
* Construct a Lasso object with default parameters
*/
- def this() = this(1.0, 1.0, 1.0, 100)
-
- /**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
-
- /**
- * Set the regularization parameter. Default 1.0.
- */
- def setRegParam(param: Double) = {
- this.regParam = param
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
- */
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
+ def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0))
def train(input: RDD[(Double, Array[Double])]): LassoModel = {
val nfeatures: Int = input.take(1)(0)._2.length
@@ -115,11 +81,8 @@ class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double,
data,
new SquaredGradient(),
new L1Updater(),
- stepSize,
- numIters,
- regParam,
- initalWeightsWithIntercept,
- miniBatchFraction)
+ opts,
+ initalWeightsWithIntercept)
val intercept = weights(0)
val weightsScaled = weights.tail
@@ -135,10 +98,8 @@ class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double,
/**
* Top-level methods for calling Lasso.
- *
- *
*/
-object LassoLocalRandomSGD {
+object Lasso {
/**
* Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
@@ -163,8 +124,8 @@ object LassoLocalRandomSGD {
initialWeights: Array[Double])
: LassoModel =
{
- new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
- input, initialWeights)
+ val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
+ new Lasso(sgdOpts).train(input, initialWeights)
}
/**
@@ -186,7 +147,8 @@ object LassoLocalRandomSGD {
miniBatchFraction: Double)
: LassoModel =
{
- new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+ val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
+ new Lasso(sgdOpts).train(input)
}
/**
@@ -234,7 +196,7 @@ object LassoLocalRandomSGD {
}
val sc = new SparkContext(args(0), "Lasso")
val data = MLUtils.loadLabeledData(sc, args(1))
- val model = LassoLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val model = Lasso.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 8664263935..439867d163 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
import spark.SparkContext
+import spark.mllib.optimization._
class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
@@ -79,7 +80,8 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(20)
+ val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(20)
+ val lr = new LogisticRegression(sgdOpts)
val model = lr.train(testRDD)
@@ -111,7 +113,8 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
testRDD.cache()
// Use half as many iterations as the previous test.
- val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(10)
+ val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(10)
+ val lr = new LogisticRegression(sgdOpts)
val model = lr.train(testRDD, initialWeights)
diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
index d546e0729e..a624b42c38 100644
--- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import spark.SparkContext
+import spark.mllib.optimization._
import org.jblas.DoubleMatrix
@@ -44,10 +45,14 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
val rnd = new Random(seed)
val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
- val y = x.map(xi =>
- signum((new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()).toInt
- )
- y zip x
+ val y = x.map { xi =>
+ signum(
+ (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) +
+ intercept +
+ 0.1 * rnd.nextGaussian()
+ ).toInt
+ }
+ y.zip(x)
}
def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) {
@@ -58,7 +63,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
assert(numOffPredictions < input.length / 5)
}
- test("SVMLocalRandomSGD") {
+ test("SVM using local random SGD") {
val nPoints = 10000
val A = 2.0
@@ -70,7 +75,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+ val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+ val svm = new SVM(sgdOpts)
val model = svm.train(testRDD)
@@ -84,7 +90,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
}
- test("SVMLocalRandomSGD with initial weights") {
+ test("SVM local random SGD with initial weights") {
val nPoints = 10000
val A = 2.0
@@ -100,7 +106,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+ val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+ val svm = new SVM(sgdOpts)
val model = svm.train(testRDD, initialWeights)
diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
index cf2b067d40..531746ec02 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import spark.SparkContext
+import spark.mllib.optimization._
import org.jblas.DoubleMatrix
@@ -59,7 +60,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
assert(numOffPredictions < input.length / 5)
}
- test("LassoLocalRandomSGD") {
+ test("Lasso local random SGD") {
val nPoints = 10000
val A = 2.0
@@ -70,7 +71,9 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+
+ val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+ val ls = new Lasso(sgdOpts)
val model = ls.train(testRDD)
@@ -90,7 +93,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
}
- test("LassoLocalRandomSGD with initial weights") {
+ test("Lasso local random SGD with initial weights") {
val nPoints = 10000
val A = 2.0
@@ -105,7 +108,9 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+
+ val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+ val ls = new Lasso(sgdOpts)
val model = ls.train(testRDD, initialWeights)