From cef178873b04960c36647d9899fcd13715fef62c Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 31 Jul 2013 14:15:17 -0700 Subject: Refactor SGD options into a new class. This refactoring pulls out code shared between SVM, Lasso, LR into a common GradientDescentOpts class. Some style cleanup as well --- .../scala/spark/mllib/GradientDescentOpts.scala | 76 ++++++++++++++++++++++ .../mllib/classification/LogisticRegression.scala | 48 +++----------- .../scala/spark/mllib/classification/SVM.scala | 58 +++-------------- .../spark/mllib/optimization/GradientDescent.scala | 19 +++--- .../main/scala/spark/mllib/regression/Lasso.scala | 58 +++-------------- 5 files changed, 114 insertions(+), 145 deletions(-) create mode 100644 mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala (limited to 'mllib/src/main/scala') diff --git a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala new file mode 100644 index 0000000000..d9c2be2a19 --- /dev/null +++ b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.optimization + +/** + * Class used to configure options used for GradientDescent based optimization + * algorithms. + */ + +class GradientDescentOpts private ( + var stepSize: Double, + var numIters: Int, + var regParam: Double, + var miniBatchFraction: Double) { + + def this() = this(1.0, 100, 0.0, 1.0) + + /** + * Set the step size per-iteration of SGD. Default 1.0. + */ + def setStepSize(step: Double) = { + this.stepSize = step + this + } + + /** + * Set fraction of data to be used for each SGD iteration. Default 1.0. + */ + def setMiniBatchFraction(fraction: Double) = { + this.miniBatchFraction = fraction + this + } + + /** + * Set the number of iterations for SGD. Default 100. + */ + def setNumIterations(iters: Int) = { + this.numIters = iters + this + } + + /** + * Set the regularization parameter used for SGD. Default 0.0. 
+ */ + def setRegParam(regParam: Double) = { + this.regParam = regParam + this + } +} + +object GradientDescentOpts { + + def apply(stepSize: Double, numIters: Int, regParam: Double, miniBatchFraction: Double) = { + new GradientDescentOpts(stepSize, numIters, regParam, miniBatchFraction) + } + + def apply() = { + new GradientDescentOpts() + } + +} diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala index 203aa8fdd4..bc711fd2d8 100644 --- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala @@ -55,38 +55,12 @@ class LogisticRegressionModel( } } -class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double, - var numIters: Int) - extends Logging { +class LogisticRegression(val opts: GradientDescentOpts) extends Logging { /** * Construct a LogisticRegression object with default parameters */ - def this() = this(1.0, 1.0, 100) - - /** - * Set the step size per-iteration of SGD. Default 1.0. - */ - def setStepSize(step: Double) = { - this.stepSize = step - this - } - - /** - * Set fraction of data to be used for each SGD iteration. Default 1.0. - */ - def setMiniBatchFraction(fraction: Double) = { - this.miniBatchFraction = fraction - this - } - - /** - * Set the number of iterations for SGD. Default 100. - */ - def setNumIterations(iters: Int) = { - this.numIters = iters - this - } + def this() = this(new GradientDescentOpts()) def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = { val nfeatures: Int = input.take(1)(0)._2.length @@ -109,11 +83,8 @@ class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBa data, new LogisticGradient(), new SimpleUpdater(), - stepSize, - numIters, - 0.0, - initalWeightsWithIntercept, - miniBatchFraction) + opts, + initalWeightsWithIntercept) val intercept = weights(0) val weightsScaled = weights.tail @@ -132,7 +103,7 @@ class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBa * NOTE(shivaram): We use multiple train methods instead of default arguments to support * Java programs. */ -object LogisticRegressionLocalRandomSGD { +object LogisticRegression { /** * Train a logistic regression model given an RDD of (label, features) pairs. 
We run a fixed @@ -155,8 +126,8 @@ object LogisticRegressionLocalRandomSGD { initialWeights: Array[Double]) : LogisticRegressionModel = { - new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train( - input, initialWeights) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction) + new LogisticRegression(sgdOpts).train(input, initialWeights) } /** @@ -177,7 +148,8 @@ object LogisticRegressionLocalRandomSGD { miniBatchFraction: Double) : LogisticRegressionModel = { - new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction) + new LogisticRegression(sgdOpts).train(input) } /** @@ -225,7 +197,7 @@ object LogisticRegressionLocalRandomSGD { } val sc = new SparkContext(args(0), "LogisticRegression") val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2)) - val model = LogisticRegressionLocalRandomSGD.train( + val model = LogisticRegression.train( data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala index 3a6a12814a..1c137168b6 100644 --- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala +++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala @@ -53,46 +53,12 @@ class SVMModel( -class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double, - var miniBatchFraction: Double, var numIters: Int) - extends Logging { +class SVM(val opts: GradientDescentOpts) extends Logging { /** * Construct a SVM object with default parameters */ - def this() = this(1.0, 1.0, 1.0, 100) - - /** - * Set the step size per-iteration of SGD. Default 1.0. - */ - def setStepSize(step: Double) = { - this.stepSize = step - this - } - - /** - * Set the regularization parameter. Default 1.0. - */ - def setRegParam(param: Double) = { - this.regParam = param - this - } - - /** - * Set fraction of data to be used for each SGD iteration. Default 1.0. - */ - def setMiniBatchFraction(fraction: Double) = { - this.miniBatchFraction = fraction - this - } - - /** - * Set the number of iterations for SGD. Default 100. - */ - def setNumIterations(iters: Int) = { - this.numIters = iters - this - } + def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0)) def train(input: RDD[(Int, Array[Double])]): SVMModel = { val nfeatures: Int = input.take(1)(0)._2.length @@ -115,11 +81,8 @@ class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double, data, new HingeGradient(), new SquaredL2Updater(), - stepSize, - numIters, - regParam, - initalWeightsWithIntercept, - miniBatchFraction) + opts, + initalWeightsWithIntercept) val intercept = weights(0) val weightsScaled = weights.tail @@ -135,10 +98,8 @@ class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double, /** * Top-level methods for calling SVM. - - */ -object SVMLocalRandomSGD { +object SVM { /** * Train a SVM model given an RDD of (label, features) pairs. 
We run a fixed number @@ -163,8 +124,8 @@ object SVMLocalRandomSGD { initialWeights: Array[Double]) : SVMModel = { - new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train( - input, initialWeights) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) + new SVM(sgdOpts).train(input, initialWeights) } /** @@ -186,7 +147,8 @@ object SVMLocalRandomSGD { miniBatchFraction: Double) : SVMModel = { - new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) + new SVM(sgdOpts).train(input) } /** @@ -234,7 +196,7 @@ object SVMLocalRandomSGD { } val sc = new SparkContext(args(0), "SVM") val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2)) - val model = SVMLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + val model = SVM.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() } diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala index 19cda26446..67451ff053 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala @@ -24,7 +24,6 @@ import org.jblas.DoubleMatrix import scala.collection.mutable.ArrayBuffer - object GradientDescent { /** @@ -48,23 +47,20 @@ object GradientDescent { data: RDD[(Double, Array[Double])], gradient: Gradient, updater: Updater, - stepSize: Double, - numIters: Int, - regParam: Double, - initialWeights: Array[Double], - miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = { + opts: GradientDescentOpts, + initialWeights: Array[Double]) : (Array[Double], Array[Double]) = { - val stochasticLossHistory = new ArrayBuffer[Double](numIters) + val stochasticLossHistory = new ArrayBuffer[Double](opts.numIters) val nexamples: Long = data.count() - val miniBatchSize = nexamples * miniBatchFraction + val miniBatchSize = nexamples * opts.miniBatchFraction // Initialize weights as a column vector var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*) var regVal = 0.0 - for (i <- 1 to numIters) { - val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map { + for (i <- 1 to opts.numIters) { + val (gradientSum, lossSum) = data.sample(false, opts.miniBatchFraction, 42+i).map { case (y, features) => val featuresRow = new DoubleMatrix(features.length, 1, features:_*) val (grad, loss) = gradient.compute(featuresRow, y, weights) @@ -76,7 +72,8 @@ object GradientDescent { * and regVal is the regularization value computed in the previous iteration as well. 
*/ stochasticLossHistory.append(lossSum / miniBatchSize + regVal) - val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam) + val update = updater.compute( + weights, gradientSum.div(miniBatchSize), opts.stepSize, i, opts.regParam) weights = update._1 regVal = update._2 } diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala index e8b1ed8a48..7f6fa8025c 100644 --- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala @@ -53,46 +53,12 @@ class LassoModel( } -class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double, - var miniBatchFraction: Double, var numIters: Int) - extends Logging { +class Lasso(val opts: GradientDescentOpts) extends Logging { /** * Construct a Lasso object with default parameters */ - def this() = this(1.0, 1.0, 1.0, 100) - - /** - * Set the step size per-iteration of SGD. Default 1.0. - */ - def setStepSize(step: Double) = { - this.stepSize = step - this - } - - /** - * Set the regularization parameter. Default 1.0. - */ - def setRegParam(param: Double) = { - this.regParam = param - this - } - - /** - * Set fraction of data to be used for each SGD iteration. Default 1.0. - */ - def setMiniBatchFraction(fraction: Double) = { - this.miniBatchFraction = fraction - this - } - - /** - * Set the number of iterations for SGD. Default 100. - */ - def setNumIterations(iters: Int) = { - this.numIters = iters - this - } + def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0)) def train(input: RDD[(Double, Array[Double])]): LassoModel = { val nfeatures: Int = input.take(1)(0)._2.length @@ -115,11 +81,8 @@ class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double, data, new SquaredGradient(), new L1Updater(), - stepSize, - numIters, - regParam, - initalWeightsWithIntercept, - miniBatchFraction) + opts, + initalWeightsWithIntercept) val intercept = weights(0) val weightsScaled = weights.tail @@ -135,10 +98,8 @@ class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double, /** * Top-level methods for calling Lasso. - * - * */ -object LassoLocalRandomSGD { +object Lasso { /** * Train a Lasso model given an RDD of (label, features) pairs. 
We run a fixed number @@ -163,8 +124,8 @@ object LassoLocalRandomSGD { initialWeights: Array[Double]) : LassoModel = { - new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train( - input, initialWeights) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) + new Lasso(sgdOpts).train(input, initialWeights) } /** @@ -186,7 +147,8 @@ object LassoLocalRandomSGD { miniBatchFraction: Double) : LassoModel = { - new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input) + val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) + new Lasso(sgdOpts).train(input) } /** @@ -234,7 +196,7 @@ object LassoLocalRandomSGD { } val sc = new SparkContext(args(0), "Lasso") val data = MLUtils.loadLabeledData(sc, args(1)) - val model = LassoLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + val model = Lasso.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() } -- cgit v1.2.3 From 00339cc0328b692417ad0b9aeb23d522edbb93e5 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Fri, 2 Aug 2013 19:15:34 -0700 Subject: Refactor optimizers and create GLMs This change refactors the structure of GLMs to use mixins which maintain a similar interface to other ML lib algorithms. This change also creates an Optimizer trait which allows GLMs to be extended to use other optimization techniques. --- .../scala/spark/mllib/GradientDescentOpts.scala | 76 ------------- .../mllib/classification/LogisticRegression.scala | 93 +++++---------- .../scala/spark/mllib/classification/SVM.scala | 88 +++++---------- .../spark/mllib/optimization/GradientDescent.scala | 80 +++++++++++-- .../scala/spark/mllib/optimization/Optimizer.scala | 29 +++++ .../regression/GeneralizedLinearAlgorithm.scala | 125 +++++++++++++++++++++ .../main/scala/spark/mllib/regression/Lasso.scala | 89 +++++---------- .../classification/LogisticRegressionSuite.scala | 6 +- .../spark/mllib/classification/SVMSuite.scala | 9 +- .../scala/spark/mllib/regression/LassoSuite.scala | 11 +- 10 files changed, 320 insertions(+), 286 deletions(-) delete mode 100644 mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala create mode 100644 mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala create mode 100644 mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala (limited to 'mllib/src/main/scala') diff --git a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala b/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala deleted file mode 100644 index d9c2be2a19..0000000000 --- a/mllib/src/main/scala/spark/mllib/GradientDescentOpts.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package spark.mllib.optimization - -/** - * Class used to configure options used for GradientDescent based optimization - * algorithms. - */ - -class GradientDescentOpts private ( - var stepSize: Double, - var numIters: Int, - var regParam: Double, - var miniBatchFraction: Double) { - - def this() = this(1.0, 100, 0.0, 1.0) - - /** - * Set the step size per-iteration of SGD. Default 1.0. - */ - def setStepSize(step: Double) = { - this.stepSize = step - this - } - - /** - * Set fraction of data to be used for each SGD iteration. Default 1.0. - */ - def setMiniBatchFraction(fraction: Double) = { - this.miniBatchFraction = fraction - this - } - - /** - * Set the number of iterations for SGD. Default 100. - */ - def setNumIterations(iters: Int) = { - this.numIters = iters - this - } - - /** - * Set the regularization parameter used for SGD. Default 0.0. - */ - def setRegParam(regParam: Double) = { - this.regParam = regParam - this - } -} - -object GradientDescentOpts { - - def apply(stepSize: Double, numIters: Int, regParam: Double, miniBatchFraction: Double) = { - new GradientDescentOpts(stepSize, numIters, regParam, miniBatchFraction) - } - - def apply() = { - new GradientDescentOpts() - } - -} diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala index bc711fd2d8..0af99c616d 100644 --- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala @@ -19,6 +19,7 @@ package spark.mllib.classification import spark.{Logging, RDD, SparkContext} import spark.mllib.optimization._ +import spark.mllib.regression._ import spark.mllib.util.MLUtils import scala.math.round @@ -30,80 +31,46 @@ import org.jblas.DoubleMatrix * Based on Matlab code written by John Duchi. */ class LogisticRegressionModel( - val weights: Array[Double], - val intercept: Double, - val stochasticLosses: Array[Double]) extends ClassificationModel { - - // Create a column vector that can be used for predictions - private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*) - - override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = { - // A small optimization to avoid serializing the entire model. Only the weightsMatrix - // and intercept is needed. 
- val localWeights = weightsMatrix - val localIntercept = intercept - testData.map { x => - val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept - round(1.0/ (1.0 + math.exp(margin * -1))).toInt - } - } - - override def predict(testData: Array[Double]): Int = { - val dataMat = new DoubleMatrix(1, testData.length, testData:_*) - val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept + override val weights: Array[Double], + override val intercept: Double) + extends GeneralizedLinearModel[Int](weights, intercept) + with ClassificationModel with Serializable { + + override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double) = { + val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept round(1.0/ (1.0 + math.exp(margin * -1))).toInt } } -class LogisticRegression(val opts: GradientDescentOpts) extends Logging { +class LogisticRegressionWithSGD ( + var stepSize: Double, + var numIterations: Int, + var regParam: Double, + var miniBatchFraction: Double, + var addIntercept: Boolean) + extends GeneralizedLinearAlgorithm[Int, LogisticRegressionModel] + with GradientDescent with Serializable { + + val gradient = new LogisticGradient() + val updater = new SimpleUpdater() /** * Construct a LogisticRegression object with default parameters */ - def this() = this(new GradientDescentOpts()) - - def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = { - val nfeatures: Int = input.take(1)(0)._2.length - val initialWeights = Array.fill(nfeatures)(1.0) - train(input, initialWeights) - } - - def train( - input: RDD[(Int, Array[Double])], - initialWeights: Array[Double]): LogisticRegressionModel = { - - // Add a extra variable consisting of all 1.0's for the intercept. - val data = input.map { case (y, features) => - (y.toDouble, Array(1.0, features:_*)) - } - - val initalWeightsWithIntercept = Array(1.0, initialWeights:_*) - - val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD( - data, - new LogisticGradient(), - new SimpleUpdater(), - opts, - initalWeightsWithIntercept) - - val intercept = weights(0) - val weightsScaled = weights.tail - - val model = new LogisticRegressionModel(weightsScaled, intercept, stochasticLosses) + def this() = this(1.0, 100, 0.0, 1.0, true) - logInfo("Final model weights " + model.weights.mkString(",")) - logInfo("Final model intercept " + model.intercept) - logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", ")) - model + def createModel(weights: Array[Double], intercept: Double) = { + new LogisticRegressionModel(weights, intercept) } } /** * Top-level methods for calling Logistic Regression. - * NOTE(shivaram): We use multiple train methods instead of default arguments to support + * NOTE(shivaram): We use multiple train methods instead of default arguments to support * Java programs. */ -object LogisticRegression { +object LogisticRegressionWithSGD { /** * Train a logistic regression model given an RDD of (label, features) pairs. 
We run a fixed @@ -126,8 +93,8 @@ object LogisticRegression { initialWeights: Array[Double]) : LogisticRegressionModel = { - val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction) - new LogisticRegression(sgdOpts).train(input, initialWeights) + new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train( + input, initialWeights) } /** @@ -148,8 +115,8 @@ object LogisticRegression { miniBatchFraction: Double) : LogisticRegressionModel = { - val sgdOpts = GradientDescentOpts(stepSize, numIterations, 0.0, miniBatchFraction) - new LogisticRegression(sgdOpts).train(input) + new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train( + input) } /** @@ -197,7 +164,7 @@ object LogisticRegression { } val sc = new SparkContext(args(0), "LogisticRegression") val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2)) - val model = LogisticRegression.train( + val model = LogisticRegressionWithSGD.train( data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala index 1c137168b6..caf9e3cb93 100644 --- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala +++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala @@ -20,6 +20,7 @@ package spark.mllib.classification import scala.math.signum import spark.{Logging, RDD, SparkContext} import spark.mllib.optimization._ +import spark.mllib.regression._ import spark.mllib.util.MLUtils import org.jblas.DoubleMatrix @@ -28,78 +29,42 @@ import org.jblas.DoubleMatrix * SVM using Stochastic Gradient Descent. */ class SVMModel( - val weights: Array[Double], - val intercept: Double, - val stochasticLosses: Array[Double]) extends ClassificationModel { - - // Create a column vector that can be used for predictions - private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*) - - override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = { - // A small optimization to avoid serializing the entire model. Only the weightsMatrix - // and intercept is needed. 
- val localWeights = weightsMatrix - val localIntercept = intercept - testData.map { x => - signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept).toInt - } - } - - override def predict(testData: Array[Double]): Int = { - val dataMat = new DoubleMatrix(1, testData.length, testData:_*) - signum(dataMat.dot(weightsMatrix) + this.intercept).toInt + override val weights: Array[Double], + override val intercept: Double) + extends GeneralizedLinearModel[Int](weights, intercept) + with ClassificationModel with Serializable { + + override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double) = { + signum(dataMatrix.dot(weightMatrix) + intercept).toInt } } +class SVMWithSGD private ( + var stepSize: Double, + var numIterations: Int, + var regParam: Double, + var miniBatchFraction: Double, + var addIntercept: Boolean) + extends GeneralizedLinearAlgorithm[Int, SVMModel] with GradientDescent with Serializable { - -class SVM(val opts: GradientDescentOpts) extends Logging { + val gradient = new HingeGradient() + val updater = new SquaredL2Updater() /** * Construct a SVM object with default parameters */ - def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0)) - - def train(input: RDD[(Int, Array[Double])]): SVMModel = { - val nfeatures: Int = input.take(1)(0)._2.length - val initialWeights = Array.fill(nfeatures)(1.0) - train(input, initialWeights) - } - - def train( - input: RDD[(Int, Array[Double])], - initialWeights: Array[Double]): SVMModel = { - - // Add a extra variable consisting of all 1.0's for the intercept. - val data = input.map { case (y, features) => - (y.toDouble, Array(1.0, features:_*)) - } - - val initalWeightsWithIntercept = Array(1.0, initialWeights:_*) - - val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD( - data, - new HingeGradient(), - new SquaredL2Updater(), - opts, - initalWeightsWithIntercept) - - val intercept = weights(0) - val weightsScaled = weights.tail - - val model = new SVMModel(weightsScaled, intercept, stochasticLosses) + def this() = this(1.0, 100, 1.0, 1.0, true) - logInfo("Final model weights " + model.weights.mkString(",")) - logInfo("Final model intercept " + model.intercept) - logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", ")) - model + def createModel(weights: Array[Double], intercept: Double) = { + new SVMModel(weights, intercept) } } /** * Top-level methods for calling SVM. */ -object SVM { +object SVMWithSGD { /** * Train a SVM model given an RDD of (label, features) pairs. 
We run a fixed number @@ -124,8 +89,8 @@ object SVM { initialWeights: Array[Double]) : SVMModel = { - val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) - new SVM(sgdOpts).train(input, initialWeights) + new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input, + initialWeights) } /** @@ -147,8 +112,7 @@ object SVM { miniBatchFraction: Double) : SVMModel = { - val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) - new SVM(sgdOpts).train(input) + new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input) } /** @@ -196,7 +160,7 @@ object SVM { } val sc = new SparkContext(args(0), "SVM") val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2)) - val model = SVM.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() } diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala index 67451ff053..f7d09a2bd3 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala @@ -24,8 +24,66 @@ import org.jblas.DoubleMatrix import scala.collection.mutable.ArrayBuffer -object GradientDescent { +trait GradientDescent extends Optimizer { + val gradient: Gradient + val updater: Updater + + var stepSize: Double + var numIterations: Int + var regParam: Double + var miniBatchFraction: Double + + /** + * Set the step size per-iteration of SGD. Default 1.0. + */ + def setStepSize(step: Double): this.type = { + this.stepSize = step + this + } + + /** + * Set fraction of data to be used for each SGD iteration. Default 1.0. + */ + def setMiniBatchFraction(fraction: Double): this.type = { + this.miniBatchFraction = fraction + this + } + + /** + * Set the number of iterations for SGD. Default 100. + */ + def setNumIterations(iters: Int): this.type = { + this.numIterations = iters + this + } + + /** + * Set the regularization parameter used for SGD. Default 0.0. + */ + def setRegParam(regParam: Double): this.type = { + this.regParam = regParam + this + } + + def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]) + : Array[Double] = { + + val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD( + data, + gradient, + updater, + stepSize, + numIterations, + regParam, + miniBatchFraction, + initialWeights) + weights + } + +} + +object GradientDescent extends Logging { /** * Run gradient descent in parallel using mini batches. * Based on Matlab code written by John Duchi. @@ -34,7 +92,7 @@ object GradientDescent { * @param gradient - Gradient object that will be used to compute the gradient. * @param updater - Updater object that will be used to update the model. * @param stepSize - stepSize to be used during update. - * @param numIters - number of iterations that SGD should be run. + * @param numIterations - number of iterations that SGD should be run. * @param regParam - regularization parameter * @param miniBatchFraction - fraction of the input data set that should be used for * one iteration of SGD. Default value 1.0. 
@@ -47,20 +105,23 @@ object GradientDescent { data: RDD[(Double, Array[Double])], gradient: Gradient, updater: Updater, - opts: GradientDescentOpts, + stepSize: Double, + numIterations: Int, + regParam: Double, + miniBatchFraction: Double, initialWeights: Array[Double]) : (Array[Double], Array[Double]) = { - val stochasticLossHistory = new ArrayBuffer[Double](opts.numIters) + val stochasticLossHistory = new ArrayBuffer[Double](numIterations) val nexamples: Long = data.count() - val miniBatchSize = nexamples * opts.miniBatchFraction + val miniBatchSize = nexamples * miniBatchFraction // Initialize weights as a column vector var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*) var regVal = 0.0 - for (i <- 1 to opts.numIters) { - val (gradientSum, lossSum) = data.sample(false, opts.miniBatchFraction, 42+i).map { + for (i <- 1 to numIterations) { + val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map { case (y, features) => val featuresRow = new DoubleMatrix(features.length, 1, features:_*) val (grad, loss) = gradient.compute(featuresRow, y, weights) @@ -73,11 +134,14 @@ object GradientDescent { */ stochasticLossHistory.append(lossSum / miniBatchSize + regVal) val update = updater.compute( - weights, gradientSum.div(miniBatchSize), opts.stepSize, i, opts.regParam) + weights, gradientSum.div(miniBatchSize), stepSize, i, regParam) weights = update._1 regVal = update._2 } + logInfo("GradientDescent finished. Last 10 stochastic losses %s".format( + stochasticLossHistory.takeRight(10).mkString(", "))) + (weights.toArray, stochasticLossHistory.toArray) } } diff --git a/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala new file mode 100644 index 0000000000..76a519c338 --- /dev/null +++ b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.optimization + +import spark.RDD + +trait Optimizer { + + /** + * Solve the provided convex optimization problem. + */ + def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double] + +} diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala new file mode 100644 index 0000000000..0bbc9424e6 --- /dev/null +++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.regression + +import spark.{Logging, RDD, SparkContext, SparkException} +import spark.mllib.optimization._ +import spark.mllib.util.MLUtils + +import scala.math.round + +import org.jblas.DoubleMatrix + +/** + * GeneralizedLinearModel (GLM) represents a model trained using + * GeneralizedLinearAlgorithm. GLMs consist of a weight vector, + * an intercept. + */ +abstract class GeneralizedLinearModel[T: ClassManifest]( + val weights: Array[Double], + val intercept: Double) + extends Serializable { + + // Create a column vector that can be used for predictions + private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*) + + def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double): T + + def predict(testData: spark.RDD[Array[Double]]): RDD[T] = { + // A small optimization to avoid serializing the entire model. Only the weightsMatrix + // and intercept is needed. + val localWeights = weightsMatrix + val localIntercept = intercept + + testData.map { x => + val dataMatrix = new DoubleMatrix(1, x.length, x:_*) + predictPoint(dataMatrix, localWeights, localIntercept) + } + } + + def predict(testData: Array[Double]): T = { + val dataMat = new DoubleMatrix(1, testData.length, testData:_*) + predictPoint(dataMat, weightsMatrix, intercept) + } +} + +/** + * GeneralizedLinearAlgorithm abstracts out the training for all GLMs. + * This class should be mixed in with an Optimizer to create a new GLM. + * + * NOTE(shivaram): This is an abstract class rather than a trait as we use + * a view bound to convert labels to Double. + */ +abstract class GeneralizedLinearAlgorithm[T <% Double, M <: GeneralizedLinearModel[T]] + extends Logging with Serializable { + + // We need an optimizer mixin to solve the GLM + self : Optimizer => + + var addIntercept: Boolean + + def createModel(weights: Array[Double], intercept: Double): M + + /** + * Set if the algorithm should add an intercept. Default true. + */ + def setIntercept(addIntercept: Boolean): this.type = { + this.addIntercept = addIntercept + this + } + + def train(input: RDD[(T, Array[Double])])(implicit mt: Manifest[T]) : M = { + val nfeatures: Int = input.take(1)(0)._2.length + val initialWeights = Array.fill(nfeatures)(1.0) + train(input, initialWeights) + } + + def train( + input: RDD[(T, Array[Double])], + initialWeights: Array[Double])(implicit mt: Manifest[T]) + : M = { + + // Add a extra variable consisting of all 1.0's for the intercept. 
+ val data = if (addIntercept) { + input.map { case (y, features) => + (y.toDouble, Array(1.0, features:_*)) + } + } else { + input.map { case (y, features) => + (y.toDouble, features) + } + } + + val initialWeightsWithIntercept = if (addIntercept) { + Array(1.0, initialWeights:_*) + } else { + initialWeights + } + + val weights = optimize(data, initialWeightsWithIntercept) + val intercept = weights(0) + val weightsScaled = weights.tail + + val model = createModel(weightsScaled, intercept) + + logInfo("Final model weights " + model.weights.mkString(",")) + logInfo("Final model intercept " + model.intercept) + model + } +} diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala index 7f6fa8025c..f8b15033aa 100644 --- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala @@ -28,78 +28,44 @@ import org.jblas.DoubleMatrix * */ class LassoModel( - val weights: Array[Double], - val intercept: Double, - val stochasticLosses: Array[Double]) extends RegressionModel { - - // Create a column vector that can be used for predictions - private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*) - - override def predict(testData: spark.RDD[Array[Double]]) = { - // A small optimization to avoid serializing the entire model. Only the weightsMatrix - // and intercept is needed. - val localWeights = weightsMatrix - val localIntercept = intercept - testData.map { x => - new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept - } - } - - - override def predict(testData: Array[Double]): Double = { - val dataMat = new DoubleMatrix(1, testData.length, testData:_*) - dataMat.dot(weightsMatrix) + this.intercept + override val weights: Array[Double], + override val intercept: Double) + extends GeneralizedLinearModel[Double](weights, intercept) + with RegressionModel with Serializable { + + override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, + intercept: Double) = { + dataMatrix.dot(weightMatrix) + intercept } } -class Lasso(val opts: GradientDescentOpts) extends Logging { +class LassoWithSGD ( + var stepSize: Double, + var numIterations: Int, + var regParam: Double, + var miniBatchFraction: Double, + var addIntercept: Boolean) + extends GeneralizedLinearAlgorithm[Double, LassoModel] + with GradientDescent with Serializable { + + val gradient = new SquaredGradient() + val updater = new L1Updater() /** * Construct a Lasso object with default parameters */ - def this() = this(GradientDescentOpts(1.0, 100, 1.0, 1.0)) - - def train(input: RDD[(Double, Array[Double])]): LassoModel = { - val nfeatures: Int = input.take(1)(0)._2.length - val initialWeights = Array.fill(nfeatures)(1.0) - train(input, initialWeights) - } - - def train( - input: RDD[(Double, Array[Double])], - initialWeights: Array[Double]): LassoModel = { - - // Add a extra variable consisting of all 1.0's for the intercept. 
- val data = input.map { case (y, features) => - (y, Array(1.0, features:_*)) - } - - val initalWeightsWithIntercept = Array(1.0, initialWeights:_*) - - val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD( - data, - new SquaredGradient(), - new L1Updater(), - opts, - initalWeightsWithIntercept) - - val intercept = weights(0) - val weightsScaled = weights.tail - - val model = new LassoModel(weightsScaled, intercept, stochasticLosses) + def this() = this(1.0, 100, 1.0, 1.0, true) - logInfo("Final model weights " + model.weights.mkString(",")) - logInfo("Final model intercept " + model.intercept) - logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", ")) - model + def createModel(weights: Array[Double], intercept: Double) = { + new LassoModel(weights, intercept) } } /** * Top-level methods for calling Lasso. */ -object Lasso { +object LassoWithSGD { /** * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number @@ -124,8 +90,8 @@ object Lasso { initialWeights: Array[Double]) : LassoModel = { - val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) - new Lasso(sgdOpts).train(input, initialWeights) + new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input, + initialWeights) } /** @@ -147,8 +113,7 @@ object Lasso { miniBatchFraction: Double) : LassoModel = { - val sgdOpts = GradientDescentOpts(stepSize, numIterations, regParam, miniBatchFraction) - new Lasso(sgdOpts).train(input) + new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input) } /** @@ -196,7 +161,7 @@ object Lasso { } val sc = new SparkContext(args(0), "Lasso") val data = MLUtils.loadLabeledData(sc, args(1)) - val model = Lasso.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() } diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala index 439867d163..ee38486212 100644 --- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala @@ -80,8 +80,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(20) - val lr = new LogisticRegression(sgdOpts) + val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(20) val model = lr.train(testRDD) @@ -113,8 +112,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul testRDD.cache() // Use half as many iterations as the previous test. 
- val sgdOpts = GradientDescentOpts().setStepSize(10.0).setNumIterations(10) - val lr = new LogisticRegression(sgdOpts) + val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(10) val model = lr.train(testRDD, initialWeights) diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala index a624b42c38..1eef9387e3 100644 --- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala +++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala @@ -44,7 +44,8 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { seed: Int): Seq[(Int, Array[Double])] = { val rnd = new Random(seed) val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) - val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian())) + val x = Array.fill[Array[Double]](nPoints)( + Array.fill[Double](weights.length)(rnd.nextGaussian())) val y = x.map { xi => signum( (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + @@ -75,8 +76,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) - val svm = new SVM(sgdOpts) + val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) val model = svm.train(testRDD) @@ -106,8 +106,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) - val svm = new SVM(sgdOpts) + val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) val model = svm.train(testRDD, initialWeights) diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala index 531746ec02..ab1d07b879 100644 --- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala @@ -44,10 +44,11 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { seed: Int): Seq[(Double, Array[Double])] = { val rnd = new Random(seed) val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) - val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian())) + val x = Array.fill[Array[Double]](nPoints)( + Array.fill[Double](weights.length)(rnd.nextGaussian())) val y = x.map(xi => (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian() - ) + ) y zip x } @@ -72,8 +73,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) - val ls = new Lasso(sgdOpts) + val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) val model = ls.train(testRDD) @@ -109,8 +109,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val sgdOpts = GradientDescentOpts().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) - val ls = new Lasso(sgdOpts) + val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) val model = ls.train(testRDD, initialWeights) -- cgit v1.2.3 From 7388e27668800b2c958b75e13d24f0d2baebe23d Mon Sep 17 00:00:00 2001 From: 
Shivaram Venkataraman Date: Sat, 3 Aug 2013 18:08:43 -0700 Subject: Move implicit arg to constructor for Java access. --- .../spark/mllib/regression/GeneralizedLinearAlgorithm.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'mllib/src/main/scala') diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 0bbc9424e6..7e80737773 100644 --- a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -66,7 +66,10 @@ abstract class GeneralizedLinearModel[T: ClassManifest]( * NOTE(shivaram): This is an abstract class rather than a trait as we use * a view bound to convert labels to Double. */ -abstract class GeneralizedLinearAlgorithm[T <% Double, M <: GeneralizedLinearModel[T]] +abstract class GeneralizedLinearAlgorithm[T, M](implicit + t: T => Double, + tManifest: Manifest[T], + methodEv: M <:< GeneralizedLinearModel[T]) extends Logging with Serializable { // We need an optimizer mixin to solve the GLM @@ -84,15 +87,15 @@ abstract class GeneralizedLinearAlgorithm[T <% Double, M <: GeneralizedLinearMod this } - def train(input: RDD[(T, Array[Double])])(implicit mt: Manifest[T]) : M = { - val nfeatures: Int = input.take(1)(0)._2.length + def train(input: RDD[(T, Array[Double])]) : M = { + val nfeatures: Int = input.first()._2.length val initialWeights = Array.fill(nfeatures)(1.0) train(input, initialWeights) } def train( input: RDD[(T, Array[Double])], - initialWeights: Array[Double])(implicit mt: Manifest[T]) + initialWeights: Array[Double]) : M = { // Add a extra variable consisting of all 1.0's for the intercept. -- cgit v1.2.3 From 7db69d56f2d050842ecf6e465d2d4f1abf3314d7 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Tue, 6 Aug 2013 17:23:22 -0700 Subject: Refactor GLM algorithms and add Java tests This change adds Java examples and unit tests for all GLM algorithms to make sure the MLLib interface works from Java. 
Changes include - Introduce LabeledPoint and avoid using Doubles in train arguments - Rename train to run in class methods - Make the optimizer a member variable of GLM to make sure the builder pattern works --- .../src/main/java/spark/mllib/examples/JavaLR.java | 85 +++++++++++++++++++ .../mllib/classification/ClassificationModel.scala | 4 +- .../mllib/classification/LogisticRegression.scala | 34 ++++---- .../scala/spark/mllib/classification/SVM.scala | 25 +++--- .../spark/mllib/optimization/GradientDescent.scala | 13 ++- .../regression/GeneralizedLinearAlgorithm.scala | 60 ++++++------- .../spark/mllib/regression/LabeledPoint.scala | 32 +++++++ .../main/scala/spark/mllib/regression/Lasso.scala | 22 +++-- .../spark/mllib/regression/RidgeRegression.scala | 7 +- .../spark/mllib/util/LassoDataGenerator.scala | 4 +- .../util/LogisticRegressionDataGenerator.scala | 5 +- .../src/main/scala/spark/mllib/util/MLUtils.scala | 9 +- .../mllib/util/RidgeRegressionDataGenerator.scala | 7 +- .../scala/spark/mllib/util/SVMDataGenerator.scala | 5 +- .../JavaLogisticRegressionSuite.java | 98 ++++++++++++++++++++++ .../spark/mllib/classification/JavaSVMSuite.java | 98 ++++++++++++++++++++++ .../classification/LogisticRegressionSuite.scala | 62 ++++++++------ .../spark/mllib/classification/SVMSuite.scala | 62 +++++++++----- .../spark/mllib/regression/JavaLassoSuite.java | 96 +++++++++++++++++++++ .../scala/spark/mllib/regression/LassoSuite.scala | 64 ++++++++------ .../mllib/regression/RidgeRegressionSuite.scala | 2 +- project/SparkBuild.scala | 2 +- 22 files changed, 626 insertions(+), 170 deletions(-) create mode 100644 examples/src/main/java/spark/mllib/examples/JavaLR.java create mode 100644 mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala create mode 100644 mllib/src/test/scala/spark/mllib/classification/JavaLogisticRegressionSuite.java create mode 100644 mllib/src/test/scala/spark/mllib/classification/JavaSVMSuite.java create mode 100644 mllib/src/test/scala/spark/mllib/regression/JavaLassoSuite.java (limited to 'mllib/src/main/scala') diff --git a/examples/src/main/java/spark/mllib/examples/JavaLR.java b/examples/src/main/java/spark/mllib/examples/JavaLR.java new file mode 100644 index 0000000000..e11f4830a8 --- /dev/null +++ b/examples/src/main/java/spark/mllib/examples/JavaLR.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package spark.mllib.examples; + + +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; +import spark.api.java.function.Function; + +import spark.mllib.classification.LogisticRegressionWithSGD; +import spark.mllib.classification.LogisticRegressionModel; +import spark.mllib.regression.LabeledPoint; + +import java.util.Arrays; +import java.util.StringTokenizer; + +/** + * Logistic regression based classification using ML Lib. + */ +public class JavaLR { + + static class ParsePoint extends Function { + public LabeledPoint call(String line) { + String[] parts = line.split(","); + Double y = Double.parseDouble(parts[0]); + StringTokenizer tok = new StringTokenizer(parts[1], " "); + int numTokens = tok.countTokens(); + double[] x = new double[numTokens]; + for (int i = 0; i < numTokens; ++i) { + x[i] = Double.parseDouble(tok.nextToken()); + } + return new LabeledPoint(y, x); + } + } + + public static void printWeights(double[] a) { + System.out.println(Arrays.toString(a)); + } + + public static void main(String[] args) { + if (args.length != 4) { + System.err.println("Usage: JavaLR "); + System.exit(1); + } + + JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); + JavaRDD lines = sc.textFile(args[1]); + JavaRDD points = lines.map(new ParsePoint()).cache(); + double stepSize = Double.parseDouble(args[2]); + int iterations = Integer.parseInt(args[3]); + + // Another way to configure LogisticRegression + // + // LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD(); + // lr.optimizer().setNumIterations(iterations) + // .setStepSize(stepSize) + // .setMiniBatchFraction(1.0); + // lr.setIntercept(true); + // LogisticRegressionModel model = lr.train(points.rdd()); + + LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(), + iterations, stepSize); + + System.out.print("Final w: "); + printWeights(model.weights()); + + System.exit(0); + } +} diff --git a/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala index d6154b66ae..70fae8c15a 100644 --- a/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala +++ b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala @@ -9,7 +9,7 @@ trait ClassificationModel extends Serializable { * @param testData RDD representing data points to be predicted * @return RDD[Int] where each entry contains the corresponding prediction */ - def predict(testData: RDD[Array[Double]]): RDD[Int] + def predict(testData: RDD[Array[Double]]): RDD[Double] /** * Predict values for a single data point using the model trained. 
@@ -17,5 +17,5 @@ trait ClassificationModel extends Serializable { * @param testData array representing a single data point * @return Int prediction from the trained model */ - def predict(testData: Array[Double]): Int + def predict(testData: Array[Double]): Double } diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala index 0af99c616d..73949b0103 100644 --- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala @@ -33,13 +33,13 @@ import org.jblas.DoubleMatrix class LogisticRegressionModel( override val weights: Array[Double], override val intercept: Double) - extends GeneralizedLinearModel[Int](weights, intercept) + extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable { override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, intercept: Double) = { val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept - round(1.0/ (1.0 + math.exp(margin * -1))).toInt + round(1.0/ (1.0 + math.exp(margin * -1))) } } @@ -49,12 +49,15 @@ class LogisticRegressionWithSGD ( var regParam: Double, var miniBatchFraction: Double, var addIntercept: Boolean) - extends GeneralizedLinearAlgorithm[Int, LogisticRegressionModel] - with GradientDescent with Serializable { + extends GeneralizedLinearAlgorithm[LogisticRegressionModel] + with Serializable { val gradient = new LogisticGradient() val updater = new SimpleUpdater() - + val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) + .setNumIterations(numIterations) + .setRegParam(regParam) + .setMiniBatchFraction(miniBatchFraction) /** * Construct a LogisticRegression object with default parameters */ @@ -86,14 +89,14 @@ object LogisticRegressionWithSGD { * the number of features in the data. */ def train( - input: RDD[(Int, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double, initialWeights: Array[Double]) : LogisticRegressionModel = { - new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train( + new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run( input, initialWeights) } @@ -109,13 +112,13 @@ object LogisticRegressionWithSGD { * @param miniBatchFraction Fraction of data to be used per iteration. */ def train( - input: RDD[(Int, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, miniBatchFraction: Double) : LogisticRegressionModel = { - new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).train( + new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run( input) } @@ -131,7 +134,7 @@ object LogisticRegressionWithSGD { * @return a LogisticRegressionModel which has the weights and offset from training. */ def train( - input: RDD[(Int, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int, stepSize: Double) : LogisticRegressionModel = @@ -149,7 +152,7 @@ object LogisticRegressionWithSGD { * @return a LogisticRegressionModel which has the weights and offset from training. 
*/ def train( - input: RDD[(Int, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int) : LogisticRegressionModel = { @@ -157,15 +160,14 @@ object LogisticRegressionWithSGD { } def main(args: Array[String]) { - if (args.length != 5) { + if (args.length != 4) { println("Usage: LogisticRegression " + - " ") + "") System.exit(1) } val sc = new SparkContext(args(0), "LogisticRegression") - val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2)) - val model = LogisticRegressionWithSGD.train( - data, args(4).toInt, args(2).toDouble, args(3).toDouble) + val data = MLUtils.loadLabeledData(sc, args(1)) + val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) sc.stop() } diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala index caf9e3cb93..fa9d5a9471 100644 --- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala +++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala @@ -31,12 +31,12 @@ import org.jblas.DoubleMatrix class SVMModel( override val weights: Array[Double], override val intercept: Double) - extends GeneralizedLinearModel[Int](weights, intercept) + extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable { override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, intercept: Double) = { - signum(dataMatrix.dot(weightMatrix) + intercept).toInt + signum(dataMatrix.dot(weightMatrix) + intercept) } } @@ -46,11 +46,14 @@ class SVMWithSGD private ( var regParam: Double, var miniBatchFraction: Double, var addIntercept: Boolean) - extends GeneralizedLinearAlgorithm[Int, SVMModel] with GradientDescent with Serializable { + extends GeneralizedLinearAlgorithm[SVMModel] with Serializable { val gradient = new HingeGradient() val updater = new SquaredL2Updater() - + val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) + .setNumIterations(numIterations) + .setRegParam(regParam) + .setMiniBatchFraction(miniBatchFraction) /** * Construct a SVM object with default parameters */ @@ -81,7 +84,7 @@ object SVMWithSGD { * the number of features in the data. */ def train( - input: RDD[(Int, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double, @@ -89,7 +92,7 @@ object SVMWithSGD { initialWeights: Array[Double]) : SVMModel = { - new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input, + new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input, initialWeights) } @@ -105,14 +108,14 @@ object SVMWithSGD { * @param miniBatchFraction Fraction of data to be used per iteration. */ def train( - input: RDD[(Int, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double) : SVMModel = { - new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input) + new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input) } /** @@ -127,7 +130,7 @@ object SVMWithSGD { * @return a SVMModel which has the weights and offset from training. */ def train( - input: RDD[(Int, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double) @@ -146,7 +149,7 @@ object SVMWithSGD { * @return a SVMModel which has the weights and offset from training. 
*/ def train( - input: RDD[(Int, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int) : SVMModel = { @@ -159,7 +162,7 @@ object SVMWithSGD { System.exit(1) } val sc = new SparkContext(args(0), "SVM") - val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2)) + val data = MLUtils.loadLabeledData(sc, args(1)) val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) sc.stop() diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala index f7d09a2bd3..54793ca74d 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala @@ -24,15 +24,12 @@ import org.jblas.DoubleMatrix import scala.collection.mutable.ArrayBuffer -trait GradientDescent extends Optimizer { +class GradientDescent(gradient: Gradient, updater: Updater) extends Optimizer { - val gradient: Gradient - val updater: Updater - - var stepSize: Double - var numIterations: Int - var regParam: Double - var miniBatchFraction: Double + var stepSize: Double = 1.0 + var numIterations: Int = 100 + var regParam: Double = 0.0 + var miniBatchFraction: Double = 1.0 /** * Set the step size per-iteration of SGD. Default 1.0. diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 7e80737773..03a7755541 100644 --- a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -17,11 +17,8 @@ package spark.mllib.regression -import spark.{Logging, RDD, SparkContext, SparkException} +import spark.{Logging, RDD} import spark.mllib.optimization._ -import spark.mllib.util.MLUtils - -import scala.math.round import org.jblas.DoubleMatrix @@ -30,18 +27,23 @@ import org.jblas.DoubleMatrix * GeneralizedLinearAlgorithm. GLMs consist of a weight vector, * an intercept. */ -abstract class GeneralizedLinearModel[T: ClassManifest]( - val weights: Array[Double], - val intercept: Double) +abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double) extends Serializable { // Create a column vector that can be used for predictions private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*) + /** + * Predict the result given a data point and the weights learned. + * + * @param dataMatrix Row vector containing the features for this data point + * @param weightMatrix Column vector containing the weights of the model + * @param intercept Intercept of the model. + */ def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, - intercept: Double): T + intercept: Double): Double - def predict(testData: spark.RDD[Array[Double]]): RDD[T] = { + def predict(testData: spark.RDD[Array[Double]]): RDD[Double] = { // A small optimization to avoid serializing the entire model. Only the weightsMatrix // and intercept is needed. 
val localWeights = weightsMatrix @@ -53,7 +55,7 @@ abstract class GeneralizedLinearModel[T: ClassManifest]( } } - def predict(testData: Array[Double]): T = { + def predict(testData: Array[Double]): Double = { val dataMat = new DoubleMatrix(1, testData.length, testData:_*) predictPoint(dataMat, weightsMatrix, intercept) } @@ -61,24 +63,18 @@ abstract class GeneralizedLinearModel[T: ClassManifest]( /** * GeneralizedLinearAlgorithm abstracts out the training for all GLMs. - * This class should be mixed in with an Optimizer to create a new GLM. - * - * NOTE(shivaram): This is an abstract class rather than a trait as we use - * a view bound to convert labels to Double. + * This class should be extended with an Optimizer to create a new GLM. */ -abstract class GeneralizedLinearAlgorithm[T, M](implicit - t: T => Double, - tManifest: Manifest[T], - methodEv: M <:< GeneralizedLinearModel[T]) +abstract class GeneralizedLinearAlgorithm[M](implicit + methodEv: M <:< GeneralizedLinearModel) extends Logging with Serializable { - // We need an optimizer mixin to solve the GLM - self : Optimizer => - - var addIntercept: Boolean + val optimizer: Optimizer def createModel(weights: Array[Double], intercept: Double): M + var addIntercept: Boolean + /** * Set if the algorithm should add an intercept. Default true. */ @@ -87,26 +83,22 @@ abstract class GeneralizedLinearAlgorithm[T, M](implicit this } - def train(input: RDD[(T, Array[Double])]) : M = { - val nfeatures: Int = input.first()._2.length + def run(input: RDD[LabeledPoint]) : M = { + val nfeatures: Int = input.first().features.length val initialWeights = Array.fill(nfeatures)(1.0) - train(input, initialWeights) + run(input, initialWeights) } - def train( - input: RDD[(T, Array[Double])], + def run( + input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = { // Add a extra variable consisting of all 1.0's for the intercept. val data = if (addIntercept) { - input.map { case (y, features) => - (y.toDouble, Array(1.0, features:_*)) - } + input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*))) } else { - input.map { case (y, features) => - (y.toDouble, features) - } + input.map(labeledPoint => (labeledPoint.label, labeledPoint.features)) } val initialWeightsWithIntercept = if (addIntercept) { @@ -115,7 +107,7 @@ abstract class GeneralizedLinearAlgorithm[T, M](implicit initialWeights } - val weights = optimize(data, initialWeightsWithIntercept) + val weights = optimizer.optimize(data, initialWeightsWithIntercept) val intercept = weights(0) val weightsScaled = weights.tail diff --git a/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala new file mode 100644 index 0000000000..592f0b5414 --- /dev/null +++ b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.regression + +/** + * Class that represents the features and labels of a data point. + * + * @param label Label for this data point. + * @param features List of features for this data point. + */ +case class LabeledPoint(val label: Double, val features: Array[Double]) { + + /** + * Construct a labeled point using java.lang.Double. + */ + def this(label: java.lang.Double, features: Array[Double]) = this(label.doubleValue(), features) +} diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala index f8b15033aa..989e5ded58 100644 --- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala @@ -30,7 +30,7 @@ import org.jblas.DoubleMatrix class LassoModel( override val weights: Array[Double], override val intercept: Double) - extends GeneralizedLinearModel[Double](weights, intercept) + extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable { override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, @@ -46,11 +46,15 @@ class LassoWithSGD ( var regParam: Double, var miniBatchFraction: Double, var addIntercept: Boolean) - extends GeneralizedLinearAlgorithm[Double, LassoModel] - with GradientDescent with Serializable { + extends GeneralizedLinearAlgorithm[LassoModel] + with Serializable { val gradient = new SquaredGradient() val updater = new L1Updater() + val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) + .setNumIterations(numIterations) + .setRegParam(regParam) + .setMiniBatchFraction(miniBatchFraction) /** * Construct a Lasso object with default parameters @@ -82,7 +86,7 @@ object LassoWithSGD { * the number of features in the data. */ def train( - input: RDD[(Double, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double, @@ -90,7 +94,7 @@ object LassoWithSGD { initialWeights: Array[Double]) : LassoModel = { - new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input, + new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input, initialWeights) } @@ -106,14 +110,14 @@ object LassoWithSGD { * @param miniBatchFraction Fraction of data to be used per iteration. */ def train( - input: RDD[(Double, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double) : LassoModel = { - new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).train(input) + new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input) } /** @@ -128,7 +132,7 @@ object LassoWithSGD { * @return a LassoModel which has the weights and offset from training. */ def train( - input: RDD[(Double, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int, stepSize: Double, regParam: Double) @@ -147,7 +151,7 @@ object LassoWithSGD { * @return a LassoModel which has the weights and offset from training. 
*/ def train( - input: RDD[(Double, Array[Double])], + input: RDD[LabeledPoint], numIterations: Int) : LassoModel = { diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala index 6ba141e8fb..de790dde51 100644 --- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala +++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala @@ -71,7 +71,8 @@ class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double) this } - def train(input: RDD[(Double, Array[Double])]): RidgeRegressionModel = { + def train(inputLabeled: RDD[LabeledPoint]): RidgeRegressionModel = { + val input = inputLabeled.map(labeledPoint => (labeledPoint.label, labeledPoint.features)) val nfeatures: Int = input.take(1)(0)._2.length val nexamples: Long = input.count() @@ -183,7 +184,7 @@ object RidgeRegression { * @param lambdaHigh upper bound used in binary search for lambda */ def train( - input: RDD[(Double, Array[Double])], + input: RDD[LabeledPoint], lambdaLow: Double, lambdaHigh: Double) : RidgeRegressionModel = @@ -199,7 +200,7 @@ object RidgeRegression { * * @param input RDD of (response, array of features) pairs. */ - def train(input: RDD[(Double, Array[Double])]) : RidgeRegressionModel = { + def train(input: RDD[LabeledPoint]) : RidgeRegressionModel = { train(input, 0.0, 100.0) } diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala index ef4f42a494..1f185c9de7 100644 --- a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala @@ -29,14 +29,14 @@ object LassoGenerator { val trueWeights = new DoubleMatrix(1, nfeatures+1, Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*) - val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx => + val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx => val rnd = new Random(42 + idx) val x = Array.fill[Double](nfeatures) { rnd.nextDouble() * 2.0 - 1.0 } val y = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1 - (y, x) + LabeledPoint(y, x) } MLUtils.saveLabeledData(data, outputPath) diff --git a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala index 8d659cd97c..4fa19c3c23 100644 --- a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala @@ -20,6 +20,7 @@ package spark.mllib.util import scala.util.Random import spark.{RDD, SparkContext} +import spark.mllib.regression.LabeledPoint object LogisticRegressionDataGenerator { @@ -40,7 +41,7 @@ object LogisticRegressionDataGenerator { nfeatures: Int, eps: Double, nparts: Int = 2, - probOne: Double = 0.5): RDD[(Double, Array[Double])] = { + probOne: Double = 0.5): RDD[LabeledPoint] = { val data = sc.parallelize(0 until nexamples, nparts).map { idx => val rnd = new Random(42 + idx) @@ -48,7 +49,7 @@ object LogisticRegressionDataGenerator { val x = Array.fill[Double](nfeatures) { rnd.nextGaussian() + (y * eps) } - (y, x) + LabeledPoint(y, x) } data } diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala index b5e564df6d..e45eda2c99 100644 --- 
a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala @@ -21,6 +21,7 @@ import spark.{RDD, SparkContext} import spark.SparkContext._ import org.jblas.DoubleMatrix +import spark.mllib.regression.LabeledPoint /** * Helper methods to load and save data @@ -36,17 +37,17 @@ object MLUtils { * @return An RDD of tuples. For each tuple, the first element is the label, and the second * element represents the feature values (an array of Double). */ - def loadLabeledData(sc: SparkContext, dir: String): RDD[(Double, Array[Double])] = { + def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = { sc.textFile(dir).map { line => val parts = line.split(",") val label = parts(0).toDouble val features = parts(1).trim().split(" ").map(_.toDouble) - (label, features) + LabeledPoint(label, features) } } - def saveLabeledData(data: RDD[(Double, Array[Double])], dir: String) { - val dataStr = data.map(x => x._1 + "," + x._2.mkString(" ")) + def saveLabeledData(data: RDD[LabeledPoint], dir: String) { + val dataStr = data.map(x => x.label + "," + x.features.mkString(" ")) dataStr.saveAsTextFile(dir) } diff --git a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala index c5b8a29942..c4d65c3f9a 100644 --- a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala @@ -22,6 +22,7 @@ import scala.util.Random import org.jblas.DoubleMatrix import spark.{RDD, SparkContext} +import spark.mllib.regression.LabeledPoint object RidgeRegressionDataGenerator { @@ -41,14 +42,14 @@ object RidgeRegressionDataGenerator { nexamples: Int, nfeatures: Int, eps: Double, - nparts: Int = 2) : RDD[(Double, Array[Double])] = { + nparts: Int = 2) : RDD[LabeledPoint] = { org.jblas.util.Random.seed(42) // Random values distributed uniformly in [-0.5, 0.5] val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) w.put(0, 0, 10) w.put(1, 0, 10) - val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nparts, nparts).flatMap { p => + val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p => org.jblas.util.Random.seed(42 + p) val examplesInPartition = nexamples / nparts @@ -61,7 +62,7 @@ object RidgeRegressionDataGenerator { val yObs = new DoubleMatrix(normalValues).addi(y) Iterator.tabulate(examplesInPartition) { i => - (yObs.get(i, 0), X.getRow(i).toArray) + LabeledPoint(yObs.get(i, 0), X.getRow(i).toArray) } } data diff --git a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala index 00a54d9a70..a37f6eb3b3 100644 --- a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala @@ -9,6 +9,7 @@ import spark.{RDD, SparkContext} import spark.mllib.util.MLUtils import org.jblas.DoubleMatrix +import spark.mllib.regression.LabeledPoint object SVMGenerator { @@ -32,14 +33,14 @@ object SVMGenerator { val trueWeights = new DoubleMatrix(1, nfeatures+1, Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*) - val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx => + val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx => val rnd = new Random(42 + idx) val x = Array.fill[Double](nfeatures) { rnd.nextDouble() * 2.0 - 1.0 } val y = signum((new 
DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1) - (y, x) + LabeledPoint(y, x) } MLUtils.saveLabeledData(data, outputPath) diff --git a/mllib/src/test/scala/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/scala/spark/mllib/classification/JavaLogisticRegressionSuite.java new file mode 100644 index 0000000000..e0ebd45cd8 --- /dev/null +++ b/mllib/src/test/scala/spark/mllib/classification/JavaLogisticRegressionSuite.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.classification; + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; + +import spark.mllib.regression.LabeledPoint; + +public class JavaLogisticRegressionSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaLogisticRegressionSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List validationData, LogisticRegressionModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + if (prediction == point.label()) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runLRUsingConstructor() { + int nPoints = 10000; + double A = 2.0; + double B = -1.5; + + JavaRDD testRDD = sc.parallelize( + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache(); + List validationData = + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17); + + LogisticRegressionWithSGD lrImpl = new LogisticRegressionWithSGD(); + lrImpl.optimizer().setStepSize(1.0) + .setRegParam(1.0) + .setNumIterations(100); + LogisticRegressionModel model = lrImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runLRUsingStaticMethods() { + int nPoints = 10000; + double A = 2.0; + double B = -1.5; + + JavaRDD testRDD = sc.parallelize( + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache(); + List validationData = + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17); + + LogisticRegressionModel model = LogisticRegressionWithSGD.train( + testRDD.rdd(), 100, 1.0, 1.0); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git 
a/mllib/src/test/scala/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/scala/spark/mllib/classification/JavaSVMSuite.java new file mode 100644 index 0000000000..7881b3c38f --- /dev/null +++ b/mllib/src/test/scala/spark/mllib/classification/JavaSVMSuite.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.classification; + + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; + +import spark.mllib.regression.LabeledPoint; + +public class JavaSVMSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaSVMSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List validationData, SVMModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + if (prediction == point.label()) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runSVMUsingConstructor() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0}; + + JavaRDD testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A, + weights, nPoints, 42), 2).cache(); + List validationData = + SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17); + + SVMWithSGD svmSGDImpl = new SVMWithSGD(); + svmSGDImpl.optimizer().setStepSize(1.0) + .setRegParam(1.0) + .setNumIterations(100); + SVMModel model = svmSGDImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runSVMUsingStaticMethods() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0}; + + JavaRDD testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A, + weights, nPoints, 42), 2).cache(); + List validationData = + SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17); + + SVMModel model = SVMWithSGD.train(testRDD.rdd(), 100, 1.0, 1.0, 1.0); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala index ee38486212..16bd2c6b38 100644 --- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala @@ -18,21 +18,23 @@ 
package spark.mllib.classification import scala.util.Random +import scala.collection.JavaConversions._ import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers import spark.SparkContext -import spark.mllib.optimization._ +import spark.mllib.regression._ +object LogisticRegressionSuite { -class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers { - val sc = new SparkContext("local", "test") - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") + def generateLogisticInputAsList( + offset: Double, + scale: Double, + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateLogisticInput(offset, scale, nPoints, seed)) } // Generate input of the form Y = logistic(offset + scale*X) @@ -40,7 +42,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul offset: Double, scale: Double, nPoints: Int, - seed: Int): Seq[(Int, Array[Double])] = { + seed: Int): Seq[LabeledPoint] = { val rnd = new Random(seed) val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian()) @@ -58,13 +60,23 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul if (yVal > 0) 1 else 0 } - val testData = (0 until nPoints).map(i => (y(i), Array(x1(i)))) + val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i)))) testData } - def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) { - val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) => - (prediction != expected) +} + +class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers { + val sc = new SparkContext("local", "test") + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + (prediction != expected.label) }.size // At least 83% of the predictions should be on. ((input.length - numOffPredictions).toDouble / input.length) should be > 0.83 @@ -76,26 +88,27 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul val A = 2.0 val B = -1.5 - val testData = generateLogisticInput(A, B, nPoints, 42) + val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42) val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(20) + val lr = new LogisticRegressionWithSGD() + lr.optimizer.setStepSize(10.0).setNumIterations(20) - val model = lr.train(testRDD) + val model = lr.run(testRDD) // Test the weights val weight0 = model.weights(0) assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") - val validationData = generateLogisticInput(A, B, nPoints, 17) + val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17) val validationRDD = sc.parallelize(validationData, 2) // Test prediction on RDD. - validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) // Test prediction on Array. 
- validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) } test("logistic regression with initial weights") { @@ -103,7 +116,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul val A = 2.0 val B = -1.5 - val testData = generateLogisticInput(A, B, nPoints, 42) + val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42) val initialB = -1.0 val initialWeights = Array(initialB) @@ -112,20 +125,21 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul testRDD.cache() // Use half as many iterations as the previous test. - val lr = new LogisticRegressionWithSGD().setStepSize(10.0).setNumIterations(10) + val lr = new LogisticRegressionWithSGD() + lr.optimizer.setStepSize(10.0).setNumIterations(10) - val model = lr.train(testRDD, initialWeights) + val model = lr.run(testRDD, initialWeights) val weight0 = model.weights(0) assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") - val validationData = generateLogisticInput(A, B, nPoints, 17) + val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17) val validationRDD = sc.parallelize(validationData, 2) // Test prediction on RDD. - validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) // Test prediction on Array. - validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) } } diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala index 1eef9387e3..9e0970812d 100644 --- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala +++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala @@ -19,21 +19,24 @@ package spark.mllib.classification import scala.util.Random import scala.math.signum +import scala.collection.JavaConversions._ import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import spark.SparkContext -import spark.mllib.optimization._ +import spark.mllib.regression._ import org.jblas.DoubleMatrix -class SVMSuite extends FunSuite with BeforeAndAfterAll { - val sc = new SparkContext("local", "test") +object SVMSuite { - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") + def generateSVMInputAsList( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateSVMInput(intercept, weights, nPoints, seed)) } // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) @@ -41,7 +44,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { intercept: Double, weights: Array[Double], nPoints: Int, - seed: Int): Seq[(Int, Array[Double])] = { + seed: Int): Seq[LabeledPoint] = { val rnd = new Random(seed) val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) val x = Array.fill[Array[Double]](nPoints)( @@ -53,17 +56,28 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { 0.1 * rnd.nextGaussian() ).toInt } - y.zip(x) + y.zip(x).map(p => LabeledPoint(p._1, p._2)) } - def validatePrediction(predictions: 
Seq[Int], input: Seq[(Int, Array[Double])]) { - val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) => - (prediction != expected) +} + +class SVMSuite extends FunSuite with BeforeAndAfterAll { + val sc = new SparkContext("local", "test") + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + (prediction != expected.label) }.size // At least 80% of the predictions should be on. assert(numOffPredictions < input.length / 5) } + test("SVM using local random SGD") { val nPoints = 10000 @@ -71,23 +85,24 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { val B = -1.5 val C = 1.0 - val testData = generateSVMInput(A, Array[Double](B,C), nPoints, 42) + val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42) val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + val svm = new SVMWithSGD() + svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100) - val model = svm.train(testRDD) + val model = svm.run(testRDD) - val validationData = generateSVMInput(A, Array[Double](B,C), nPoints, 17) + val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17) val validationRDD = sc.parallelize(validationData,2) // Test prediction on RDD. - validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) // Test prediction on Array. - validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) } test("SVM local random SGD with initial weights") { @@ -97,7 +112,7 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { val B = -1.5 val C = 1.0 - val testData = generateSVMInput(A, Array[Double](B,C), nPoints, 42) + val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42) val initialB = -1.0 val initialC = -1.0 @@ -106,17 +121,18 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val svm = new SVMWithSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + val svm = new SVMWithSGD() + svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100) - val model = svm.train(testRDD, initialWeights) + val model = svm.run(testRDD, initialWeights) - val validationData = generateSVMInput(A, Array[Double](B,C), nPoints, 17) + val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17) val validationRDD = sc.parallelize(validationData,2) // Test prediction on RDD. - validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) // Test prediction on Array. 
- validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) } } diff --git a/mllib/src/test/scala/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/scala/spark/mllib/regression/JavaLassoSuite.java new file mode 100644 index 0000000000..e26d7b385c --- /dev/null +++ b/mllib/src/test/scala/spark/mllib/regression/JavaLassoSuite.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.regression; + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; + +public class JavaLassoSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaLassoSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List validationData, LassoModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + // A prediction is off if the prediction is more than 0.5 away from expected value. 
+ if (Math.abs(prediction - point.label()) <= 0.5) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runLassoUsingConstructor() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0e-2}; + + JavaRDD testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A, + weights, nPoints, 42), 2).cache(); + List validationData = + LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17); + + LassoWithSGD svmSGDImpl = new LassoWithSGD(); + svmSGDImpl.optimizer().setStepSize(1.0) + .setRegParam(0.01) + .setNumIterations(20); + LassoModel model = svmSGDImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runLassoUsingStaticMethods() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0e-2}; + + JavaRDD testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A, + weights, nPoints, 42), 2).cache(); + List validationData = + LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17); + + LassoModel model = LassoWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala index ab1d07b879..b9ada2b1ec 100644 --- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala @@ -17,31 +17,33 @@ package spark.mllib.regression +import scala.collection.JavaConversions._ import scala.util.Random import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import spark.SparkContext -import spark.mllib.optimization._ import org.jblas.DoubleMatrix +object LassoSuite { -class LassoSuite extends FunSuite with BeforeAndAfterAll { - val sc = new SparkContext("local", "test") - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") + def generateLassoInputAsList( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateLassoInput(intercept, weights, nPoints, seed)) } + // Generate noisy input of the form Y = x.dot(weights) + intercept + noise def generateLassoInput( intercept: Double, weights: Array[Double], nPoints: Int, - seed: Int): Seq[(Double, Array[Double])] = { + seed: Int): Seq[LabeledPoint] = { val rnd = new Random(seed) val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) val x = Array.fill[Array[Double]](nPoints)( @@ -49,13 +51,23 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { val y = x.map(xi => (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian() ) - y zip x + y.zip(x).map(p => LabeledPoint(p._1, p._2)) } - def validatePrediction(predictions: Seq[Double], input: Seq[(Double, Array[Double])]) { - val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) => +} + +class LassoSuite extends FunSuite with BeforeAndAfterAll { + val sc = new SparkContext("local", "test") + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => // A prediction is off if the prediction is more than 0.5 away 
from expected value. - math.abs(prediction - expected) > 0.5 + math.abs(prediction - expected.label) > 0.5 }.size // At least 80% of the predictions should be on. assert(numOffPredictions < input.length / 5) @@ -68,14 +80,15 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { val B = -1.5 val C = 1.0e-2 - val testData = generateLassoInput(A, Array[Double](B,C), nPoints, 42) + val testData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 42) val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + val ls = new LassoWithSGD() + ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20) - val model = ls.train(testRDD) + val model = ls.run(testRDD) val weight0 = model.weights(0) val weight1 = model.weights(1) @@ -83,14 +96,14 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") - val validationData = generateLassoInput(A, Array[Double](B,C), nPoints, 17) - val validationRDD = sc.parallelize(validationData,2) + val validationData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData, 2) // Test prediction on RDD. - validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) // Test prediction on Array. - validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) } test("Lasso local random SGD with initial weights") { @@ -100,7 +113,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { val B = -1.5 val C = 1.0e-2 - val testData = generateLassoInput(A, Array[Double](B,C), nPoints, 42) + val testData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 42) val initialB = -1.0 val initialC = -1.0 @@ -109,9 +122,10 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val ls = new LassoWithSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + val ls = new LassoWithSGD() + ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20) - val model = ls.train(testRDD, initialWeights) + val model = ls.run(testRDD, initialWeights) val weight0 = model.weights(0) val weight1 = model.weights(1) @@ -119,13 +133,13 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") - val validationData = generateLassoInput(A, Array[Double](B,C), nPoints, 17) + val validationData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 17) val validationRDD = sc.parallelize(validationData,2) // Test prediction on RDD. - validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) // Test prediction on Array. 
- validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) } } diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala index 3c588c6162..4c4900658f 100644 --- a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala @@ -47,7 +47,7 @@ class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { val xMat = (0 until 20).map(i => Array(x1(i), x2(i))).toArray val y = xMat.map(i => 3 + i(0) + i(1)) - val testData = (0 until 20).map(i => (y(i), xMat(i))).toArray + val testData = (0 until 20).map(i => LabeledPoint(y(i), xMat(i))).toArray val testRDD = sc.parallelize(testData, 2) testRDD.cache() diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 9920e00a67..3cb0fd7c43 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -46,7 +46,7 @@ object SparkBuild extends Build { lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) - lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) + lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) dependsOn(mllib) lazy val tools = Project("tools", file("tools"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) -- cgit v1.2.3 From 2812e722008b772756cbd0ef0600a55b65d6ee0e Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Thu, 8 Aug 2013 16:24:31 -0700 Subject: Add setters for optimizer, gradient in SGD. Also remove java-specific constructor for LabeledPoint. 
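For reference, a rough sketch of how the setters added in the diff below might be used: an existing GradientDescent instance can have its gradient and updater swapped in place instead of being reconstructed. The surrounding setup is illustrative only; the class and setter names follow the diff.

import spark.mllib.optimization._

// Configure an optimizer for a logistic-loss problem...
val sgd = new GradientDescent(new LogisticGradient(), new SimpleUpdater())
  .setStepSize(1.0)
  .setNumIterations(100)

// ...then, with the new setters, repoint it at a hinge-loss problem without rebuilding it.
sgd.setGradient(new HingeGradient())
   .setUpdater(new SquaredL2Updater())
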
--- .../src/main/java/spark/mllib/examples/JavaLR.java | 2 +- .../spark/mllib/optimization/GradientDescent.scala | 19 ++++++++++++++++++- .../scala/spark/mllib/regression/LabeledPoint.scala | 8 +------- 3 files changed, 20 insertions(+), 9 deletions(-) (limited to 'mllib/src/main/scala') diff --git a/examples/src/main/java/spark/mllib/examples/JavaLR.java b/examples/src/main/java/spark/mllib/examples/JavaLR.java index e11f4830a8..bf4aeaf40f 100644 --- a/examples/src/main/java/spark/mllib/examples/JavaLR.java +++ b/examples/src/main/java/spark/mllib/examples/JavaLR.java @@ -37,7 +37,7 @@ public class JavaLR { static class ParsePoint extends Function { public LabeledPoint call(String line) { String[] parts = line.split(","); - Double y = Double.parseDouble(parts[0]); + double y = Double.parseDouble(parts[0]); StringTokenizer tok = new StringTokenizer(parts[1], " "); int numTokens = tok.countTokens(); double[] x = new double[numTokens]; diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala index 54793ca74d..1f04398d0c 100644 --- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala @@ -24,7 +24,7 @@ import org.jblas.DoubleMatrix import scala.collection.mutable.ArrayBuffer -class GradientDescent(gradient: Gradient, updater: Updater) extends Optimizer { +class GradientDescent(var gradient: Gradient, var updater: Updater) extends Optimizer { var stepSize: Double = 1.0 var numIterations: Int = 100 @@ -63,6 +63,23 @@ class GradientDescent(gradient: Gradient, updater: Updater) extends Optimizer { this } + /** + * Set the gradient function to be used for SGD. + */ + def setGradient(gradient: Gradient): this.type = { + this.gradient = gradient + this + } + + + /** + * Set the updater function to be used for SGD. + */ + def setUpdater(updater: Updater): this.type = { + this.updater = updater + this + } + def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]) : Array[Double] = { diff --git a/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala index 592f0b5414..3de60482c5 100644 --- a/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala @@ -23,10 +23,4 @@ package spark.mllib.regression * @param label Label for this data point. * @param features List of features for this data point. */ -case class LabeledPoint(val label: Double, val features: Array[Double]) { - - /** - * Construct a labeled point using java.lang.Double. 
- */ - def this(label: java.lang.Double, features: Array[Double]) = this(label.doubleValue(), features) -} +case class LabeledPoint(val label: Double, val features: Array[Double]) -- cgit v1.2.3 From a65a6ed5140446651916aff1761a9a755194eaf4 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sat, 10 Aug 2013 18:54:10 -0700 Subject: Fix GLM code review comments and move java tests --- .../regression/GeneralizedLinearAlgorithm.scala | 8 +- .../JavaLogisticRegressionSuite.java | 98 ++++++++++++++++++++++ .../spark/mllib/classification/JavaSVMSuite.java | 98 ++++++++++++++++++++++ .../spark/mllib/regression/JavaLassoSuite.java | 96 +++++++++++++++++++++ .../JavaLogisticRegressionSuite.java | 98 ---------------------- .../spark/mllib/classification/JavaSVMSuite.java | 98 ---------------------- .../spark/mllib/regression/JavaLassoSuite.java | 96 --------------------- 7 files changed, 294 insertions(+), 298 deletions(-) create mode 100644 mllib/src/test/java/spark/mllib/classification/JavaLogisticRegressionSuite.java create mode 100644 mllib/src/test/java/spark/mllib/classification/JavaSVMSuite.java create mode 100644 mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java delete mode 100644 mllib/src/test/scala/spark/mllib/classification/JavaLogisticRegressionSuite.java delete mode 100644 mllib/src/test/scala/spark/mllib/classification/JavaSVMSuite.java delete mode 100644 mllib/src/test/scala/spark/mllib/regression/JavaLassoSuite.java (limited to 'mllib/src/main/scala') diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 03a7755541..8ea823b307 100644 --- a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -65,8 +65,7 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: * GeneralizedLinearAlgorithm abstracts out the training for all GLMs. * This class should be extended with an Optimizer to create a new GLM. */ -abstract class GeneralizedLinearAlgorithm[M](implicit - methodEv: M <:< GeneralizedLinearModel) +abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] extends Logging with Serializable { val optimizer: Optimizer @@ -89,10 +88,7 @@ abstract class GeneralizedLinearAlgorithm[M](implicit run(input, initialWeights) } - def run( - input: RDD[LabeledPoint], - initialWeights: Array[Double]) - : M = { + def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = { // Add a extra variable consisting of all 1.0's for the intercept. val data = if (addIntercept) { diff --git a/mllib/src/test/java/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/spark/mllib/classification/JavaLogisticRegressionSuite.java new file mode 100644 index 0000000000..e0ebd45cd8 --- /dev/null +++ b/mllib/src/test/java/spark/mllib/classification/JavaLogisticRegressionSuite.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.classification; + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; + +import spark.mllib.regression.LabeledPoint; + +public class JavaLogisticRegressionSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaLogisticRegressionSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List validationData, LogisticRegressionModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + if (prediction == point.label()) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runLRUsingConstructor() { + int nPoints = 10000; + double A = 2.0; + double B = -1.5; + + JavaRDD testRDD = sc.parallelize( + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache(); + List validationData = + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17); + + LogisticRegressionWithSGD lrImpl = new LogisticRegressionWithSGD(); + lrImpl.optimizer().setStepSize(1.0) + .setRegParam(1.0) + .setNumIterations(100); + LogisticRegressionModel model = lrImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runLRUsingStaticMethods() { + int nPoints = 10000; + double A = 2.0; + double B = -1.5; + + JavaRDD testRDD = sc.parallelize( + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache(); + List validationData = + LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17); + + LogisticRegressionModel model = LogisticRegressionWithSGD.train( + testRDD.rdd(), 100, 1.0, 1.0); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git a/mllib/src/test/java/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/spark/mllib/classification/JavaSVMSuite.java new file mode 100644 index 0000000000..7881b3c38f --- /dev/null +++ b/mllib/src/test/java/spark/mllib/classification/JavaSVMSuite.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.classification; + + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; + +import spark.mllib.regression.LabeledPoint; + +public class JavaSVMSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaSVMSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List validationData, SVMModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + if (prediction == point.label()) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runSVMUsingConstructor() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0}; + + JavaRDD testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A, + weights, nPoints, 42), 2).cache(); + List validationData = + SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17); + + SVMWithSGD svmSGDImpl = new SVMWithSGD(); + svmSGDImpl.optimizer().setStepSize(1.0) + .setRegParam(1.0) + .setNumIterations(100); + SVMModel model = svmSGDImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runSVMUsingStaticMethods() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0}; + + JavaRDD testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A, + weights, nPoints, 42), 2).cache(); + List validationData = + SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17); + + SVMModel model = SVMWithSGD.train(testRDD.rdd(), 100, 1.0, 1.0, 1.0); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java new file mode 100644 index 0000000000..e26d7b385c --- /dev/null +++ b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.regression; + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import spark.api.java.JavaRDD; +import spark.api.java.JavaSparkContext; + +public class JavaLassoSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaLassoSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List validationData, LassoModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + // A prediction is off if the prediction is more than 0.5 away from expected value. + if (Math.abs(prediction - point.label()) <= 0.5) { + numAccurate++; + } + } + return numAccurate; + } + + @Test + public void runLassoUsingConstructor() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0e-2}; + + JavaRDD testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A, + weights, nPoints, 42), 2).cache(); + List validationData = + LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17); + + LassoWithSGD svmSGDImpl = new LassoWithSGD(); + svmSGDImpl.optimizer().setStepSize(1.0) + .setRegParam(0.01) + .setNumIterations(20); + LassoModel model = svmSGDImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runLassoUsingStaticMethods() { + int nPoints = 10000; + double A = 2.0; + double[] weights = {-1.5, 1.0e-2}; + + JavaRDD testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A, + weights, nPoints, 42), 2).cache(); + List validationData = + LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17); + + LassoModel model = LassoWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + +} diff --git a/mllib/src/test/scala/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/scala/spark/mllib/classification/JavaLogisticRegressionSuite.java deleted file mode 100644 index e0ebd45cd8..0000000000 --- a/mllib/src/test/scala/spark/mllib/classification/JavaLogisticRegressionSuite.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package spark.mllib.classification; - -import java.io.Serializable; -import java.util.List; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import spark.api.java.JavaRDD; -import spark.api.java.JavaSparkContext; - -import spark.mllib.regression.LabeledPoint; - -public class JavaLogisticRegressionSuite implements Serializable { - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaLogisticRegressionSuite"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - System.clearProperty("spark.driver.port"); - } - - int validatePrediction(List validationData, LogisticRegressionModel model) { - int numAccurate = 0; - for (LabeledPoint point: validationData) { - Double prediction = model.predict(point.features()); - if (prediction == point.label()) { - numAccurate++; - } - } - return numAccurate; - } - - @Test - public void runLRUsingConstructor() { - int nPoints = 10000; - double A = 2.0; - double B = -1.5; - - JavaRDD testRDD = sc.parallelize( - LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache(); - List validationData = - LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17); - - LogisticRegressionWithSGD lrImpl = new LogisticRegressionWithSGD(); - lrImpl.optimizer().setStepSize(1.0) - .setRegParam(1.0) - .setNumIterations(100); - LogisticRegressionModel model = lrImpl.run(testRDD.rdd()); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); - } - - @Test - public void runLRUsingStaticMethods() { - int nPoints = 10000; - double A = 2.0; - double B = -1.5; - - JavaRDD testRDD = sc.parallelize( - LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache(); - List validationData = - LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17); - - LogisticRegressionModel model = LogisticRegressionWithSGD.train( - testRDD.rdd(), 100, 1.0, 1.0); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); - } - -} diff --git a/mllib/src/test/scala/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/scala/spark/mllib/classification/JavaSVMSuite.java deleted file mode 100644 index 7881b3c38f..0000000000 --- a/mllib/src/test/scala/spark/mllib/classification/JavaSVMSuite.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package spark.mllib.classification; - - -import java.io.Serializable; -import java.util.List; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import spark.api.java.JavaRDD; -import spark.api.java.JavaSparkContext; - -import spark.mllib.regression.LabeledPoint; - -public class JavaSVMSuite implements Serializable { - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaSVMSuite"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - System.clearProperty("spark.driver.port"); - } - - int validatePrediction(List validationData, SVMModel model) { - int numAccurate = 0; - for (LabeledPoint point: validationData) { - Double prediction = model.predict(point.features()); - if (prediction == point.label()) { - numAccurate++; - } - } - return numAccurate; - } - - @Test - public void runSVMUsingConstructor() { - int nPoints = 10000; - double A = 2.0; - double[] weights = {-1.5, 1.0}; - - JavaRDD testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A, - weights, nPoints, 42), 2).cache(); - List validationData = - SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17); - - SVMWithSGD svmSGDImpl = new SVMWithSGD(); - svmSGDImpl.optimizer().setStepSize(1.0) - .setRegParam(1.0) - .setNumIterations(100); - SVMModel model = svmSGDImpl.run(testRDD.rdd()); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); - } - - @Test - public void runSVMUsingStaticMethods() { - int nPoints = 10000; - double A = 2.0; - double[] weights = {-1.5, 1.0}; - - JavaRDD testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A, - weights, nPoints, 42), 2).cache(); - List validationData = - SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17); - - SVMModel model = SVMWithSGD.train(testRDD.rdd(), 100, 1.0, 1.0, 1.0); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); - } - -} diff --git a/mllib/src/test/scala/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/scala/spark/mllib/regression/JavaLassoSuite.java deleted file mode 100644 index e26d7b385c..0000000000 --- a/mllib/src/test/scala/spark/mllib/regression/JavaLassoSuite.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package spark.mllib.regression; - -import java.io.Serializable; -import java.util.List; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import spark.api.java.JavaRDD; -import spark.api.java.JavaSparkContext; - -public class JavaLassoSuite implements Serializable { - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaLassoSuite"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - System.clearProperty("spark.driver.port"); - } - - int validatePrediction(List validationData, LassoModel model) { - int numAccurate = 0; - for (LabeledPoint point: validationData) { - Double prediction = model.predict(point.features()); - // A prediction is off if the prediction is more than 0.5 away from expected value. - if (Math.abs(prediction - point.label()) <= 0.5) { - numAccurate++; - } - } - return numAccurate; - } - - @Test - public void runLassoUsingConstructor() { - int nPoints = 10000; - double A = 2.0; - double[] weights = {-1.5, 1.0e-2}; - - JavaRDD testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A, - weights, nPoints, 42), 2).cache(); - List validationData = - LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17); - - LassoWithSGD svmSGDImpl = new LassoWithSGD(); - svmSGDImpl.optimizer().setStepSize(1.0) - .setRegParam(0.01) - .setNumIterations(20); - LassoModel model = svmSGDImpl.run(testRDD.rdd()); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); - } - - @Test - public void runLassoUsingStaticMethods() { - int nPoints = 10000; - double A = 2.0; - double[] weights = {-1.5, 1.0e-2}; - - JavaRDD testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A, - weights, nPoints, 42), 2).cache(); - List validationData = - LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17); - - LassoModel model = LassoWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); - } - -} -- cgit v1.2.3
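
For orientation, here is a minimal Scala sketch of how the classes touched by this commit are driven end to end, mirroring what the Java suites above exercise. It is a sketch under assumptions rather than code from the patch: it assumes a SparkContext named `sc` is already in scope, that the optimizer member exposed by LogisticRegressionWithSGD supports the same setStepSize/setRegParam/setNumIterations chain the Java tests call, and the feature values and parameter settings are purely illustrative.

    import spark.mllib.classification.LogisticRegressionWithSGD
    import spark.mllib.regression.LabeledPoint

    // Illustrative two-point training set; a real job would build a larger RDD[LabeledPoint].
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Array(2.0, 0.5)),
      LabeledPoint(0.0, Array(-1.0, 1.5))), 2).cache()

    // Configure the optimizer held by the concrete GLM, then train via run().
    val lr = new LogisticRegressionWithSGD()
    lr.optimizer
      .setStepSize(1.0)
      .setRegParam(1.0)
      .setNumIterations(100)
    val model = lr.run(training)

    // The static entry point used by runLRUsingStaticMethods covers the same path.
    val sameModel = LogisticRegressionWithSGD.train(training, 100, 1.0, 1.0)

    // Score a single feature vector; predict returns the predicted label as a Double.
    val prediction: Double = model.predict(Array(1.0, 0.0))

JavaSVMSuite and JavaLassoSuite drive SVMWithSGD and LassoWithSGD through the same configure-optimizer-then-run pattern, which is why the three suites read nearly identically apart from the model type and the accuracy check.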