From 00339cc0328b692417ad0b9aeb23d522edbb93e5 Mon Sep 17 00:00:00 2001
From: Shivaram Venkataraman
Date: Fri, 2 Aug 2013 19:15:34 -0700
Subject: Refactor optimizers and create GLMs

This change refactors the structure of GLMs to use mixins that maintain an
interface similar to other MLlib algorithms. It also creates an Optimizer
trait, which allows GLMs to be extended to use other optimization techniques.
---
 .../scala/spark/mllib/GradientDescentOpts.scala    |  76 -------------
 .../mllib/classification/LogisticRegression.scala  |  93 +++++----------
 .../scala/spark/mllib/classification/SVM.scala     |  88 +++++----------
 .../spark/mllib/optimization/GradientDescent.scala |  80 +++++++++++--
 .../scala/spark/mllib/optimization/Optimizer.scala |  29 +++++
 .../regression/GeneralizedLinearAlgorithm.scala    | 125 +++++++++++++++++++++
 .../main/scala/spark/mllib/regression/Lasso.scala  |  89 +++++----------
 .../classification/LogisticRegressionSuite.scala   |   6 +-
 .../spark/mllib/classification/SVMSuite.scala      |   9 +-
 .../scala/spark/mllib/regression/LassoSuite.scala  |  11 +-
 10 files changed, 320 insertions(+), 286 deletions(-)
 delete mode 100644 mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala
 create mode 100644 mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
 create mode 100644 mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala

diff --git a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala
deleted file mode 100644
index d9c2be2a19..0000000000
--- a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.mllib.optimization
-
-/**
- * Class used to configure options used for GradientDescent based optimization
- * algorithms.
- */
-
-class GradientDescentOpts private (
-    var stepSize: Double,
-    var numIters: Int,
-    var regParam: Double,
-    var miniBatchFraction: Double) {
-
-  def this() = this(1.0, 100, 0.0, 1.0)
-
-  /**
-   * Set the step size per-iteration of SGD. Default 1.0.
-   */
-  def setStepSize(step: Double) = {
-    this.stepSize = step
-    this
-  }
-
-  /**
-   * Set fraction of data to be used for each SGD iteration. Default 1.0.
-   */
-  def setMiniBatchFraction(fraction: Double) = {
-    this.miniBatchFraction = fraction
-    this
-  }
-
-  /**
-   * Set the number of iterations for SGD. Default 100.
-   */
-  def setNumIterations(iters: Int) = {
-    this.numIters = iters
-    this
-  }
-
-  /**
-   * Set the regularization parameter used for SGD. Default 0.0.
-   */
-  def setRegParam(regParam: Double) = {
-    this.regParam = regParam
-    this
-  }
-}
-
-object GradientDescentOpts {
-
-  def apply(stepSize: Double, numIters: Int, regParam: Double, miniBatchFraction: Double) = {
-    new GradientDescentOpts(stepSize, numIters, regParam, miniBatchFraction)
-  }
-
-  def apply() = {
-    new GradientDescentOpts()
-  }
-
-}
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index bc711fd2d8..0af99c616d 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -19,6 +19,7 @@ package spark.mllib.classification
 
 import spark.{Logging, RDD, SparkContext}
 import spark.mllib.optimization._
+import spark.mllib.regression._
 import spark.mllib.util.MLUtils
 
 import scala.math.round
@@ -30,80 +31,46 @@ import org.jblas.DoubleMatrix
  * Based on Matlab code written by John Duchi.
 */
 class LogisticRegressionModel(
-    val weights: Array[Double],
-    val intercept: Double,
-    val stochasticLosses: Array[Double]) extends ClassificationModel {
-
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
-  override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
-    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
-    // and intercept is needed.
-    val localWeights = weightsMatrix
-    val localIntercept = intercept
-    testData.map { x =>
-      val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
-      round(1.0/ (1.0 + math.exp(margin * -1))).toInt
-    }
-  }
-
-  override def predict(testData: Array[Double]): Int = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
+    override val weights: Array[Double],
+    override val intercept: Double)
+  extends GeneralizedLinearModel[Int](weights, intercept)
+  with ClassificationModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
     round(1.0/ (1.0 + math.exp(margin * -1))).toInt
   }
 }
 
-class LogisticRegression(val opts: GradientDescentOpts) extends Logging {
+class LogisticRegressionWithSGD (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[Int, LogisticRegressionModel]
+  with GradientDescent with Serializable {
+
+  val gradient = new LogisticGradient()
+  val updater = new SimpleUpdater()
 
   /**
    * Construct a LogisticRegression object with default parameters
    */
-  def this() = this(new GradientDescentOpts())
-
-  def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
-    val nfeatures: Int = input.take(1)(0)._2.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
-    train(input, initialWeights)
-  }
-
-  def train(
-      input: RDD[(Int, Array[Double])],
-      initialWeights: Array[Double]): LogisticRegressionModel = {
-
-    // Add a extra variable consisting of all 1.0's for the intercept.
-    val data = input.map { case (y, features) =>
-      (y.toDouble, Array(1.0, features:_*))
-    }
-
-    val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
-    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
-      data,
-      new LogisticGradient(),
-      new SimpleUpdater(),
-      opts,
-      initalWeightsWithIntercept)
-
-    val intercept = weights(0)
-    val weightsScaled = weights.tail
-
-    val model = new LogisticRegressionModel(weightsScaled, intercept, stochasticLosses)
+  def this() = this(1.0, 100, 0.0, 1.0, true)
 
-    logInfo("Final model weights " + model.weights.mkString(","))
-    logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
-    model
+  def createModel(weights: Array[Double], intercept: Double) = {
+    new LogisticRegressionModel(weights, intercept)
   }
 }
 
 /**
  * Top-level methods for calling Logistic Regression.
- * NOTE(shivaram): We use multiple train methods instead of default arguments to support 
+ * NOTE(shivaram): We use multiple train methods instead of default arguments to support
  * Java programs.
  */
-object LogisticRegression {
+object LogisticRegressionWithSGD {
 
   /**
    * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
@@ -126,8 +93,8 @@ object LogisticRegression {
       initialWeights: Array[Double])
     : LogisticRegressionModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction)
-    new LogisticRegression(sgdOpts).train(input, initialWeights)
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train(
+      input, initialWeights)
   }
 
   /**
@@ -148,8 +115,8 @@ object LogisticRegression {
       miniBatchFraction: Double)
     : LogisticRegressionModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction)
-    new LogisticRegression(sgdOpts).train(input)
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train(
+      input)
   }
 
   /**
@@ -197,7 +164,7 @@ object LogisticRegression {
     }
     val sc = new SparkContext(args(0), "LogisticRegression")
     val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
-    val model = LogisticRegression.train(
+    val model = LogisticRegressionWithSGD.train(
       data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index 1c137168b6..caf9e3cb93 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -20,6 +20,7 @@ package spark.mllib.classification
 import scala.math.signum
 import spark.{Logging, RDD, SparkContext}
 import spark.mllib.optimization._
+import spark.mllib.regression._
 import spark.mllib.util.MLUtils
 
 import org.jblas.DoubleMatrix
@@ -28,78 +29,42 @@ import org.jblas.DoubleMatrix
  * SVM using Stochastic Gradient Descent.
 */
 class SVMModel(
-    val weights: Array[Double],
-    val intercept: Double,
-    val stochasticLosses: Array[Double]) extends ClassificationModel {
-
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
-  override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
-    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
-    // and intercept is needed.
-    val localWeights = weightsMatrix
-    val localIntercept = intercept
-    testData.map { x =>
-      signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept).toInt
-    }
-  }
-
-  override def predict(testData: Array[Double]): Int = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    signum(dataMat.dot(weightsMatrix) + this.intercept).toInt
+    override val weights: Array[Double],
+    override val intercept: Double)
+  extends GeneralizedLinearModel[Int](weights, intercept)
+  with ClassificationModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    signum(dataMatrix.dot(weightMatrix) + intercept).toInt
   }
 }
 
+class SVMWithSGD private (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[Int, SVMModel] with GradientDescent with Serializable {
 
-
-class SVM(val opts: GradientDescentOpts) extends Logging {
+  val gradient = new HingeGradient()
+  val updater = new SquaredL2Updater()
 
   /**
    * Construct a SVM object with default parameters
    */
-  def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0))
-
-  def train(input: RDD[(Int, Array[Double])]): SVMModel = {
-    val nfeatures: Int = input.take(1)(0)._2.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
-    train(input, initialWeights)
-  }
-
-  def train(
-      input: RDD[(Int, Array[Double])],
-      initialWeights: Array[Double]): SVMModel = {
-
-    // Add a extra variable consisting of all 1.0's for the intercept.
-    val data = input.map { case (y, features) =>
-      (y.toDouble, Array(1.0, features:_*))
-    }
-
-    val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
-    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
-      data,
-      new HingeGradient(),
-      new SquaredL2Updater(),
-      opts,
-      initalWeightsWithIntercept)
-
-    val intercept = weights(0)
-    val weightsScaled = weights.tail
-
-    val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
+  def this() = this(1.0, 100, 1.0, 1.0, true)
 
-    logInfo("Final model weights " + model.weights.mkString(","))
-    logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
-    model
+  def createModel(weights: Array[Double], intercept: Double) = {
+    new SVMModel(weights, intercept)
   }
 }
 
 /**
  * Top-level methods for calling SVM.
 */
-object SVM {
+object SVMWithSGD {
 
   /**
    * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
@@ -124,8 +89,8 @@ object SVM {
       initialWeights: Array[Double])
     : SVMModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
-    new SVM(sgdOpts).train(input, initialWeights)
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input,
+      initialWeights)
   }
 
   /**
@@ -147,8 +112,7 @@ object SVM {
       miniBatchFraction: Double)
     : SVMModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
-    new SVM(sgdOpts).train(input)
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input)
   }
 
   /**
@@ -196,7 +160,7 @@ object SVM {
     }
     val sc = new SparkContext(args(0), "SVM")
     val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
-    val model = SVM.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+    val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
   }
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index 67451ff053..f7d09a2bd3 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -24,8 +24,66 @@ import org.jblas.DoubleMatrix
 
 import scala.collection.mutable.ArrayBuffer
 
-object GradientDescent {
+trait GradientDescent extends Optimizer {
+  val gradient: Gradient
+  val updater: Updater
+
+  var stepSize: Double
+  var numIterations: Int
+  var regParam: Double
+  var miniBatchFraction: Double
+
+  /**
+   * Set the step size per-iteration of SGD. Default 1.0.
+   */
+  def setStepSize(step: Double): this.type = {
+    this.stepSize = step
+    this
+  }
+
+  /**
+   * Set fraction of data to be used for each SGD iteration. Default 1.0.
+   */
+  def setMiniBatchFraction(fraction: Double): this.type = {
+    this.miniBatchFraction = fraction
+    this
+  }
+
+  /**
+   * Set the number of iterations for SGD. Default 100.
+   */
+  def setNumIterations(iters: Int): this.type = {
+    this.numIterations = iters
+    this
+  }
+
+  /**
+   * Set the regularization parameter used for SGD. Default 0.0.
+   */
+  def setRegParam(regParam: Double): this.type = {
+    this.regParam = regParam
+    this
+  }
+
+  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
+    : Array[Double] = {
+
+    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
+      data,
+      gradient,
+      updater,
+      stepSize,
+      numIterations,
+      regParam,
+      miniBatchFraction,
+      initialWeights)
+    weights
+  }
+
+}
+
+object GradientDescent extends Logging {
 
   /**
    * Run gradient descent in parallel using mini batches.
    * Based on Matlab code written by John Duchi.
@@ -34,7 +92,7 @@ object GradientDescent {
    * @param gradient - Gradient object that will be used to compute the gradient.
    * @param updater - Updater object that will be used to update the model.
    * @param stepSize - stepSize to be used during update.
-   * @param numIters - number of iterations that SGD should be run.
+   * @param numIterations - number of iterations that SGD should be run.
    * @param regParam - regularization parameter
    * @param miniBatchFraction - fraction of the input data set that should be used for
    *                            one iteration of SGD. Default value 1.0.
@@ -47,20 +105,23 @@ object GradientDescent {
       data: RDD[(Double, Array[Double])],
       gradient: Gradient,
       updater: Updater,
-      opts: GradientDescentOpts,
+      stepSize: Double,
+      numIterations: Int,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
 
-    val stochasticLossHistory = new ArrayBuffer[Double](opts.numIters)
+    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
 
     val nexamples: Long = data.count()
-    val miniBatchSize = nexamples * opts.miniBatchFraction
+    val miniBatchSize = nexamples * miniBatchFraction
 
     // Initialize weights as a column vector
     var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
     var regVal = 0.0
 
-    for (i <- 1 to opts.numIters) {
-      val (gradientSum, lossSum) = data.sample(false, opts.miniBatchFraction, 42+i).map {
+    for (i <- 1 to numIterations) {
+      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
         case (y, features) =>
           val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
           val (grad, loss) = gradient.compute(featuresRow, y, weights)
@@ -73,11 +134,14 @@ object GradientDescent {
       */
       stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
       val update = updater.compute(
-        weights, gradientSum.div(miniBatchSize), opts.stepSize, i, opts.regParam)
+        weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
       weights = update._1
       regVal = update._2
     }
 
+    logInfo("GradientDescent finished. Last 10 stochastic losses %s".format(
+      stochasticLossHistory.takeRight(10).mkString(", ")))
+
     (weights.toArray, stochasticLossHistory.toArray)
   }
 }
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
new file mode 100644
index 0000000000..76a519c338
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+import spark.RDD
+
+trait Optimizer {
+
+  /**
+   * Solve the provided convex optimization problem.
+   */
+  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
+
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
new file mode 100644
index 0000000000..0bbc9424e6
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext, SparkException}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import scala.math.round
+
+import org.jblas.DoubleMatrix
+
+/**
+ * GeneralizedLinearModel (GLM) represents a model trained using
+ * GeneralizedLinearAlgorithm. GLMs consist of a weight vector and
+ * an intercept.
+ */
+abstract class GeneralizedLinearModel[T: ClassManifest](
+    val weights: Array[Double],
+    val intercept: Double)
+  extends Serializable {
+
+  // Create a column vector that can be used for predictions
+  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+  def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double): T
+
+  def predict(testData: spark.RDD[Array[Double]]): RDD[T] = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept are needed.
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
+
+    testData.map { x =>
+      val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
+      predictPoint(dataMatrix, localWeights, localIntercept)
+    }
+  }
+
+  def predict(testData: Array[Double]): T = {
+    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+    predictPoint(dataMat, weightsMatrix, intercept)
+  }
+}
+
+/**
+ * GeneralizedLinearAlgorithm abstracts out the training for all GLMs.
+ * This class should be mixed in with an Optimizer to create a new GLM.
+ *
+ * NOTE(shivaram): This is an abstract class rather than a trait as we use
+ * a view bound to convert labels to Double.
+ */
+abstract class GeneralizedLinearAlgorithm[T <% Double, M <: GeneralizedLinearModel[T]]
+  extends Logging with Serializable {
+
+  // We need an optimizer mixin to solve the GLM
+  self : Optimizer =>
+
+  var addIntercept: Boolean
+
+  def createModel(weights: Array[Double], intercept: Double): M
+
+  /**
+   * Set whether the algorithm should add an intercept. Default true.
+   */
+  def setIntercept(addIntercept: Boolean): this.type = {
+    this.addIntercept = addIntercept
+    this
+  }
+
+  def train(input: RDD[(T, Array[Double])])(implicit mt: Manifest[T]) : M = {
+    val nfeatures: Int = input.take(1)(0)._2.length
+    val initialWeights = Array.fill(nfeatures)(1.0)
+    train(input, initialWeights)
+  }
+
+  def train(
+      input: RDD[(T, Array[Double])],
+      initialWeights: Array[Double])(implicit mt: Manifest[T])
+    : M = {
+
+    // Add an extra variable consisting of all 1.0's for the intercept.
+    val data = if (addIntercept) {
+      input.map { case (y, features) =>
+        (y.toDouble, Array(1.0, features:_*))
+      }
+    } else {
+      input.map { case (y, features) =>
+        (y.toDouble, features)
+      }
+    }
+
+    val initialWeightsWithIntercept = if (addIntercept) {
+      Array(1.0, initialWeights:_*)
+    } else {
+      initialWeights
+    }
+
+    val weights = optimize(data, initialWeightsWithIntercept)
+    val intercept = weights(0)
+    val weightsScaled = weights.tail
+
+    val model = createModel(weightsScaled, intercept)
+
+    logInfo("Final model weights " + model.weights.mkString(","))
+    logInfo("Final model intercept " + model.intercept)
+    model
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index 7f6fa8025c..f8b15033aa 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -28,78 +28,44 @@ import org.jblas.DoubleMatrix
  *
 */
 class LassoModel(
-    val weights: Array[Double],
-    val intercept: Double,
-    val stochasticLosses: Array[Double]) extends RegressionModel {
-
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
-  override def predict(testData: spark.RDD[Array[Double]]) = {
-    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
-    // and intercept is needed.
-    val localWeights = weightsMatrix
-    val localIntercept = intercept
-    testData.map { x =>
-      new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
-    }
-  }
-
-
-  override def predict(testData: Array[Double]): Double = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    dataMat.dot(weightsMatrix) + this.intercept
+    override val weights: Array[Double],
+    override val intercept: Double)
+  extends GeneralizedLinearModel[Double](weights, intercept)
+  with RegressionModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    dataMatrix.dot(weightMatrix) + intercept
   }
 }
 
-class Lasso(val opts: GradientDescentOpts) extends Logging {
+class LassoWithSGD (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[Double, LassoModel]
+  with GradientDescent with Serializable {
+
+  val gradient = new SquaredGradient()
+  val updater = new L1Updater()
 
   /**
    * Construct a Lasso object with default parameters
    */
-  def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0))
-
-  def train(input: RDD[(Double, Array[Double])]): LassoModel = {
-    val nfeatures: Int = input.take(1)(0)._2.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
-    train(input, initialWeights)
-  }
-
-  def train(
-      input: RDD[(Double, Array[Double])],
-      initialWeights: Array[Double]): LassoModel = {
-
-    // Add a extra variable consisting of all 1.0's for the intercept.
-    val data = input.map { case (y, features) =>
-      (y, Array(1.0, features:_*))
-    }
-
-    val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
-    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
-      data,
-      new SquaredGradient(),
-      new L1Updater(),
-      opts,
-      initalWeightsWithIntercept)
-
-    val intercept = weights(0)
-    val weightsScaled = weights.tail
-
-    val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
+  def this() = this(1.0, 100, 1.0, 1.0, true)
 
-    logInfo("Final model weights " + model.weights.mkString(","))
-    logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
-    model
+  def createModel(weights: Array[Double], intercept: Double) = {
+    new LassoModel(weights, intercept)
  }
 }
 
 /**
  * Top-level methods for calling Lasso.
 */
-object Lasso {
+object LassoWithSGD {
 
   /**
    * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
@@ -124,8 +90,8 @@ object Lasso {
       initialWeights: Array[Double])
     : LassoModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
-    new Lasso(sgdOpts).train(input, initialWeights)
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input,
+      initialWeights)
   }
 
   /**
@@ -147,8 +113,7 @@ object Lasso {
       miniBatchFraction: Double)
     : LassoModel =
   {
-    val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction)
-    new Lasso(sgdOpts).train(input)
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input)
   }
 
   /**
@@ -196,7 +161,7 @@ object Lasso {
     }
     val sc = new SparkContext(args(0), "Lasso")
     val data = MLUtils.loadLabeledData(sc, args(1))
-    val model = Lasso.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+    val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
   }
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 439867d163..ee38486212 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -80,8 +80,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
-    val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(20)
-    val lr = new LogisticRegression(sgdOpts)
+    val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(20)
 
     val model = lr.train(testRDD)
 
@@ -113,8 +112,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
     testRDD.cache()
 
     // Use half as many iterations as the previous test.
-    val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(10)
-    val lr = new LogisticRegression(sgdOpts)
+    val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(10)
 
     val model = lr.train(testRDD, initialWeights)
 
diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
index a624b42c38..1eef9387e3 100644
--- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
@@ -44,7 +44,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
       seed: Int): Seq[(Int, Array[Double])] = {
     val rnd = new Random(seed)
     val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
-    val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
+    val x = Array.fill[Array[Double]](nPoints)(
+        Array.fill[Double](weights.length)(rnd.nextGaussian()))
     val y = x.map { xi =>
       signum(
         (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) +
@@ -75,8 +76,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
-    val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
-    val svm = new SVM(sgdOpts)
+    val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
 
     val model = svm.train(testRDD)
 
@@ -106,8 +106,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
-    val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
-    val svm = new SVM(sgdOpts)
+    val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
 
     val model = svm.train(testRDD, initialWeights)
 
diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
index 531746ec02..ab1d07b879 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
@@ -44,10 +44,11 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
       seed: Int): Seq[(Double, Array[Double])] = {
     val rnd = new Random(seed)
     val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
-    val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
+    val x = Array.fill[Array[Double]](nPoints)(
+        Array.fill[Double](weights.length)(rnd.nextGaussian()))
     val y = x.map(xi =>
       (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) +
         intercept + 0.1 * rnd.nextGaussian()
-      )
+    )
     y zip x
   }
 
@@ -72,8 +73,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
-    val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
-    val ls = new Lasso(sgdOpts)
+    val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
 
     val model = ls.train(testRDD)
 
@@ -109,8 +109,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
 
-    val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
-    val ls = new Lasso(sgdOpts)
+    val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
 
     val model = ls.train(testRDD, initialWeights)
 
-- 
cgit v1.2.3
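
A quick illustration of what the refactor buys: a complete GLM can now be
assembled by choosing a Gradient/Updater pair and mixing GradientDescent into
GeneralizedLinearAlgorithm. The sketch below is not part of the patch
(RidgeModel and RidgeWithSGD are hypothetical names), but it only combines
components the patch itself wires up elsewhere: SquaredGradient, which Lasso
uses, and SquaredL2Updater, which SVM uses, giving a ridge-style regression.

import spark.mllib.optimization._
import spark.mllib.regression._
import org.jblas.DoubleMatrix

// Hypothetical model: a plain linear predictor (w . x + b), mirroring LassoModel.
class RidgeModel(
    override val weights: Array[Double],
    override val intercept: Double)
  extends GeneralizedLinearModel[Double](weights, intercept)
  with RegressionModel with Serializable {

  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
      intercept: Double) = {
    dataMatrix.dot(weightMatrix) + intercept
  }
}

// Hypothetical algorithm: squared loss with an L2 penalty, trained by SGD.
class RidgeWithSGD(
    var stepSize: Double,
    var numIterations: Int,
    var regParam: Double,
    var miniBatchFraction: Double,
    var addIntercept: Boolean)
  extends GeneralizedLinearAlgorithm[Double, RidgeModel]
  with GradientDescent with Serializable {

  val gradient = new SquaredGradient()
  val updater = new SquaredL2Updater()

  def this() = this(1.0, 100, 1.0, 1.0, true)

  def createModel(weights: Array[Double], intercept: Double) = {
    new RidgeModel(weights, intercept)
  }
}

Because GradientDescent is mixed in, the SGD setters come along for free: a
caller can write, for example,
new RidgeWithSGD().setStepSize(0.1).setRegParam(0.5).train(data) and get back a
RidgeModel whose predict methods are inherited from GeneralizedLinearModel.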
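
The Optimizer trait is the seam for swapping in an entirely different
optimization technique. Below is a minimal sketch of an alternative optimizer,
again hypothetical (LocalGradientDescent is not in the patch): it collects the
data to the driver and runs plain batch gradient descent locally. It is only a
toy, but it satisfies the same contract that GradientDescent does, so it can be
mixed into a GLM in exactly the same way.

import spark.RDD
import spark.mllib.optimization.{Gradient, Optimizer, Updater}
import org.jblas.DoubleMatrix

trait LocalGradientDescent extends Optimizer {

  val gradient: Gradient
  val updater: Updater
  var stepSize: Double
  var numIterations: Int
  var regParam: Double

  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    : Array[Double] = {
    // Toy approach: pull everything to the driver. Suitable for small data sets only.
    val localData = data.collect()
    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
    for (i <- 1 to numIterations) {
      // Sum gradients over the full local data set, then take one update step.
      var gradientSum = DoubleMatrix.zeros(initialWeights.length, 1)
      localData.foreach { case (y, features) =>
        val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
        val (grad, _) = gradient.compute(featuresRow, y, weights)
        gradientSum = gradientSum.add(grad)
      }
      weights = updater.compute(
        weights, gradientSum.div(localData.length.toDouble), stepSize, i, regParam)._1
    }
    weights.toArray
  }
}

A GLM would then pick it up with a one-line change to its declaration, e.g.
extending GeneralizedLinearAlgorithm[Double, LassoModel] with
LocalGradientDescent instead of with GradientDescent.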