author     Shivaram Venkataraman <shivaram@eecs.berkeley.edu>    2013-08-02 19:15:34 -0700
committer  Shivaram Venkataraman <shivaram@eecs.berkeley.edu>    2013-08-02 19:15:34 -0700
commit     00339cc0328b692417ad0b9aeb23d522edbb93e5 (patch)
tree       2042d3aced47d4af5d252ef6677130fd5860d02c /mllib
parent     cef178873b04960c36647d9899fcd13715fef62c (diff)
download   spark-00339cc0328b692417ad0b9aeb23d522edbb93e5.tar.gz
           spark-00339cc0328b692417ad0b9aeb23d522edbb93e5.tar.bz2
           spark-00339cc0328b692417ad0b9aeb23d522edbb93e5.zip
Refactor optimizers and create GLMs
This change refactors GLMs to use mixins that maintain an interface
similar to other MLlib algorithms. It also introduces an Optimizer
trait, which allows GLMs to be extended with other optimization
techniques.
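
The shape of the refactor is easiest to see in miniature. The sketch below is a simplified rendering of the pattern this commit introduces, not the committed code: the trait and class names mirror the diff, but plain `Seq`s stand in for Spark `RDD`s, the SGD loop is reduced to a stub, and the concrete `ExampleRegressionWithSGD` class is invented for illustration, so the snippet compiles on its own.

```scala
// Sketch of the mixin structure (assumptions: Seq replaces RDD, the
// optimizer body is a placeholder, ExampleRegressionWithSGD is hypothetical).

trait Optimizer {
  // Solve the provided convex optimization problem.
  def optimize(data: Seq[(Double, Array[Double])],
               initialWeights: Array[Double]): Array[Double]
}

// One concrete optimization technique, usable as a mixin by any GLM.
trait GradientDescent extends Optimizer {
  var stepSize: Double
  var numIterations: Int

  // Chainable setters keep the interface similar to other MLlib algorithms.
  def setStepSize(step: Double): this.type = { stepSize = step; this }
  def setNumIterations(iters: Int): this.type = { numIterations = iters; this }

  // Placeholder for the real mini-batch SGD loop.
  override def optimize(data: Seq[(Double, Array[Double])],
                        initialWeights: Array[Double]): Array[Double] =
    initialWeights
}

// Training logic shared by all GLMs; an Optimizer must be mixed in.
abstract class GeneralizedLinearAlgorithm[M] { self: Optimizer =>
  def createModel(weights: Array[Double], intercept: Double): M

  def train(input: Seq[(Double, Array[Double])]): M = {
    // Prepend a constant 1.0 feature so the optimizer learns the intercept.
    val data = input.map { case (y, features) => (y, 1.0 +: features) }
    val nfeatures = input.head._2.length
    val weights = optimize(data, Array.fill(nfeatures + 1)(1.0))
    createModel(weights.tail, weights.head)
  }
}

// A new GLM is the base class plus an optimizer mixin and a model factory.
class ExampleRegressionWithSGD
  extends GeneralizedLinearAlgorithm[(Array[Double], Double)] with GradientDescent {
  var stepSize = 1.0
  var numIterations = 100
  def createModel(weights: Array[Double], intercept: Double) = (weights, intercept)
}
```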
Diffstat (limited to 'mllib')
10 files changed, 320 insertions, 286 deletions
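At a call site, the change replaces the separate options object with chainable setters on the algorithm itself. The before/after fragment below is reconstructed from the test-suite updates in this diff (`testRDD` is the cached RDD those tests build), so it is illustrative rather than standalone-runnable:

```scala
// Before: configuration lived in a separate GradientDescentOpts object.
val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(20)
val lr = new LogisticRegression(sgdOpts)
val model = lr.train(testRDD)

// After: the same setters are provided by the GradientDescent mixin.
val lr2 = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(20)
val model2 = lr2.train(testRDD)
```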
diff --git a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala
deleted file mode 100644
index d9c2be2a19..0000000000
--- a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.mllib.optimization
-
-/**
- * Class used to configure options used for GradientDescent based optimization
- * algorithms.
- */
-
-class GradientDescentOpts private (
-    var stepSize: Double,
-    var numIters: Int,
-    var regParam: Double,
-    var miniBatchFraction: Double) {
-
-  def this() = this(1.0, 100, 0.0, 1.0)
-
-  /**
-   * Set the step size per-iteration of SGD. Default 1.0.
-   */
-  def setStepSize(step: Double) = {
-    this.stepSize = step
-    this
-  }
-
-  /**
-   * Set fraction of data to be used for each SGD iteration. Default 1.0.
-   */
-  def setMiniBatchFraction(fraction: Double) = {
-    this.miniBatchFraction = fraction
-    this
-  }
-
-  /**
-   * Set the number of iterations for SGD. Default 100.
-   */
-  def setNumIterations(iters: Int) = {
-    this.numIters = iters
-    this
-  }
-
-  /**
-   * Set the regularization parameter used for SGD. Default 0.0.
-   */
-  def setRegParam(regParam: Double) = {
-    this.regParam = regParam
-    this
-  }
-}
-
-object GradientDescentOpts {
-
-  def apply(stepSize: Double, numIters: Int, regParam: Double, miniBatchFraction: Double) = {
-    new GradientDescentOpts(stepSize, numIters, regParam, miniBatchFraction)
-  }
-
-  def apply() = {
-    new GradientDescentOpts()
-  }
-
-}
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index bc711fd2d8..0af99c616d 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -19,6 +19,7 @@ package spark.mllib.classification
 
 import spark.{Logging, RDD, SparkContext}
 import spark.mllib.optimization._
+import spark.mllib.regression._
 import spark.mllib.util.MLUtils
 
 import scala.math.round
@@ -30,80 +31,46 @@ import org.jblas.DoubleMatrix
  * Based on Matlab code written by John Duchi.
  */
 class LogisticRegressionModel(
-  val weights: Array[Double],
-  val intercept: Double,
-  val stochasticLosses: Array[Double]) extends ClassificationModel {
-
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
-  override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
-    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
-    // and intercept is needed.
-    val localWeights = weightsMatrix
-    val localIntercept = intercept
-    testData.map { x =>
-      val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
-      round(1.0/ (1.0 + math.exp(margin * -1))).toInt
-    }
-  }
-
-  override def predict(testData: Array[Double]): Int = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
+    override val weights: Array[Double],
+    override val intercept: Double)
+  extends GeneralizedLinearModel[Int](weights, intercept)
+  with ClassificationModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
     round(1.0/ (1.0 + math.exp(margin * -1))).toInt
   }
 }
 
-class LogisticRegression(val opts: GradientDescentOpts) extends Logging {
+class LogisticRegressionWithSGD (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[Int, LogisticRegressionModel]
+  with GradientDescent with Serializable {
+
+  val gradient = new LogisticGradient()
+  val updater = new SimpleUpdater()
 
   /**
    * Construct a LogisticRegression object with default parameters
    */
-  def this() = this(new GradientDescentOpts())
-
-  def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
-    val nfeatures: Int = input.take(1)(0)._2.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
-    train(input, initialWeights)
-  }
-
-  def train(
-    input: RDD[(Int, Array[Double])],
-    initialWeights: Array[Double]): LogisticRegressionModel = {
-
-    // Add a extra variable consisting of all 1.0's for the intercept.
-    val data = input.map { case (y, features) =>
-      (y.toDouble, Array(1.0, features:_*))
-    }
-
-    val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
-    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
-      data,
-      new LogisticGradient(),
-      new SimpleUpdater(),
-      opts,
-      initalWeightsWithIntercept)
-
-    val intercept = weights(0)
-    val weightsScaled = weights.tail
-
-    val model = new LogisticRegressionModel(weightsScaled, intercept, stochasticLosses)
+  def this() = this(1.0, 100, 0.0, 1.0, true)
 
-    logInfo("Final model weights " + model.weights.mkString(","))
-    logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
-    model
+  def createModel(weights: Array[Double], intercept: Double) = {
+    new LogisticRegressionModel(weights, intercept)
   }
 }
 
 /**
  * Top-level methods for calling Logistic Regression.
- * NOTE(shivaram): We use multiple train methods instead of default arguments to support 
+ * NOTE(shivaram): We use multiple train methods instead of default arguments to support
  *                 Java programs.
  */
-object LogisticRegression {
+object LogisticRegressionWithSGD {
 
   /**
    * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
@@ -126,8 +93,8 @@ object LogisticRegression {
       initialWeights: Array[Double])
     : LogisticRegressionModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction)
-    new LogisticRegression(sgdOpts).train(input, initialWeights)
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train(
+      input, initialWeights)
   }
 
   /**
@@ -148,8 +115,8 @@ object LogisticRegression {
       miniBatchFraction: Double)
     : LogisticRegressionModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction)
-    new LogisticRegression(sgdOpts).train(input)
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train(
+      input)
   }
 
   /**
@@ -197,7 +164,7 @@ object LogisticRegression {
     }
     val sc = new SparkContext(args(0), "LogisticRegression")
     val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
-    val model = LogisticRegression.train(
+    val model = LogisticRegressionWithSGD.train(
       data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index 1c137168b6..caf9e3cb93 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -20,6 +20,7 @@ package spark.mllib.classification
 import scala.math.signum
 import spark.{Logging, RDD, SparkContext}
 import spark.mllib.optimization._
+import spark.mllib.regression._
 import spark.mllib.util.MLUtils
 
 import org.jblas.DoubleMatrix
@@ -28,78 +29,42 @@ import org.jblas.DoubleMatrix
  * SVM using Stochastic Gradient Descent.
  */
 class SVMModel(
-  val weights: Array[Double],
-  val intercept: Double,
-  val stochasticLosses: Array[Double]) extends ClassificationModel {
-
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
-  override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
-    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
-    // and intercept is needed.
-    val localWeights = weightsMatrix
-    val localIntercept = intercept
-    testData.map { x =>
-      signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept).toInt
-    }
-  }
-
-  override def predict(testData: Array[Double]): Int = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    signum(dataMat.dot(weightsMatrix) + this.intercept).toInt
+    override val weights: Array[Double],
+    override val intercept: Double)
+  extends GeneralizedLinearModel[Int](weights, intercept)
+  with ClassificationModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    signum(dataMatrix.dot(weightMatrix) + intercept).toInt
   }
 }
 
+class SVMWithSGD private (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[Int, SVMModel] with GradientDescent with Serializable {
-
-
-class SVM(val opts: GradientDescentOpts) extends Logging {
+  val gradient = new HingeGradient()
+  val updater = new SquaredL2Updater()
 
   /**
    * Construct a SVM object with default parameters
    */
-  def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0))
-
-  def train(input: RDD[(Int, Array[Double])]): SVMModel = {
-    val nfeatures: Int = input.take(1)(0)._2.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
-    train(input, initialWeights)
-  }
-
-  def train(
-    input: RDD[(Int, Array[Double])],
-    initialWeights: Array[Double]): SVMModel = {
-
-    // Add a extra variable consisting of all 1.0's for the intercept.
-    val data = input.map { case (y, features) =>
-      (y.toDouble, Array(1.0, features:_*))
-    }
-
-    val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
-    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
-      data,
-      new HingeGradient(),
-      new SquaredL2Updater(),
-      opts,
-      initalWeightsWithIntercept)
-
-    val intercept = weights(0)
-    val weightsScaled = weights.tail
-
-    val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
+  def this() = this(1.0, 100, 1.0, 1.0, true)
 
-    logInfo("Final model weights " + model.weights.mkString(","))
-    logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
-    model
+  def createModel(weights: Array[Double], intercept: Double) = {
+    new SVMModel(weights, intercept)
   }
 }
 
 /**
  * Top-level methods for calling SVM.
  */
-object SVM {
+object SVMWithSGD {
 
   /**
    * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
@@ -124,8 +89,8 @@ object SVM {
       initialWeights: Array[Double])
     : SVMModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
-    new SVM(sgdOpts).train(input, initialWeights)
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input,
+      initialWeights)
   }
 
   /**
@@ -147,8 +112,7 @@ object SVM {
       miniBatchFraction: Double)
     : SVMModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
-    new SVM(sgdOpts).train(input)
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input)
   }
 
   /**
@@ -196,7 +160,7 @@ object SVM {
     }
     val sc = new SparkContext(args(0), "SVM")
     val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
-    val model = SVM.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+    val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
   }
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index 67451ff053..f7d09a2bd3 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -24,8 +24,66 @@ import org.jblas.DoubleMatrix
 
 import scala.collection.mutable.ArrayBuffer
 
-object GradientDescent {
+trait GradientDescent extends Optimizer {
+
+  val gradient: Gradient
+  val updater: Updater
+
+  var stepSize: Double
+  var numIterations: Int
+  var regParam: Double
+  var miniBatchFraction: Double
+
+  /**
+   * Set the step size per-iteration of SGD. Default 1.0.
+   */
+  def setStepSize(step: Double): this.type = {
+    this.stepSize = step
+    this
+  }
+
+  /**
+   * Set fraction of data to be used for each SGD iteration. Default 1.0.
+   */
+  def setMiniBatchFraction(fraction: Double): this.type = {
+    this.miniBatchFraction = fraction
+    this
+  }
+
+  /**
+   * Set the number of iterations for SGD. Default 100.
+   */
+  def setNumIterations(iters: Int): this.type = {
+    this.numIterations = iters
+    this
+  }
+
+  /**
+   * Set the regularization parameter used for SGD. Default 0.0.
+   */
+  def setRegParam(regParam: Double): this.type = {
+    this.regParam = regParam
+    this
+  }
+
+  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
+    : Array[Double] = {
+
+    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
+        data,
+        gradient,
+        updater,
+        stepSize,
+        numIterations,
+        regParam,
+        miniBatchFraction,
+        initialWeights)
+    weights
+  }
+
+}
+
+object GradientDescent extends Logging {
   /**
    * Run gradient descent in parallel using mini batches.
    * Based on Matlab code written by John Duchi.
@@ -34,7 +92,7 @@ object GradientDescent {
    * @param gradient - Gradient object that will be used to compute the gradient.
    * @param updater - Updater object that will be used to update the model.
    * @param stepSize - stepSize to be used during update.
-   * @param numIters - number of iterations that SGD should be run.
+   * @param numIterations - number of iterations that SGD should be run.
    * @param regParam - regularization parameter
    * @param miniBatchFraction - fraction of the input data set that should be used for
    *                            one iteration of SGD. Default value 1.0.
@@ -47,20 +105,23 @@ object GradientDescent {
     data: RDD[(Double, Array[Double])],
     gradient: Gradient,
     updater: Updater,
-    opts: GradientDescentOpts,
+    stepSize: Double,
+    numIterations: Int,
+    regParam: Double,
+    miniBatchFraction: Double,
     initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
 
-    val stochasticLossHistory = new ArrayBuffer[Double](opts.numIters)
+    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
 
     val nexamples: Long = data.count()
-    val miniBatchSize = nexamples * opts.miniBatchFraction
+    val miniBatchSize = nexamples * miniBatchFraction
 
     // Initialize weights as a column vector
     var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
    var regVal = 0.0
 
-    for (i <- 1 to opts.numIters) {
-      val (gradientSum, lossSum) = data.sample(false, opts.miniBatchFraction, 42+i).map {
+    for (i <- 1 to numIterations) {
+      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
         case (y, features) =>
           val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
           val (grad, loss) = gradient.compute(featuresRow, y, weights)
@@ -73,11 +134,14 @@ object GradientDescent {
        */
       stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
       val update = updater.compute(
-        weights, gradientSum.div(miniBatchSize), opts.stepSize, i, opts.regParam)
+        weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
       weights = update._1
       regVal = update._2
     }
 
+    logInfo("GradientDescent finished. Last 10 stochastic losses %s".format(
+      stochasticLossHistory.takeRight(10).mkString(", ")))
+
     (weights.toArray, stochasticLossHistory.toArray)
   }
 }
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
new file mode 100644
index 0000000000..76a519c338
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+import spark.RDD
+
+trait Optimizer {
+
+  /**
+   * Solve the provided convex optimization problem.
+   */
+  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
+
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
new file mode 100644
index 0000000000..0bbc9424e6
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext, SparkException}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import scala.math.round
+
+import org.jblas.DoubleMatrix
+
+/**
+ * GeneralizedLinearModel (GLM) represents a model trained using
+ * GeneralizedLinearAlgorithm. GLMs consist of a weight vector,
+ * an intercept.
+ */
+abstract class GeneralizedLinearModel[T: ClassManifest](
+    val weights: Array[Double],
+    val intercept: Double)
+  extends Serializable {
+
+  // Create a column vector that can be used for predictions
+  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+  def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+    intercept: Double): T
+
+  def predict(testData: spark.RDD[Array[Double]]): RDD[T] = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept is needed.
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
+
+    testData.map { x =>
+      val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
+      predictPoint(dataMatrix, localWeights, localIntercept)
+    }
+  }
+
+  def predict(testData: Array[Double]): T = {
+    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+    predictPoint(dataMat, weightsMatrix, intercept)
+  }
+}
+
+/**
+ * GeneralizedLinearAlgorithm abstracts out the training for all GLMs.
+ * This class should be mixed in with an Optimizer to create a new GLM.
+ *
+ * NOTE(shivaram): This is an abstract class rather than a trait as we use
+ * a view bound to convert labels to Double.
+ */
+abstract class GeneralizedLinearAlgorithm[T <% Double, M <: GeneralizedLinearModel[T]]
+  extends Logging with Serializable {
+
+  // We need an optimizer mixin to solve the GLM
+  self : Optimizer =>
+
+  var addIntercept: Boolean
+
+  def createModel(weights: Array[Double], intercept: Double): M
+
+  /**
+   * Set if the algorithm should add an intercept. Default true.
+   */
+  def setIntercept(addIntercept: Boolean): this.type = {
+    this.addIntercept = addIntercept
+    this
+  }
+
+  def train(input: RDD[(T, Array[Double])])(implicit mt: Manifest[T]) : M = {
+    val nfeatures: Int = input.take(1)(0)._2.length
+    val initialWeights = Array.fill(nfeatures)(1.0)
+    train(input, initialWeights)
+  }
+
+  def train(
+      input: RDD[(T, Array[Double])],
+      initialWeights: Array[Double])(implicit mt: Manifest[T])
+    : M = {
+
+    // Add a extra variable consisting of all 1.0's for the intercept.
+    val data = if (addIntercept) {
+      input.map { case (y, features) =>
+        (y.toDouble, Array(1.0, features:_*))
+      }
+    } else {
+      input.map { case (y, features) =>
+        (y.toDouble, features)
+      }
+    }
+
+    val initialWeightsWithIntercept = if (addIntercept) {
+      Array(1.0, initialWeights:_*)
+    } else {
+      initialWeights
+    }
+
+    val weights = optimize(data, initialWeightsWithIntercept)
+    val intercept = weights(0)
+    val weightsScaled = weights.tail
+
+    val model = createModel(weightsScaled, intercept)
+
+    logInfo("Final model weights " + model.weights.mkString(","))
+    logInfo("Final model intercept " + model.intercept)
+    model
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index 7f6fa8025c..f8b15033aa 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -28,78 +28,44 @@ import org.jblas.DoubleMatrix
  *
  */
 class LassoModel(
-  val weights: Array[Double],
-  val intercept: Double,
-  val stochasticLosses: Array[Double]) extends RegressionModel {
-
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
-  override def predict(testData: spark.RDD[Array[Double]]) = {
-    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
-    // and intercept is needed.
-    val localWeights = weightsMatrix
-    val localIntercept = intercept
-    testData.map { x =>
-      new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
-    }
-  }
-
-
-  override def predict(testData: Array[Double]): Double = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    dataMat.dot(weightsMatrix) + this.intercept
+    override val weights: Array[Double],
+    override val intercept: Double)
+  extends GeneralizedLinearModel[Double](weights, intercept)
+  with RegressionModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    dataMatrix.dot(weightMatrix) + intercept
   }
 }
 
-class Lasso(val opts: GradientDescentOpts) extends Logging {
+class LassoWithSGD (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[Double, LassoModel]
+  with GradientDescent with Serializable {
+
+  val gradient = new SquaredGradient()
+  val updater = new L1Updater()
 
   /**
    * Construct a Lasso object with default parameters
    */
-  def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0))
-
-  def train(input: RDD[(Double, Array[Double])]): LassoModel = {
-    val nfeatures: Int = input.take(1)(0)._2.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
-    train(input, initialWeights)
-  }
-
-  def train(
-    input: RDD[(Double, Array[Double])],
-    initialWeights: Array[Double]): LassoModel = {
-
-    // Add a extra variable consisting of all 1.0's for the intercept.
-    val data = input.map { case (y, features) =>
-      (y, Array(1.0, features:_*))
-    }
-
-    val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
-    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
-      data,
-      new SquaredGradient(),
-      new L1Updater(),
-      opts,
-      initalWeightsWithIntercept)
-
-    val intercept = weights(0)
-    val weightsScaled = weights.tail
-
-    val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
+  def this() = this(1.0, 100, 1.0, 1.0, true)
 
-    logInfo("Final model weights " + model.weights.mkString(","))
-    logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
-    model
+  def createModel(weights: Array[Double], intercept: Double) = {
+    new LassoModel(weights, intercept)
   }
 }
 
 /**
  * Top-level methods for calling Lasso.
  */
-object Lasso {
+object LassoWithSGD {
 
   /**
    * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
@@ -124,8 +90,8 @@ object Lasso {
       initialWeights: Array[Double])
     : LassoModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
-    new Lasso(sgdOpts).train(input, initialWeights)
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input,
+      initialWeights)
   }
 
   /**
@@ -147,8 +113,7 @@ object Lasso {
       miniBatchFraction: Double)
     : LassoModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
-    new Lasso(sgdOpts).train(input)
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input)
   }
 
   /**
@@ -196,7 +161,7 @@ object Lasso {
     }
     val sc = new SparkContext(args(0), "Lasso")
     val data = MLUtils.loadLabeledData(sc, args(1))
-    val model = Lasso.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+    val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
   }
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 439867d163..ee38486212 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -80,8 +80,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
-    val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(20)
-    val lr = new LogisticRegression(sgdOpts)
+    val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(20)
 
     val model = lr.train(testRDD)
@@ -113,8 +112,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
     testRDD.cache()
 
     // Use half as many iterations as the previous test.
-    val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(10)
-    val lr = new LogisticRegression(sgdOpts)
+    val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(10)
 
     val model = lr.train(testRDD, initialWeights)
diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
index a624b42c38..1eef9387e3 100644
--- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
@@ -44,7 +44,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
     seed: Int): Seq[(Int, Array[Double])] = {
     val rnd = new Random(seed)
     val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
-    val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
+    val x = Array.fill[Array[Double]](nPoints)(
+      Array.fill[Double](weights.length)(rnd.nextGaussian()))
     val y = x.map { xi =>
       signum(
         (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) +
@@ -75,8 +76,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
-    val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
-    val svm = new SVM(sgdOpts)
+    val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
 
     val model = svm.train(testRDD)
@@ -106,8 +106,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
-    val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
-    val svm = new SVM(sgdOpts)
+    val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
 
     val model = svm.train(testRDD, initialWeights)
diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
index 531746ec02..ab1d07b879 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
@@ -44,10 +44,11 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
     seed: Int): Seq[(Double, Array[Double])] = {
     val rnd = new Random(seed)
     val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
-    val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
+    val x = Array.fill[Array[Double]](nPoints)(
+      Array.fill[Double](weights.length)(rnd.nextGaussian()))
     val y = x.map(xi =>
       (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()
-    )
+      )
     y zip x
   }
@@ -72,8 +73,8 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
-    val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
-    val ls = new Lasso(sgdOpts)
+    val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
 
     val model = ls.train(testRDD)
@@ -109,8 +109,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
-    val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
-    val ls = new Lasso(sgdOpts)
+    val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
 
     val model = ls.train(testRDD, initialWeights)