author | Evan Sparks <evan.sparks@gmail.com> | 2013-08-11 10:52:55 -0700
committer | Evan Sparks <evan.sparks@gmail.com> | 2013-08-11 10:52:55 -0700
commit | ff9ebfabb47e3439c7b78cb4e3c33423a1467a9a (patch)
tree | 8ce3ef1b12dbe1f0a8e9c64fc5a684bcab66e6bb /mllib/src/main/scala
parent | 95c62ca3060c89a44aa19aaab1fc9a9fff5a1196 (diff)
parent | a65a6ed5140446651916aff1761a9a755194eaf4 (diff)
download | spark-ff9ebfabb47e3439c7b78cb4e3c33423a1467a9a.tar.gz spark-ff9ebfabb47e3439c7b78cb4e3c33423a1467a9a.tar.bz2 spark-ff9ebfabb47e3439c7b78cb4e3c33423a1467a9a.zip
Merge pull request #762 from shivaram/sgd-cleanup
Refactor SGD options into a new class.
Diffstat (limited to 'mllib/src/main/scala')
14 files changed, 389 insertions, 337 deletions
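Before the file-by-file diff, here is a minimal sketch (not part of the commit) of how training reads after this refactor. The master URL, input path, and feature values below are placeholders; LabeledPoint, MLUtils.loadLabeledData, and LogisticRegressionWithSGD.train are taken directly from the changes that follow.

    import spark.SparkContext
    import spark.mllib.classification.LogisticRegressionWithSGD
    import spark.mllib.util.MLUtils

    object LRExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LRExample")  // placeholder master URL

        // Labels and features now travel together as LabeledPoint
        // instead of (Double, Array[Double]) tuples.
        val data = MLUtils.loadLabeledData(sc, "/path/to/data.txt")  // RDD[LabeledPoint]

        // 100 iterations, step size 1.0; regParam and miniBatchFraction use the
        // defaults wired into LogisticRegressionWithSGD's optimizer.
        val model = LogisticRegressionWithSGD.train(data, 100, 1.0)

        // predict now returns Double rather than Int (see ClassificationModel below).
        val prediction: Double = model.predict(Array(0.5, -0.2, 1.3))

        sc.stop()
      }
    }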
diff --git a/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
index d6154b66ae..70fae8c15a 100644
--- a/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
@@ -9,7 +9,7 @@ trait ClassificationModel extends Serializable {
    * @param testData RDD representing data points to be predicted
    * @return RDD[Int] where each entry contains the corresponding prediction
    */
-  def predict(testData: RDD[Array[Double]]): RDD[Int]
+  def predict(testData: RDD[Array[Double]]): RDD[Double]
 
   /**
    * Predict values for a single data point using the model trained.
@@ -17,5 +17,5 @@ trait ClassificationModel extends Serializable {
    * @param testData array representing a single data point
    * @return Int prediction from the trained model
    */
-  def predict(testData: Array[Double]): Int
+  def predict(testData: Array[Double]): Double
 }
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 203aa8fdd4..73949b0103 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -19,6 +19,7 @@ package spark.mllib.classification
 
 import spark.{Logging, RDD, SparkContext}
 import spark.mllib.optimization._
+import spark.mllib.regression._
 import spark.mllib.util.MLUtils
 
 import scala.math.round
@@ -30,109 +31,49 @@ import org.jblas.DoubleMatrix
  * Based on Matlab code written by John Duchi.
  */
 class LogisticRegressionModel(
-  val weights: Array[Double],
-  val intercept: Double,
-  val stochasticLosses: Array[Double]) extends ClassificationModel {
-
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
-  override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
-    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
-    // and intercept is needed.
-    val localWeights = weightsMatrix
-    val localIntercept = intercept
-    testData.map { x =>
-      val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
-      round(1.0/ (1.0 + math.exp(margin * -1))).toInt
-    }
-  }
-
-  override def predict(testData: Array[Double]): Int = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
-    round(1.0/ (1.0 + math.exp(margin * -1))).toInt
+  override val weights: Array[Double],
+  override val intercept: Double)
+  extends GeneralizedLinearModel(weights, intercept)
+  with ClassificationModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
+    round(1.0/ (1.0 + math.exp(margin * -1)))
   }
 }
 
-class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double,
-    var numIters: Int)
-  extends Logging {
-
+class LogisticRegressionWithSGD (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[LogisticRegressionModel]
+  with Serializable {
+
+  val gradient = new LogisticGradient()
+  val updater = new SimpleUpdater()
+  val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+      .setNumIterations(numIterations)
+      .setRegParam(regParam)
+      .setMiniBatchFraction(miniBatchFraction)
   /**
    * Construct a LogisticRegression object with default parameters
    */
-  def this() = this(1.0, 1.0, 100)
-
-  /**
-   * Set the step size per-iteration of SGD. Default 1.0.
-   */
-  def setStepSize(step: Double) = {
-    this.stepSize = step
-    this
-  }
-
-  /**
-   * Set fraction of data to be used for each SGD iteration. Default 1.0.
-   */
-  def setMiniBatchFraction(fraction: Double) = {
-    this.miniBatchFraction = fraction
-    this
-  }
-
-  /**
-   * Set the number of iterations for SGD. Default 100.
-   */
-  def setNumIterations(iters: Int) = {
-    this.numIters = iters
-    this
-  }
-
-  def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
-    val nfeatures: Int = input.take(1)(0)._2.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
-    train(input, initialWeights)
-  }
-
-  def train(
-    input: RDD[(Int, Array[Double])],
-    initialWeights: Array[Double]): LogisticRegressionModel = {
-
-    // Add a extra variable consisting of all 1.0's for the intercept.
-    val data = input.map { case (y, features) =>
-      (y.toDouble, Array(1.0, features:_*))
-    }
-
-    val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
-    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
-      data,
-      new LogisticGradient(),
-      new SimpleUpdater(),
-      stepSize,
-      numIters,
-      0.0,
-      initalWeightsWithIntercept,
-      miniBatchFraction)
-
-    val intercept = weights(0)
-    val weightsScaled = weights.tail
-
-    val model = new LogisticRegressionModel(weightsScaled, intercept, stochasticLosses)
+  def this() = this(1.0, 100, 0.0, 1.0, true)
 
-    logInfo("Final model weights " + model.weights.mkString(","))
-    logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
-    model
+  def createModel(weights: Array[Double], intercept: Double) = {
+    new LogisticRegressionModel(weights, intercept)
   }
 }
 
 /**
  * Top-level methods for calling Logistic Regression.
- * NOTE(shivaram): We use multiple train methods instead of default arguments to support
+ * NOTE(shivaram): We use multiple train methods instead of default arguments to support
  * Java programs.
  */
-object LogisticRegressionLocalRandomSGD {
+object LogisticRegressionWithSGD {
 
   /**
    * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
@@ -148,14 +89,14 @@ object LogisticRegressionLocalRandomSGD {
    *        the number of features in the data.
    */
   def train(
-    input: RDD[(Int, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int,
     stepSize: Double,
     miniBatchFraction: Double,
     initialWeights: Array[Double])
   : LogisticRegressionModel =
   {
-    new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
       input, initialWeights)
   }
 
@@ -171,13 +112,14 @@ object LogisticRegressionLocalRandomSGD {
    * @param miniBatchFraction Fraction of data to be used per iteration.
    */
   def train(
-    input: RDD[(Int, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int,
     stepSize: Double,
     miniBatchFraction: Double)
   : LogisticRegressionModel =
   {
-    new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input)
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
+      input)
   }
 
   /**
@@ -192,7 +134,7 @@ object LogisticRegressionLocalRandomSGD {
    * @return a LogisticRegressionModel which has the weights and offset from training.
    */
   def train(
-    input: RDD[(Int, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int,
     stepSize: Double)
   : LogisticRegressionModel =
@@ -210,7 +152,7 @@ object LogisticRegressionLocalRandomSGD {
    * @return a LogisticRegressionModel which has the weights and offset from training.
    */
   def train(
-    input: RDD[(Int, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int)
   : LogisticRegressionModel =
   {
@@ -218,15 +160,14 @@ object LogisticRegressionLocalRandomSGD {
   }
 
   def main(args: Array[String]) {
-    if (args.length != 5) {
+    if (args.length != 4) {
       println("Usage: LogisticRegression <master> <input_dir> <step_size> " +
-        "<regularization_parameter> <niters>")
+        "<niters>")
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "LogisticRegression")
-    val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
-    val model = LogisticRegressionLocalRandomSGD.train(
-      data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+    val data = MLUtils.loadLabeledData(sc, args(1))
+    val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
 
     sc.stop()
   }
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index 3a6a12814a..fa9d5a9471 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -20,6 +20,7 @@ package spark.mllib.classification
 import scala.math.signum
 import spark.{Logging, RDD, SparkContext}
 import spark.mllib.optimization._
+import spark.mllib.regression._
 import spark.mllib.util.MLUtils
 
 import org.jblas.DoubleMatrix
@@ -28,117 +29,45 @@ import org.jblas.DoubleMatrix
  * SVM using Stochastic Gradient Descent.
  */
 class SVMModel(
-  val weights: Array[Double],
-  val intercept: Double,
-  val stochasticLosses: Array[Double]) extends ClassificationModel {
-
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
-  override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
-    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
-    // and intercept is needed.
-    val localWeights = weightsMatrix
-    val localIntercept = intercept
-    testData.map { x =>
-      signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept).toInt
-    }
-  }
-
-  override def predict(testData: Array[Double]): Int = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    signum(dataMat.dot(weightsMatrix) + this.intercept).toInt
+  override val weights: Array[Double],
+  override val intercept: Double)
+  extends GeneralizedLinearModel(weights, intercept)
+  with ClassificationModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    signum(dataMatrix.dot(weightMatrix) + intercept)
   }
 }
-
-
-class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double,
-    var miniBatchFraction: Double, var numIters: Int)
-  extends Logging {
-
+class SVMWithSGD private (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {
+
+  val gradient = new HingeGradient()
+  val updater = new SquaredL2Updater()
+  val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+      .setNumIterations(numIterations)
+      .setRegParam(regParam)
+      .setMiniBatchFraction(miniBatchFraction)
   /**
    * Construct a SVM object with default parameters
    */
-  def this() = this(1.0, 1.0, 1.0, 100)
-
-  /**
-   * Set the step size per-iteration of SGD. Default 1.0.
-   */
-  def setStepSize(step: Double) = {
-    this.stepSize = step
-    this
-  }
-
-  /**
-   * Set the regularization parameter. Default 1.0.
-   */
-  def setRegParam(param: Double) = {
-    this.regParam = param
-    this
-  }
-
-  /**
-   * Set fraction of data to be used for each SGD iteration. Default 1.0.
-   */
-  def setMiniBatchFraction(fraction: Double) = {
-    this.miniBatchFraction = fraction
-    this
-  }
-
-  /**
-   * Set the number of iterations for SGD. Default 100.
-   */
-  def setNumIterations(iters: Int) = {
-    this.numIters = iters
-    this
-  }
-
-  def train(input: RDD[(Int, Array[Double])]): SVMModel = {
-    val nfeatures: Int = input.take(1)(0)._2.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
-    train(input, initialWeights)
-  }
-
-  def train(
-    input: RDD[(Int, Array[Double])],
-    initialWeights: Array[Double]): SVMModel = {
-
-    // Add a extra variable consisting of all 1.0's for the intercept.
-    val data = input.map { case (y, features) =>
-      (y.toDouble, Array(1.0, features:_*))
-    }
-
-    val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
-    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
-      data,
-      new HingeGradient(),
-      new SquaredL2Updater(),
-      stepSize,
-      numIters,
-      regParam,
-      initalWeightsWithIntercept,
-      miniBatchFraction)
+  def this() = this(1.0, 100, 1.0, 1.0, true)
 
-    val intercept = weights(0)
-    val weightsScaled = weights.tail
-
-    val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
-
-    logInfo("Final model weights " + model.weights.mkString(","))
-    logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
-    model
+  def createModel(weights: Array[Double], intercept: Double) = {
+    new SVMModel(weights, intercept)
   }
 }
 
 /**
  * Top-level methods for calling SVM.
-
-
  */
-object SVMLocalRandomSGD {
+object SVMWithSGD {
 
   /**
    * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
@@ -155,7 +84,7 @@ object SVMLocalRandomSGD {
    *        the number of features in the data.
    */
   def train(
-    input: RDD[(Int, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int,
     stepSize: Double,
     regParam: Double,
@@ -163,8 +92,8 @@ object SVMLocalRandomSGD {
     initialWeights: Array[Double])
   : SVMModel =
   {
-    new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
-      input, initialWeights)
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
+      initialWeights)
   }
 
   /**
@@ -179,14 +108,14 @@ object SVMLocalRandomSGD {
    * @param miniBatchFraction Fraction of data to be used per iteration.
    */
   def train(
-    input: RDD[(Int, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int,
     stepSize: Double,
     regParam: Double,
     miniBatchFraction: Double)
   : SVMModel =
   {
-    new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
  }
 
   /**
@@ -201,7 +130,7 @@ object SVMLocalRandomSGD {
    * @return a SVMModel which has the weights and offset from training.
    */
   def train(
-    input: RDD[(Int, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int,
     stepSize: Double,
     regParam: Double)
@@ -220,7 +149,7 @@ object SVMLocalRandomSGD {
    * @return a SVMModel which has the weights and offset from training.
  */
   def train(
-    input: RDD[(Int, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int)
   : SVMModel =
   {
@@ -233,8 +162,8 @@ object SVMLocalRandomSGD {
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "SVM")
-    val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
-    val model = SVMLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+    val data = MLUtils.loadLabeledData(sc, args(1))
+    val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
   }
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index 19cda26446..1f04398d0c 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -24,10 +24,81 @@ import org.jblas.DoubleMatrix
 
 import scala.collection.mutable.ArrayBuffer
 
+class GradientDescent(var gradient: Gradient, var updater: Updater) extends Optimizer {
 
-object GradientDescent {
+  var stepSize: Double = 1.0
+  var numIterations: Int = 100
+  var regParam: Double = 0.0
+  var miniBatchFraction: Double = 1.0
 
   /**
+   * Set the step size per-iteration of SGD. Default 1.0.
+   */
+  def setStepSize(step: Double): this.type = {
+    this.stepSize = step
+    this
+  }
+
+  /**
+   * Set fraction of data to be used for each SGD iteration. Default 1.0.
+   */
+  def setMiniBatchFraction(fraction: Double): this.type = {
+    this.miniBatchFraction = fraction
+    this
+  }
+
+  /**
+   * Set the number of iterations for SGD. Default 100.
+   */
+  def setNumIterations(iters: Int): this.type = {
+    this.numIterations = iters
+    this
+  }
+
+  /**
+   * Set the regularization parameter used for SGD. Default 0.0.
+   */
+  def setRegParam(regParam: Double): this.type = {
+    this.regParam = regParam
+    this
+  }
+
+  /**
+   * Set the gradient function to be used for SGD.
+   */
+  def setGradient(gradient: Gradient): this.type = {
+    this.gradient = gradient
+    this
+  }
+
+
+  /**
+   * Set the updater function to be used for SGD.
+   */
+  def setUpdater(updater: Updater): this.type = {
+    this.updater = updater
+    this
+  }
+
+  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
+    : Array[Double] = {
+
+    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
+        data,
+        gradient,
+        updater,
+        stepSize,
+        numIterations,
+        regParam,
+        miniBatchFraction,
+        initialWeights)
+    weights
+  }
+
+}
+
+object GradientDescent extends Logging {
+
+  /**
    * Run gradient descent in parallel using mini batches.
   * Based on Matlab code written by John Duchi.
    *
@@ -35,7 +106,7 @@ object GradientDescent {
    * @param gradient - Gradient object that will be used to compute the gradient.
    * @param updater - Updater object that will be used to update the model.
    * @param stepSize - stepSize to be used during update.
-   * @param numIters - number of iterations that SGD should be run.
+   * @param numIterations - number of iterations that SGD should be run.
    * @param regParam - regularization parameter
    * @param miniBatchFraction - fraction of the input data set that should be used for
    *                            one iteration of SGD. Default value 1.0.
@@ -49,12 +120,12 @@ object GradientDescent {
       gradient: Gradient,
       updater: Updater,
       stepSize: Double,
-      numIters: Int,
+      numIterations: Int,
       regParam: Double,
-      initialWeights: Array[Double],
-      miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = {
+      miniBatchFraction: Double,
+      initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
 
-    val stochasticLossHistory = new ArrayBuffer[Double](numIters)
+    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
 
     val nexamples: Long = data.count()
     val miniBatchSize = nexamples * miniBatchFraction
@@ -63,7 +134,7 @@ object GradientDescent {
     var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
     var regVal = 0.0
 
-    for (i <- 1 to numIters) {
+    for (i <- 1 to numIterations) {
       val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
         case (y, features) =>
           val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
@@ -76,11 +147,15 @@ object GradientDescent {
        * and regVal is the regularization value computed in the previous iteration as well.
        */
       stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
-      val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
+      val update = updater.compute(
+        weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
       weights = update._1
       regVal = update._2
     }
 
+    logInfo("GradientDescent finished. Last 10 stochastic losses %s".format(
+      stochasticLossHistory.takeRight(10).mkString(", ")))
+
     (weights.toArray, stochasticLossHistory.toArray)
   }
 }
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
new file mode 100644
index 0000000000..76a519c338
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+import spark.RDD
+
+trait Optimizer {
+
+  /**
+   * Solve the provided convex optimization problem.
+   */
+  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
+
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
new file mode 100644
index 0000000000..8ea823b307
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD}
+import spark.mllib.optimization._
+
+import org.jblas.DoubleMatrix
+
+/**
+ * GeneralizedLinearModel (GLM) represents a model trained using
+ * GeneralizedLinearAlgorithm. GLMs consist of a weight vector,
+ * an intercept.
+ */
+abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double)
+  extends Serializable {
+
+  // Create a column vector that can be used for predictions
+  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+  /**
+   * Predict the result given a data point and the weights learned.
+   *
+   * @param dataMatrix Row vector containing the features for this data point
+   * @param weightMatrix Column vector containing the weights of the model
+   * @param intercept Intercept of the model.
+   */
+  def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double): Double
+
+  def predict(testData: spark.RDD[Array[Double]]): RDD[Double] = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept is needed.
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
+
+    testData.map { x =>
+      val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
+      predictPoint(dataMatrix, localWeights, localIntercept)
+    }
+  }
+
+  def predict(testData: Array[Double]): Double = {
+    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+    predictPoint(dataMat, weightsMatrix, intercept)
+  }
+}
+
+/**
+ * GeneralizedLinearAlgorithm abstracts out the training for all GLMs.
+ * This class should be extended with an Optimizer to create a new GLM.
+ */
+abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
+  extends Logging with Serializable {
+
+  val optimizer: Optimizer
+
+  def createModel(weights: Array[Double], intercept: Double): M
+
+  var addIntercept: Boolean
+
+  /**
+   * Set if the algorithm should add an intercept. Default true.
+   */
+  def setIntercept(addIntercept: Boolean): this.type = {
+    this.addIntercept = addIntercept
+    this
+  }
+
+  def run(input: RDD[LabeledPoint]) : M = {
+    val nfeatures: Int = input.first().features.length
+    val initialWeights = Array.fill(nfeatures)(1.0)
+    run(input, initialWeights)
+  }
+
+  def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = {
+
+    // Add a extra variable consisting of all 1.0's for the intercept.
+    val data = if (addIntercept) {
+      input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*)))
+    } else {
+      input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
+    }
+
+    val initialWeightsWithIntercept = if (addIntercept) {
+      Array(1.0, initialWeights:_*)
+    } else {
+      initialWeights
+    }
+
+    val weights = optimizer.optimize(data, initialWeightsWithIntercept)
+    val intercept = weights(0)
+    val weightsScaled = weights.tail
+
+    val model = createModel(weightsScaled, intercept)
+
+    logInfo("Final model weights " + model.weights.mkString(","))
+    logInfo("Final model intercept " + model.intercept)
+    model
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala
new file mode 100644
index 0000000000..3de60482c5
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+/**
+ * Class that represents the features and labels of a data point.
+ *
+ * @param label Label for this data point.
+ * @param features List of features for this data point.
+ */
+case class LabeledPoint(val label: Double, val features: Array[Double])
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index e8b1ed8a48..989e5ded58 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -28,117 +28,48 @@ import org.jblas.DoubleMatrix
  *
  */
 class LassoModel(
-  val weights: Array[Double],
-  val intercept: Double,
-  val stochasticLosses: Array[Double]) extends RegressionModel {
-
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
-  override def predict(testData: spark.RDD[Array[Double]]) = {
-    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
-    // and intercept is needed.
-    val localWeights = weightsMatrix
-    val localIntercept = intercept
-    testData.map { x =>
-      new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
-    }
-  }
-
-
-  override def predict(testData: Array[Double]): Double = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    dataMat.dot(weightsMatrix) + this.intercept
+  override val weights: Array[Double],
+  override val intercept: Double)
+  extends GeneralizedLinearModel(weights, intercept)
+  with RegressionModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    dataMatrix.dot(weightMatrix) + intercept
   }
 }
 
-class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double,
-    var miniBatchFraction: Double, var numIters: Int)
-  extends Logging {
+class LassoWithSGD (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[LassoModel]
+  with Serializable {
 
-  /**
-   * Construct a Lasso object with default parameters
-   */
-  def this() = this(1.0, 1.0, 1.0, 100)
+  val gradient = new SquaredGradient()
+  val updater = new L1Updater()
+  val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+      .setNumIterations(numIterations)
+      .setRegParam(regParam)
+      .setMiniBatchFraction(miniBatchFraction)
 
   /**
-   * Set the step size per-iteration of SGD. Default 1.0.
-   */
-  def setStepSize(step: Double) = {
-    this.stepSize = step
-    this
-  }
-
-  /**
-   * Set the regularization parameter. Default 1.0.
-   */
-  def setRegParam(param: Double) = {
-    this.regParam = param
-    this
-  }
-
-  /**
-   * Set fraction of data to be used for each SGD iteration. Default 1.0.
-   */
-  def setMiniBatchFraction(fraction: Double) = {
-    this.miniBatchFraction = fraction
-    this
-  }
-
-  /**
-   * Set the number of iterations for SGD. Default 100.
+   * Construct a Lasso object with default parameters
    */
-  def setNumIterations(iters: Int) = {
-    this.numIters = iters
-    this
-  }
-
-  def train(input: RDD[(Double, Array[Double])]): LassoModel = {
-    val nfeatures: Int = input.take(1)(0)._2.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
-    train(input, initialWeights)
-  }
-
-  def train(
-    input: RDD[(Double, Array[Double])],
-    initialWeights: Array[Double]): LassoModel = {
+  def this() = this(1.0, 100, 1.0, 1.0, true)
 
-    // Add a extra variable consisting of all 1.0's for the intercept.
-    val data = input.map { case (y, features) =>
-      (y, Array(1.0, features:_*))
-    }
-
-    val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
-    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
-      data,
-      new SquaredGradient(),
-      new L1Updater(),
-      stepSize,
-      numIters,
-      regParam,
-      initalWeightsWithIntercept,
-      miniBatchFraction)
-
-    val intercept = weights(0)
-    val weightsScaled = weights.tail
-
-    val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
-
-    logInfo("Final model weights " + model.weights.mkString(","))
-    logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
-    model
+  def createModel(weights: Array[Double], intercept: Double) = {
+    new LassoModel(weights, intercept)
  }
 }
 
 /**
  * Top-level methods for calling Lasso.
- *
- *
  */
-object LassoLocalRandomSGD {
+object LassoWithSGD {
 
   /**
    * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
@@ -155,7 +86,7 @@ object LassoLocalRandomSGD {
    *        the number of features in the data.
    */
   def train(
-    input: RDD[(Double, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int,
     stepSize: Double,
     regParam: Double,
@@ -163,8 +94,8 @@ object LassoLocalRandomSGD {
     initialWeights: Array[Double])
   : LassoModel =
   {
-    new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
-      input, initialWeights)
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
+      initialWeights)
   }
 
   /**
@@ -179,14 +110,14 @@ object LassoLocalRandomSGD {
    * @param miniBatchFraction Fraction of data to be used per iteration.
    */
   def train(
-    input: RDD[(Double, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int,
     stepSize: Double,
     regParam: Double,
     miniBatchFraction: Double)
   : LassoModel =
   {
-    new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
   }
 
   /**
@@ -201,7 +132,7 @@ object LassoLocalRandomSGD {
    * @return a LassoModel which has the weights and offset from training.
    */
   def train(
-    input: RDD[(Double, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int,
     stepSize: Double,
     regParam: Double)
@@ -220,7 +151,7 @@ object LassoLocalRandomSGD {
    * @return a LassoModel which has the weights and offset from training.
    */
   def train(
-    input: RDD[(Double, Array[Double])],
+    input: RDD[LabeledPoint],
     numIterations: Int)
   : LassoModel =
   {
@@ -234,7 +165,7 @@ object LassoLocalRandomSGD {
     }
     val sc = new SparkContext(args(0), "Lasso")
     val data = MLUtils.loadLabeledData(sc, args(1))
-    val model = LassoLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+    val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
   }
diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
index 6ba141e8fb..de790dde51 100644
--- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
@@ -71,7 +71,8 @@ class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double)
     this
   }
 
-  def train(input: RDD[(Double, Array[Double])]): RidgeRegressionModel = {
+  def train(inputLabeled: RDD[LabeledPoint]): RidgeRegressionModel = {
+    val input = inputLabeled.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
     val nfeatures: Int = input.take(1)(0)._2.length
     val nexamples: Long = input.count()
 
@@ -183,7 +184,7 @@ object RidgeRegression {
    * @param lambdaHigh upper bound used in binary search for lambda
    */
   def train(
-    input: RDD[(Double, Array[Double])],
+    input: RDD[LabeledPoint],
     lambdaLow: Double,
     lambdaHigh: Double)
   : RidgeRegressionModel =
@@ -199,7 +200,7 @@ object RidgeRegression {
    *
    * @param input RDD of (response, array of features) pairs.
  */
-  def train(input: RDD[(Double, Array[Double])]) : RidgeRegressionModel = {
+  def train(input: RDD[LabeledPoint]) : RidgeRegressionModel = {
     train(input, 0.0, 100.0)
   }
 
diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
index ef4f42a494..1f185c9de7 100644
--- a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
@@ -29,14 +29,14 @@ object LassoGenerator {
     val trueWeights = new DoubleMatrix(1, nfeatures+1,
       Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
 
-    val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+    val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
       val rnd = new Random(42 + idx)
 
       val x = Array.fill[Double](nfeatures) { rnd.nextDouble() * 2.0 - 1.0 }
       val y = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1
-      (y, x)
+      LabeledPoint(y, x)
     }
 
     MLUtils.saveLabeledData(data, outputPath)
diff --git a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala
index 8d659cd97c..4fa19c3c23 100644
--- a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -20,6 +20,7 @@ package spark.mllib.util
 import scala.util.Random
 
 import spark.{RDD, SparkContext}
+import spark.mllib.regression.LabeledPoint
 
 object LogisticRegressionDataGenerator {
 
@@ -40,7 +41,7 @@ object LogisticRegressionDataGenerator {
       nfeatures: Int,
       eps: Double,
       nparts: Int = 2,
-      probOne: Double = 0.5): RDD[(Double, Array[Double])] = {
+      probOne: Double = 0.5): RDD[LabeledPoint] = {
     val data = sc.parallelize(0 until nexamples, nparts).map { idx =>
       val rnd = new Random(42 + idx)
 
@@ -48,7 +49,7 @@ object LogisticRegressionDataGenerator {
       val x = Array.fill[Double](nfeatures) {
         rnd.nextGaussian() + (y * eps)
       }
-      (y, x)
+      LabeledPoint(y, x)
     }
     data
   }
diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
index 25d9673004..9174e8cea7 100644
--- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
@@ -21,6 +21,7 @@ import spark.{RDD, SparkContext}
 import spark.SparkContext._
 
 import org.jblas.DoubleMatrix
+import spark.mllib.regression.LabeledPoint
 
 /**
  * Helper methods to load and save data
@@ -36,17 +37,17 @@ object MLUtils {
    * @return An RDD of tuples. For each tuple, the first element is the label, and the second
    *         element represents the feature values (an array of Double).
  */
-  def loadLabeledData(sc: SparkContext, dir: String): RDD[(Double, Array[Double])] = {
+  def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
     sc.textFile(dir).map { line =>
       val parts = line.split(',')
       val label = parts(0).toDouble
       val features = parts(1).trim().split(' ').map(_.toDouble)
-      (label, features)
+      LabeledPoint(label, features)
     }
   }
 
-  def saveLabeledData(data: RDD[(Double, Array[Double])], dir: String) {
-    val dataStr = data.map(x => x._1 + "," + x._2.mkString(" "))
+  def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
+    val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
     dataStr.saveAsTextFile(dir)
   }
 
diff --git a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
index c5b8a29942..c4d65c3f9a 100644
--- a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 import org.jblas.DoubleMatrix
 
 import spark.{RDD, SparkContext}
+import spark.mllib.regression.LabeledPoint
 
 object RidgeRegressionDataGenerator {
 
@@ -41,14 +42,14 @@ object RidgeRegressionDataGenerator {
       nexamples: Int,
       nfeatures: Int,
       eps: Double,
-      nparts: Int = 2) : RDD[(Double, Array[Double])] = {
+      nparts: Int = 2) : RDD[LabeledPoint] = {
     org.jblas.util.Random.seed(42)
     // Random values distributed uniformly in [-0.5, 0.5]
     val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
     w.put(0, 0, 10)
     w.put(1, 0, 10)
 
-    val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
+    val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
       org.jblas.util.Random.seed(42 + p)
       val examplesInPartition = nexamples / nparts
 
@@ -61,7 +62,7 @@ object RidgeRegressionDataGenerator {
       val yObs = new DoubleMatrix(normalValues).addi(y)
 
       Iterator.tabulate(examplesInPartition) { i =>
-        (yObs.get(i, 0), X.getRow(i).toArray)
+        LabeledPoint(yObs.get(i, 0), X.getRow(i).toArray)
       }
     }
     data
diff --git a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
index 00a54d9a70..a37f6eb3b3 100644
--- a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
@@ -9,6 +9,7 @@ import spark.{RDD, SparkContext}
 import spark.mllib.util.MLUtils
 
 import org.jblas.DoubleMatrix
+import spark.mllib.regression.LabeledPoint
 
 object SVMGenerator {
 
@@ -32,14 +33,14 @@ object SVMGenerator {
     val trueWeights = new DoubleMatrix(1, nfeatures+1,
       Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
 
-    val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+    val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
       val rnd = new Random(42 + idx)
 
       val x = Array.fill[Double](nfeatures) { rnd.nextDouble() * 2.0 - 1.0 }
       val y = signum((new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1)
-      (y, x)
+      LabeledPoint(y, x)
     }
 
     MLUtils.saveLabeledData(data, outputPath)
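The pattern this commit establishes — a Gradient and an Updater wired into a GradientDescent optimizer, wrapped by a GeneralizedLinearAlgorithm subclass that only supplies createModel — is what any new GLM plugs into. As an illustration only (RidgeModel and RidgeWithSGD are hypothetical names, not added by this commit), a ridge-style algorithm would be assembled from the pieces above like this:

    import org.jblas.DoubleMatrix
    import spark.mllib.optimization._
    import spark.mllib.regression._

    // Hypothetical sketch: squared loss + L2 updater gives a ridge-style GLM.
    // The plumbing mirrors LogisticRegressionWithSGD, SVMWithSGD, and LassoWithSGD above.
    class RidgeModel(override val weights: Array[Double], override val intercept: Double)
      extends GeneralizedLinearModel(weights, intercept)
      with RegressionModel with Serializable {

      // Linear prediction: w . x + intercept
      override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
          intercept: Double) = {
        dataMatrix.dot(weightMatrix) + intercept
      }
    }

    class RidgeWithSGD(
        var stepSize: Double,
        var numIterations: Int,
        var regParam: Double,
        var miniBatchFraction: Double,
        var addIntercept: Boolean)
      extends GeneralizedLinearAlgorithm[RidgeModel] with Serializable {

      // SquaredGradient and SquaredL2Updater both exist in spark.mllib.optimization.
      val gradient = new SquaredGradient()
      val updater = new SquaredL2Updater()
      val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
          .setNumIterations(numIterations)
          .setRegParam(regParam)
          .setMiniBatchFraction(miniBatchFraction)

      def createModel(weights: Array[Double], intercept: Double) = {
        new RidgeModel(weights, intercept)
      }
    }

A caller would then train with new RidgeWithSGD(1.0, 100, 0.1, 1.0, true).run(trainingData), exactly as the refactored classes above do via their train helpers.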