6 files changed, 412 insertions(+), 64 deletions(-)
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 243a346364..40b96fbe3a 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package spark.mllib.classification
 
 import spark.{Logging, RDD, SparkContext}
@@ -11,32 +28,39 @@ import org.jblas.DoubleMatrix
  * Based on Matlab code written by John Duchi.
  */
 class LogisticRegressionModel(
-  val weights: DoubleMatrix,
+  val weights: Array[Double],
   val intercept: Double,
-  val losses: Array[Double]) extends ClassificationModel {
+  val stochasticLosses: Array[Double]) extends ClassificationModel {
+
+  // Create a column vector that can be used for predictions
+  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
 
   override def predict(testData: spark.RDD[Array[Double]]) = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept are needed.
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
     testData.map { x =>
-      val margin = new DoubleMatrix(1, x.length, x:_*).mmul(this.weights).get(0) + this.intercept
+      val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
       1.0/ (1.0 + math.exp(margin * -1))
     }
   }
 
   override def predict(testData: Array[Double]): Double = {
     val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    val margin = dataMat.mmul(this.weights).get(0) + this.intercept
+    val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
     1.0/ (1.0 + math.exp(margin * -1))
   }
 }
 
-class LogisticRegression_LocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
+class LogisticRegression_LocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double,
     var numIters: Int)
   extends Logging {
 
   /**
    * Construct a LogisticRegression object with default parameters
    */
-  def this() = this(1.0, 0.0, 1.0, 100)
+  def this() = this(1.0, 1.0, 100)
 
   /**
    * Set the step size per-iteration of SGD. Default 1.0.
@@ -63,51 +95,97 @@ class LogisticRegression_LocalRandomSGD
   }
 
   def train(input: RDD[(Double, Array[Double])]): LogisticRegressionModel = {
+    val nfeatures: Int = input.take(1)(0)._2.length
+    val initialWeights = Array.fill(nfeatures)(1.0)
+    train(input, initialWeights)
+  }
+
+  def train(
+      input: RDD[(Double, Array[Double])],
+      initialWeights: Array[Double]): LogisticRegressionModel = {
+
     // Add an extra variable consisting of all 1.0's for the intercept.
     val data = input.map { case (y, features) =>
       (y, Array(1.0, features:_*))
     }
 
-    val (weights, losses) = GradientDescent.runMiniBatchSGD(
-      data, new LogisticGradient(), new SimpleUpdater(), stepSize, numIters, regParam, miniBatchFraction)
+    val initialWeightsWithIntercept = Array(1.0, initialWeights:_*)
 
-    val weightsScaled = weights.getRange(1, weights.length)
-    val intercept = weights.get(0)
+    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+      data,
+      new LogisticGradient(),
+      new SimpleUpdater(),
+      stepSize,
+      numIters,
+      0.0,
+      initialWeightsWithIntercept,
+      miniBatchFraction)
 
-    val model = new LogisticRegressionModel(weightsScaled, intercept, losses)
+    val intercept = weights(0)
+    val weightsScaled = weights.tail
 
-    logInfo("Final model weights " + model.weights)
+    val model = new LogisticRegressionModel(weightsScaled, intercept, stochasticLosses)
+
+    logInfo("Final model weights " + model.weights.mkString(","))
     logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 losses " + model.losses.takeRight(10).mkString(", "))
+    logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
     model
   }
 }
 
 /**
  * Top-level methods for calling Logistic Regression.
+ * NOTE(shivaram): We use multiple train methods instead of default arguments to support
+ * Java programs.
  */
 object LogisticRegression_LocalRandomSGD {
 
   /**
    * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
    * of iterations of gradient descent using the specified step size. Each iteration uses
+   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+   * gradient descent are initialized using the initial weights provided.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+   *        the number of features in the data.
+   */
+  def train(
+      input: RDD[(Double, Array[Double])],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
+      initialWeights: Array[Double])
+    : LogisticRegressionModel =
+  {
+    new LogisticRegression_LocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input, initialWeights)
+  }
+
+  /**
+   * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. Each iteration uses
    * `miniBatchFraction` fraction of the data to calculate the gradient.
    *
    * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
    * @param stepSize Step size to be used for each iteration of gradient descent.
-   * @param regParam Regularization parameter.
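The `predict` rewrite above is worth calling out: referencing `this.weights` inside `testData.map` would capture the whole model object in the task closure, whereas copying the matrix and intercept into local vals first means only those two values are serialized and shipped to the executors. A minimal sketch of the pattern, using a hypothetical `Model` class rather than the classes in this patch:

```scala
import org.jblas.DoubleMatrix
import spark.RDD

// Hypothetical model illustrating the closure trick used in this patch.
class Model(val weights: Array[Double], val intercept: Double) {
  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights: _*)

  def predict(testData: RDD[Array[Double]]): RDD[Double] = {
    // Copy fields into locals: the closure below now captures two small
    // values instead of `this`, so only they are serialized to executors.
    val localWeights = weightsMatrix
    val localIntercept = intercept
    testData.map { x =>
      new DoubleMatrix(1, x.length, x: _*).mmul(localWeights).get(0) + localIntercept
    }
  }
}
```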
+   * @param miniBatchFraction Fraction of data to be used per iteration.
    */
   def train(
       input: RDD[(Double, Array[Double])],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double,
       miniBatchFraction: Double)
     : LogisticRegressionModel =
   {
-    new LogisticRegression_LocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+    new LogisticRegression_LocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input)
   }
 
 /**
@@ -117,18 +195,18 @@ object LogisticRegression_LocalRandomSGD {
    *
    * @param input RDD of (label, array of features) pairs.
    * @param stepSize Step size to be used for each iteration of Gradient Descent.
-   * @param regParam Regularization parameter.
+   * @param numIterations Number of iterations of gradient descent to run.
    * @return a LogisticRegressionModel which has the weights and offset from training.
    */
   def train(
       input: RDD[(Double, Array[Double])],
       numIterations: Int,
-      stepSize: Double,
-      regParam: Double)
+      stepSize: Double)
     : LogisticRegressionModel =
   {
-    train(input, numIterations, stepSize, regParam, 1.0)
+    train(input, numIterations, stepSize, 1.0)
   }
 
   /**
@@ -145,7 +223,7 @@ object LogisticRegression_LocalRandomSGD {
       numIterations: Int)
     : LogisticRegressionModel =
   {
-    train(input, numIterations, 1.0, 0.0, 1.0)
+    train(input, numIterations, 1.0, 1.0)
   }
 
   def main(args: Array[String]) {
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index bf10493bf5..2cd1d668eb 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package spark.mllib.classification
 
 import scala.math.signum
@@ -11,22 +28,31 @@ import org.jblas.DoubleMatrix
  * SVM using Stochastic Gradient Descent.
  */
 class SVMModel(
-  val weights: DoubleMatrix,
+  val weights: Array[Double],
   val intercept: Double,
-  val losses: Array[Double]) extends ClassificationModel {
+  val stochasticLosses: Array[Double]) extends ClassificationModel {
+
+  // Create a column vector that can be used for predictions
+  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
 
   override def predict(testData: spark.RDD[Array[Double]]) = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept are needed.
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
     testData.map { x =>
-      signum(new DoubleMatrix(1, x.length, x:_*).dot(this.weights) + this.intercept)
+      signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept)
     }
   }
 
   override def predict(testData: Array[Double]): Double = {
     val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    signum(dataMat.dot(this.weights) + this.intercept)
+    signum(dataMat.dot(weightsMatrix) + this.intercept)
   }
 }
 
 class SVM_LocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
     var numIters: Int)
   extends Logging {
@@ -69,34 +95,80 @@ class SVM_LocalRandomSGD
   }
 
   def train(input: RDD[(Double, Array[Double])]): SVMModel = {
+    val nfeatures: Int = input.take(1)(0)._2.length
+    val initialWeights = Array.fill(nfeatures)(1.0)
+    train(input, initialWeights)
+  }
+
+  def train(
+      input: RDD[(Double, Array[Double])],
+      initialWeights: Array[Double]): SVMModel = {
+
     // Add an extra variable consisting of all 1.0's for the intercept.
     val data = input.map { case (y, features) =>
       (y, Array(1.0, features:_*))
     }
 
-    val (weights, losses) = GradientDescent.runMiniBatchSGD(
-      data, new HingeGradient(), new SquaredL2Updater(), stepSize, numIters, regParam, miniBatchFraction)
+    val initialWeightsWithIntercept = Array(1.0, initialWeights:_*)
+
+    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+      data,
+      new HingeGradient(),
+      new SquaredL2Updater(),
+      stepSize,
+      numIters,
+      regParam,
+      initialWeightsWithIntercept,
+      miniBatchFraction)
 
-    val weightsScaled = weights.getRange(1, weights.length)
-    val intercept = weights.get(0)
+    val intercept = weights(0)
+    val weightsScaled = weights.tail
 
-    val model = new SVMModel(weightsScaled, intercept, losses)
+    val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
 
-    logInfo("Final model weights " + model.weights)
+    logInfo("Final model weights " + model.weights.mkString(","))
     logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 losses " + model.losses.takeRight(10).mkString(", "))
+    logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
     model
   }
 }
 
 /**
  * Top-level methods for calling SVM.
  */
 object SVM_LocalRandomSGD {
 
   /**
    * Train an SVM model given an RDD of (label, features) pairs. We run a fixed number
    * of iterations of gradient descent using the specified step size. Each iteration uses
+   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+   * gradient descent are initialized using the initial weights provided.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param regParam Regularization parameter.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+   *        the number of features in the data.
+   */
+  def train(
+      input: RDD[(Double, Array[Double])],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
+      initialWeights: Array[Double])
+    : SVMModel =
+  {
+    new SVM_LocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input, initialWeights)
+  }
+
+  /**
+   * Train an SVM model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. Each iteration uses
    * `miniBatchFraction` fraction of the data to calculate the gradient.
    *
    * @param input RDD of (label, array of features) pairs.
@@ -151,7 +223,7 @@ object SVM_LocalRandomSGD {
       numIterations: Int)
     : SVMModel =
   {
-    train(input, numIterations, 1.0, 0.10, 1.0)
+    train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
   def main(args: Array[String]) {
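With the new overload in place, callers can warm-start training from explicit weights. A rough usage sketch, not part of the patch: the `SVMExample` object, the toy data, and the parameter values are illustrative, and the labels are taken to be in {-1.0, 1.0}, which the hinge gradient in this patch appears to assume:

```scala
import spark.SparkContext
import spark.mllib.classification.SVM_LocalRandomSGD

object SVMExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SVMExample")

    // Toy linearly separable data: label is the sign of the single feature.
    val data = sc.parallelize((1 to 1000).map { i =>
      val x = if (i % 2 == 0) 2.0 else -2.0
      (if (x > 0) 1.0 else -1.0, Array(x))
    })

    val initialWeights = Array(0.0) // one initial weight per feature
    val model = SVM_LocalRandomSGD.train(
      data,           // input
      20,             // numIterations
      1.0,            // stepSize
      0.1,            // regParam
      1.0,            // miniBatchFraction
      initialWeights)

    println("weights: " + model.weights.mkString(",") +
      ", intercept: " + model.intercept)
    sc.stop()
  }
}
```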
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
index 6ffc3b128b..4864ab7ccf 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package spark.mllib.optimization
 
 import org.jblas.DoubleMatrix
@@ -58,4 +75,3 @@ class HingeGradient extends Gradient {
       (DoubleMatrix.zeros(1,weights.length), 0.0)
   }
 }
-
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index bd8489c386..8387d4939b 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package spark.mllib.optimization
 
 import spark.{Logging, RDD, SparkContext}
@@ -23,8 +40,9 @@ object GradientDescent {
    * @param miniBatchFraction - fraction of the input data set that should be used for
    *                            one iteration of SGD. Default value 1.0.
    *
-   * @return weights - Column matrix containing weights for every feature.
-   * @return lossHistory - Array containing the loss computed for every iteration.
+   * @return A tuple containing two elements. The first element is an array containing the
+   *         weights for every feature, and the second element is an array containing the
+   *         stochastic loss computed for every iteration.
    */
   def runMiniBatchSGD(
     data: RDD[(Double, Array[Double])],
@@ -33,16 +51,16 @@ object GradientDescent {
     stepSize: Double,
     numIters: Int,
     regParam: Double,
-    miniBatchFraction: Double=1.0) : (DoubleMatrix, Array[Double]) = {
+    initialWeights: Array[Double],
+    miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = {
 
-    val lossHistory = new ArrayBuffer[Double](numIters)
+    val stochasticLossHistory = new ArrayBuffer[Double](numIters)
 
-    val nfeatures: Int = data.take(1)(0)._2.length
     val nexamples: Long = data.count()
     val miniBatchSize = nexamples * miniBatchFraction
 
-    // Initialize weights as a column matrix
-    var weights = DoubleMatrix.ones(nfeatures)
+    // Initialize weights as a column vector
+    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
     var reg_val = 0.0
 
     for (i <- 1 to numIters) {
@@ -53,16 +71,17 @@ object GradientDescent {
         (grad, loss)
       }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
 
+      // NOTE(Xinghao): lossSum is computed using the weights before applying the updater,
+      // and reg_val is the regularization value computed in the previous iteration.
+      stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
       val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
       weights = update._1
       reg_val = update._2
-      lossHistory.append(lossSum / miniBatchSize + reg_val)
     }
 
-    (weights, lossHistory.toArray)
+    (weights.toArray, stochasticLossHistory.toArray)
   }
 }
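The trainers in this patch are thin wrappers around this entry point, so it can also be driven directly. A hedged sketch under that assumption: the `SGDExample` object and toy data are made up, the input rows must already carry the leading 1.0 intercept column, and 0.0 is passed for `regParam` just as the logistic trainer above does when paired with `SimpleUpdater`:

```scala
import spark.SparkContext
import spark.mllib.optimization.{GradientDescent, LogisticGradient, SimpleUpdater}

object SGDExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SGDExample")

    // (label, features) pairs; the first column is the intercept term.
    val data = sc.parallelize(Seq(
      (1.0, Array(1.0, 2.0)),
      (0.0, Array(1.0, -1.5))))

    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
      data,
      new LogisticGradient(),
      new SimpleUpdater(),
      1.0,              // stepSize
      50,               // numIters
      0.0,              // regParam, as the logistic trainer passes
      Array(1.0, 1.0),  // initialWeights, one per column of the data
      1.0)              // miniBatchFraction

    println("weights: " + weights.mkString(",") +
      ", last loss: " + stochasticLossHistory.last)
    sc.stop()
  }
}
```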
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index bb2305c811..64364323a2 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package spark.mllib.regression
 
 import spark.{Logging, RDD, SparkContext}
@@ -8,24 +25,34 @@ import org.jblas.DoubleMatrix
 /**
  * Lasso using Stochastic Gradient Descent.
  */
 class LassoModel(
-  val weights: DoubleMatrix,
+  val weights: Array[Double],
   val intercept: Double,
-  val losses: Array[Double]) extends RegressionModel {
+  val stochasticLosses: Array[Double]) extends RegressionModel {
+
+  // Create a column vector that can be used for predictions
+  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
 
   override def predict(testData: spark.RDD[Array[Double]]) = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept are needed.
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
     testData.map { x =>
-      new DoubleMatrix(1, x.length, x:_*).dot(this.weights) + this.intercept
+      new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
     }
   }
 
   override def predict(testData: Array[Double]): Double = {
     val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    dataMat.dot(this.weights) + this.intercept
+    dataMat.dot(weightsMatrix) + this.intercept
   }
 }
 
 class Lasso_LocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
     var numIters: Int)
   extends Logging {
@@ -68,34 +95,80 @@ class Lasso_LocalRandomSGD
   }
 
   def train(input: RDD[(Double, Array[Double])]): LassoModel = {
+    val nfeatures: Int = input.take(1)(0)._2.length
+    val initialWeights = Array.fill(nfeatures)(1.0)
+    train(input, initialWeights)
+  }
+
+  def train(
+      input: RDD[(Double, Array[Double])],
+      initialWeights: Array[Double]): LassoModel = {
+
     // Add an extra variable consisting of all 1.0's for the intercept.
     val data = input.map { case (y, features) =>
       (y, Array(1.0, features:_*))
     }
 
-    val (weights, losses) = GradientDescent.runMiniBatchSGD(
-      data, new SquaredGradient(), new L1Updater(), stepSize, numIters, regParam, miniBatchFraction)
+    val initialWeightsWithIntercept = Array(1.0, initialWeights:_*)
+
+    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+      data,
+      new SquaredGradient(),
+      new L1Updater(),
+      stepSize,
+      numIters,
+      regParam,
+      initialWeightsWithIntercept,
+      miniBatchFraction)
 
-    val weightsScaled = weights.getRange(1, weights.length)
-    val intercept = weights.get(0)
+    val intercept = weights(0)
+    val weightsScaled = weights.tail
 
-    val model = new LassoModel(weightsScaled, intercept, losses)
+    val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
 
-    logInfo("Final model weights " + model.weights)
+    logInfo("Final model weights " + model.weights.mkString(","))
     logInfo("Final model intercept " + model.intercept)
-    logInfo("Last 10 losses " + model.losses.takeRight(10).mkString(", "))
+    logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
     model
   }
 }
 
 /**
  * Top-level methods for calling Lasso.
  */
 object Lasso_LocalRandomSGD {
 
   /**
    * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
    * of iterations of gradient descent using the specified step size. Each iteration uses
+   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+   * gradient descent are initialized using the initial weights provided.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param regParam Regularization parameter.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+   *        the number of features in the data.
+   */
+  def train(
+      input: RDD[(Double, Array[Double])],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
+      initialWeights: Array[Double])
+    : LassoModel =
+  {
+    new Lasso_LocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input, initialWeights)
+  }
+
+  /**
+   * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. Each iteration uses
    * `miniBatchFraction` fraction of the data to calculate the gradient.
    *
    * @param input RDD of (label, array of features) pairs.
@@ -150,7 +223,7 @@ object Lasso_LocalRandomSGD {
       numIterations: Int)
     : LassoModel =
   {
-    train(input, numIterations, 1.0, 0.10, 1.0)
+    train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
   def main(args: Array[String]) {
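All three trainers share the same intercept handling: prepend a constant 1.0 column to each feature vector, let the optimizer learn its weight, then split `weights(0)` back out as the intercept. In isolation the bookkeeping looks like the following sketch (plain Scala, no Spark; the numbers are hypothetical):

```scala
object InterceptTrick {
  def main(args: Array[String]) {
    val features = Array(0.5, -2.0)
    // Prepend the constant column; its learned weight becomes the intercept.
    val withIntercept = Array(1.0, features: _*)

    // Suppose the optimizer returned these weights for the augmented data.
    val learned = Array(0.8, 1.2, -0.3)
    val intercept = learned(0)
    val weights = learned.tail

    // dot(withIntercept, learned) == intercept + dot(features, weights)
    val lhs = withIntercept.zip(learned).map { case (a, b) => a * b }.sum
    val rhs = intercept + features.zip(weights).map { case (a, b) => a * b }.sum
    assert(math.abs(lhs - rhs) < 1e-12)

    println("intercept: " + intercept + ", weights: " + weights.mkString(","))
  }
}
```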
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 5741906a14..827ca66330 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -1,4 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package spark.mllib.classification
 
 import scala.util.Random
@@ -6,7 +27,6 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
 import spark.SparkContext
-import spark.SparkContext._
 
 class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
@@ -17,16 +37,23 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
     System.clearProperty("spark.driver.port")
   }
 
-  // Test if we can correctly learn A, B where Y = logistic(A + B*X)
-  test("LogisticRegression_LocalRandomSGD") {
-    val nPoints = 10000
-    val rnd = new Random(42)
+  // Generate input of the form Y = logistic(offset + scale*X)
+  def generateLogisticInput(
+      offset: Double,
+      scale: Double,
+      nPoints: Int,
+      seed: Int): Seq[(Double, Array[Double])] = {
+    val rnd = new Random(seed)
 
     val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
-    val A = 2.0
-    val B = -1.5
-
     // NOTE: if U is uniform[0, 1] then ln(u) - ln(1-u) is Logistic(0,1)
     val unifRand = new scala.util.Random(45)
     val rLogis = (0 until nPoints).map { i =>
@@ -34,24 +61,87 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
       math.log(u) - math.log(1.0-u)
     }
 
-    // y <- A + B*x + rlogis(100)
+    // y <- A + B*x + rLogis()
     // y <- as.numeric(y > 0)
-    val y = (0 until nPoints).map { i =>
-      val yVal = A + B * x1(i) + rLogis(i)
+    val y: Seq[Double] = (0 until nPoints).map { i =>
+      val yVal = offset + scale * x1(i) + rLogis(i)
       if (yVal > 0) 1.0 else 0.0
     }
 
-    val testData = (0 until nPoints).map(i => (y(i).toDouble, Array(x1(i)))).toArray
+    val testData = (0 until nPoints).map(i => (y(i), Array(x1(i))))
+    testData
+  }
+
+  def validatePrediction(predictions: Seq[Double], input: Seq[(Double, Array[Double])]) {
+    val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
+      // A prediction is off if the prediction is more than 0.5 away from expected value.
+      math.abs(prediction - expected) > 0.5
+    }.size
+    // At least 80% of the predictions should be on.
+    assert(numOffPredictions < input.length / 5)
+  }
+
+  // Test if we can correctly learn A, B where Y = logistic(A + B*X)
+  test("logistic regression") {
+    val nPoints = 10000
+    val A = 2.0
+    val B = -1.5
+
+    val testData = generateLogisticInput(A, B, nPoints, 42)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
     val lr = new LogisticRegression_LocalRandomSGD().setStepSize(10.0)
                                                     .setNumIterations(20)
 
     val model = lr.train(testRDD)
 
-    val weight0 = model.weights.get(0)
+    // Test the weights
+    val weight0 = model.weights(0)
     assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
     assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+
+    val validationData = generateLogisticInput(A, B, nPoints, 17)
+    val validationRDD = sc.parallelize(validationData, 2)
+    // Test prediction on RDD.
+    validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+    // Test prediction on Array.
+    validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+  }
+
+  test("logistic regression with initial weights") {
+    val nPoints = 10000
+    val A = 2.0
+    val B = -1.5
+
+    val testData = generateLogisticInput(A, B, nPoints, 42)
+
+    val initialB = -1.0
+    val initialWeights = Array(initialB)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    // Use half as many iterations as the previous test.
+    val lr = new LogisticRegression_LocalRandomSGD().setStepSize(10.0)
+                                                    .setNumIterations(10)
+
+    val model = lr.train(testRDD, initialWeights)
+
+    val weight0 = model.weights(0)
+    assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
+    assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
+
+    val validationData = generateLogisticInput(A, B, nPoints, 17)
+    val validationRDD = sc.parallelize(validationData, 2)
+    // Test prediction on RDD.
+    validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+
+    // Test prediction on Array.
+    validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+  }
 }