From b659af83d3f91f0f339d874b2742ddca20a9f610 Mon Sep 17 00:00:00 2001
From: Evan Sparks
Date: Wed, 14 Aug 2013 16:24:23 -0700
Subject: Adding Linear Regression, and refactoring Ridge Regression.

---
 .../spark/mllib/regression/LinearRegression.scala  | 168 +++++++++++++
 .../spark/mllib/regression/RidgeRegression.scala   | 271 +++++++++------------
 .../mllib/util/LinearRegressionDataGenerator.scala |  98 ++++++++
 .../mllib/util/RidgeRegressionDataGenerator.scala  |   5 +-
 .../regression/JavaLinearRegressionSuite.java      |  96 ++++++++
 .../mllib/regression/JavaRidgeRegressionSuite.java |  96 ++++++++
 .../mllib/regression/LinearRegressionSuite.scala   |  87 +++++++
 .../mllib/regression/RidgeRegressionSuite.scala    |  68 ++++--
 8 files changed, 713 insertions(+), 176 deletions(-)
 create mode 100644 mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala
 create mode 100644 mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala
 create mode 100644 mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java
 create mode 100644 mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java
 create mode 100644 mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala

diff --git a/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala
new file mode 100644
index 0000000000..0ea5348a1f
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+/**
+ * Regression model trained using LinearRegression.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
+ */
+class LinearRegressionModel(
+    override val weights: Array[Double],
+    override val intercept: Double)
+  extends GeneralizedLinearModel(weights, intercept)
+  with RegressionModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    dataMatrix.dot(weightMatrix) + intercept
+  }
+}
+
+/**
+ * Train a regression model with no regularization using Stochastic Gradient Descent.
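+ *
+ * A brief usage sketch (an illustrative addition, not part of the original patch; it
+ * assumes an existing RDD[LabeledPoint] named `points`):
+ * {{{
+ *   // 100 iterations of SGD with step size 1.0 over the full data set.
+ *   val model = LinearRegressionWithSGD.train(points, 100, 1.0)
+ *   val prediction = model.predict(points.first().features)
+ * }}}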
+ */ +class LinearRegressionWithSGD private ( + var stepSize: Double, + var numIterations: Int, + var miniBatchFraction: Double, + var addIntercept: Boolean) + extends GeneralizedLinearAlgorithm[LinearRegressionModel] + with Serializable { + + val gradient = new SquaredGradient() + val updater = new SimpleUpdater() + val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) + .setNumIterations(numIterations) + .setMiniBatchFraction(miniBatchFraction) + + /** + * Construct a LinearRegression object with default parameters + */ + def this() = this(1.0, 100, 1.0, true) + + def createModel(weights: Array[Double], intercept: Double) = { + new LinearRegressionModel(weights, intercept) + } +} + +/** + * Top-level methods for calling LinearRegression. + */ +object LinearRegressionWithSGD { + + /** + * Train a Linear Regression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in + * gradient descent are initialized using the initial weights provided. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) + : LinearRegressionModel = + { + new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, true).run(input, + initialWeights) + } + + /** + * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double) + : LinearRegressionModel = + { + new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, true).run(input) + } + + /** + * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. We use the entire data set to + * update the gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param numIterations Number of iterations of gradient descent to run. + * @return a LinearRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double) + : LinearRegressionModel = + { + train(input, numIterations, stepSize, 1.0) + } + + /** + * Train a LinearRegression model given an RDD of (label, features) pairs. 
We run a fixed number
+   * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+   * update the gradient in each iteration.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @return a LinearRegressionModel which has the weights and offset from training.
+   */
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int)
+    : LinearRegressionModel =
+  {
+    train(input, numIterations, 1.0, 1.0)
+  }
+
+  def main(args: Array[String]) {
+    if (args.length != 4) {
+      println("Usage: LinearRegression <master> <input_dir> <step_size> <niters>")
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "LinearRegression")
+    val data = MLUtils.loadLabeledData(sc, args(1))
+    val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
+
+    sc.stop()
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
index b42d94af41..addf8cd59e 100644
--- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
@@ -18,200 +18,159 @@ package spark.mllib.regression
 
 import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
 import spark.mllib.util.MLUtils
 
 import org.jblas.DoubleMatrix
-import org.jblas.Solve
-
-import scala.annotation.tailrec
-import scala.collection.mutable
 
 /**
- * Ridge Regression from Joseph Gonzalez's implementation in MLBase
+ * Regression model trained using RidgeRegression.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
 */
 class RidgeRegressionModel(
-    val weights: DoubleMatrix,
-    val intercept: Double,
-    val lambdaOpt: Double,
-    val lambdas: Seq[(Double, Double, DoubleMatrix)])
-  extends RegressionModel {
-
-  override def predict(testData: RDD[Array[Double]]): RDD[Double] = {
-    // A small optimization to avoid serializing the entire model.
-    val localIntercept = this.intercept
-    val localWeights = this.weights
-    testData.map { x =>
-      (new DoubleMatrix(1, x.length, x:_*).mmul(localWeights)).get(0) + localIntercept
-    }
-  }
-
-  override def predict(testData: Array[Double]): Double = {
-    (new DoubleMatrix(1, testData.length, testData:_*).mmul(this.weights)).get(0) + this.intercept
+    override val weights: Array[Double],
+    override val intercept: Double)
+  extends GeneralizedLinearModel(weights, intercept)
+  with RegressionModel with Serializable {
+
+  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+      intercept: Double) = {
+    dataMatrix.dot(weightMatrix) + intercept
   }
 }
 
-class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double)
-  extends Logging {
-
-  def this() = this(0.0, 100.0)
+/**
+ * Train a regression model with L2-regularization using Stochastic Gradient Descent.
+ */
+class RidgeRegressionWithSGD private (
+    var stepSize: Double,
+    var numIterations: Int,
+    var regParam: Double,
+    var miniBatchFraction: Double,
+    var addIntercept: Boolean)
+  extends GeneralizedLinearAlgorithm[RidgeRegressionModel]
+  with Serializable {
+
+  val gradient = new SquaredGradient()
+  val updater = new SquaredL2Updater()
+  val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+      .setNumIterations(numIterations)
+      .setRegParam(regParam)
+      .setMiniBatchFraction(miniBatchFraction)
 
   /**
-   * Set the lower bound on binary search for lambda's. Default is 0.
+ * Construct a RidgeRegression object with default parameters */ - def setLowLambda(low: Double) = { - this.lambdaLow = low - this + def this() = this(1.0, 100, 1.0, 1.0, true) + + def createModel(weights: Array[Double], intercept: Double) = { + new RidgeRegressionModel(weights, intercept) } +} + +/** + * Top-level methods for calling RidgeRegression. + */ +object RidgeRegressionWithSGD { /** - * Set the upper bound on binary search for lambda's. Default is 100.0. + * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in + * gradient descent are initialized using the initial weights provided. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param regParam Regularization parameter. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. */ - def setHighLambda(hi: Double) = { - this.lambdaHigh = hi - this + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) + : RidgeRegressionModel = + { + new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input, + initialWeights) } - def train(inputLabeled: RDD[LabeledPoint]): RidgeRegressionModel = { - val input = inputLabeled.map(labeledPoint => (labeledPoint.label, labeledPoint.features)) - val nfeatures: Int = input.take(1)(0)._2.length - val nexamples: Long = input.count() - - val (yMean, xColMean, xColSd) = MLUtils.computeStats(input, nfeatures, nexamples) - - val data = input.map { case(y, features) => - val yNormalized = y - yMean - val featuresMat = new DoubleMatrix(nfeatures, 1, features:_*) - val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd) - (yNormalized, featuresNormalized.toArray) - } - - // Compute XtX - Size of XtX is nfeatures by nfeatures - val XtX: DoubleMatrix = data.map { case (y, features) => - val x = new DoubleMatrix(1, features.length, features:_*) - x.transpose().mmul(x) - }.reduce(_.addi(_)) - - // Compute Xt*y - Size of Xty is nfeatures by 1 - val Xty: DoubleMatrix = data.map { case (y, features) => - new DoubleMatrix(features.length, 1, features:_*).mul(y) - }.reduce(_.addi(_)) - - // Define a function to compute the leave one out cross validation error - // for a single example - def crossValidate(lambda: Double): (Double, Double, DoubleMatrix) = { - // Compute the MLE ridge regression parameter value - - // Ridge Regression parameter = inv(XtX + \lambda*I) * Xty - val XtXlambda = DoubleMatrix.eye(nfeatures).muli(lambda).addi(XtX) - val w = Solve.solveSymmetric(XtXlambda, Xty) - - val invXtX = Solve.solveSymmetric(XtXlambda, DoubleMatrix.eye(nfeatures)) - - // compute the generalized cross validation score - val cverror = data.map { - case (y, features) => - val x = new DoubleMatrix(features.length, 1, features:_*) - val yhat = w.transpose().mmul(x).get(0) - val H_ii = x.transpose().mmul(invXtX).mmul(x).get(0) - val residual = (y - yhat) / (1.0 - H_ii) - residual * residual - }.reduce(_ + _) / nexamples - - (lambda, cverror, w) - 
} - - // Binary search for the best assignment to lambda. - def binSearch(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = { - val buffer = mutable.ListBuffer.empty[(Double, Double, DoubleMatrix)] - - @tailrec - def loop(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = { - val mid = (high - low) / 2 + low - val lowValue = crossValidate((mid - low) / 2 + low) - val highValue = crossValidate((high - mid) / 2 + mid) - val (newLow, newHigh) = if (lowValue._2 < highValue._2) { - (low, mid + (high-low)/4) - } else { - (mid - (high-low)/4, high) - } - if (newHigh - newLow > 1.0E-7) { - buffer += lowValue += highValue - loop(newLow, newHigh) - } else { - buffer += lowValue += highValue - buffer.result() - } - } - - loop(low, high) - } - - // Actually compute the best lambda - val lambdas = binSearch(lambdaLow, lambdaHigh).sortBy(_._1) - - // Find the best parameter set by taking the lowest cverror. - val (lambdaOpt, cverror, weights) = lambdas.reduce((a, b) => if (a._2 < b._2) a else b) - - // Return the model which contains the solution - val weightsScaled = weights.div(xColSd) - val intercept = yMean - (weights.transpose().mmul(xColMean.div(xColSd)).get(0)) - val model = new RidgeRegressionModel(weightsScaled, intercept, lambdaOpt, lambdas) - - logInfo("RidgeRegression: optimal lambda " + model.lambdaOpt) - logInfo("RidgeRegression: optimal weights " + model.weights) - logInfo("RidgeRegression: optimal intercept " + model.intercept) - logInfo("RidgeRegression: cross-validation error " + cverror) - - model + /** + * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate the gradient. + * + * @param input RDD of (label, array of features) pairs. + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param regParam Regularization parameter. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double) + : RidgeRegressionModel = + { + new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input) } -} - -/** - * Top-level methods for calling Ridge Regression. - */ -object RidgeRegression { - // NOTE(shivaram): We use multiple train methods instead of default arguments to support - // Java programs. /** - * Train a ridge regression model given an RDD of (response, features) pairs. - * We use the closed form solution to compute the cross-validation score for - * a given lambda. The optimal lambda is computed by performing binary search - * between the provided bounds of lambda. + * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. We use the entire data set to + * update the gradient in each iteration. * - * @param input RDD of (response, array of features) pairs. - * @param lambdaLow lower bound used in binary search for lambda - * @param lambdaHigh upper bound used in binary search for lambda + * @param input RDD of (label, array of features) pairs. + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param regParam Regularization parameter. 
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @return a RidgeRegressionModel which has the weights and offset from training.
    */
   def train(
-      input: RDD[LabeledPoint],
-      lambdaLow: Double,
-      lambdaHigh: Double)
-    : RidgeRegressionModel =
+      input: RDD[LabeledPoint],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double)
+    : RidgeRegressionModel =
   {
-    new RidgeRegression(lambdaLow, lambdaHigh).train(input)
+    train(input, numIterations, stepSize, regParam, 1.0)
   }
 
   /**
-   * Train a ridge regression model given an RDD of (response, features) pairs.
-   * We use the closed form solution to compute the cross-validation score for
-   * a given lambda. The optimal lambda is computed by performing binary search
-   * between lambda values of 0 and 100.
+   * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+   * update the gradient in each iteration.
    *
-   * @param input RDD of (response, array of features) pairs.
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @return a RidgeRegressionModel which has the weights and offset from training.
    */
-  def train(input: RDD[LabeledPoint]) : RidgeRegressionModel = {
-    train(input, 0.0, 100.0)
+  def train(
+      input: RDD[LabeledPoint],
+      numIterations: Int)
+    : RidgeRegressionModel =
+  {
+    train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
   def main(args: Array[String]) {
-    if (args.length != 2) {
-      println("Usage: RidgeRegression <master> <input_dir>")
+    if (args.length != 5) {
+      println("Usage: RidgeRegression <master> <input_dir> <step_size> <regularization_parameter> <niters>")
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "RidgeRegression")
     val data = MLUtils.loadLabeledData(sc, args(1))
-    val model = RidgeRegression.train(data, 0, 1000)
+    val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble,
+      args(3).toDouble)
+
     sc.stop()
   }
 }
diff --git a/mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala
new file mode 100644
index 0000000000..39e2a30b55
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.util
+
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.regression.LabeledPoint
+
+/**
+ * Generate sample data used for LinearRegression. This class generates
+ * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the
+ * response variable `Y`.
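+ *
+ * Concretely (an illustrative gloss of the code below, not part of the original file):
+ * each feature is drawn uniformly from [0, 1), and each label is computed as
+ *   y = x.dot(w) + intercept + N(0, eps^2)
+ * where w is uniform in [-0.5, 0.5] except that w(0) = w(1) = 10.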
+ *
+ */
+object LinearRegressionDataGenerator {
+
+  /**
+   * Generate an RDD containing sample data for LinearRegression.
+   *
+   * @param sc SparkContext to be used for generating the RDD.
+   * @param nexamples Number of examples that will be contained in the RDD.
+   * @param nfeatures Number of features to generate for each example.
+   * @param eps Epsilon factor by which examples are scaled.
+   * @param nparts Number of partitions in the RDD. Default value is 2.
+   *
+   * @return RDD of LabeledPoint containing sample data.
+   */
+  def generateLinearRDD(
+    sc: SparkContext,
+    nexamples: Int,
+    nfeatures: Int,
+    eps: Double,
+    nparts: Int = 2,
+    intercept: Double = 0.0) : RDD[LabeledPoint] = {
+    org.jblas.util.Random.seed(42)
+    // Random values distributed uniformly in [-0.5, 0.5]
+    val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
+    w.put(0, 0, 10)
+    w.put(1, 0, 10)
+
+    val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
+      org.jblas.util.Random.seed(42 + p)
+      val examplesInPartition = nexamples / nparts
+
+      val X = DoubleMatrix.rand(examplesInPartition, nfeatures)
+      val y = X.mmul(w).add(intercept)
+
+      val rnd = new Random(42 + p)
+
+      val normalValues = Array.fill[Double](examplesInPartition)(rnd.nextGaussian() * eps)
+      val yObs = new DoubleMatrix(normalValues).addi(y)
+
+      Iterator.tabulate(examplesInPartition) { i =>
+        LabeledPoint(yObs.get(i, 0), X.getRow(i).toArray)
+      }
+    }
+    data
+  }
+
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("Usage: LinearRegressionGenerator <master> <output_dir>" +
+        " [num_examples] [num_features] [num_partitions]")
+      System.exit(1)
+    }
+
+    val sparkMaster: String = args(0)
+    val outputPath: String = args(1)
+    val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
+    val nfeatures: Int = if (args.length > 3) args(3).toInt else 100
+    val parts: Int = if (args.length > 4) args(4).toInt else 2
+    val eps = 10
+
+    val sc = new SparkContext(sparkMaster, "LinearRegressionDataGenerator")
+    val data = generateLinearRDD(sc, nexamples, nfeatures, eps, parts)
+
+    MLUtils.saveLabeledData(data, outputPath)
+    sc.stop()
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
index 4d329168be..08dce723b8 100644
--- a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
@@ -48,7 +48,8 @@ object RidgeRegressionDataGenerator {
       nexamples: Int,
       nfeatures: Int,
       eps: Double,
-      nparts: Int = 2) : RDD[LabeledPoint] = {
+      nparts: Int = 2,
+      intercept: Double = 0.0) : RDD[LabeledPoint] = {
     org.jblas.util.Random.seed(42)
     // Random values distributed uniformly in [-0.5, 0.5]
     val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
@@ -60,7 +61,7 @@ object RidgeRegressionDataGenerator {
     val examplesInPartition = nexamples / nparts
 
     val X = DoubleMatrix.rand(examplesInPartition, nfeatures)
-    val y = X.mmul(w)
+    val y = X.mmul(w).add(intercept)
 
     val rnd = new Random(42 + p)
 
diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java
new file mode 100644
index 0000000000..14d3d4ef39
--- /dev/null
+++ b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+
+public class JavaLinearRegressionSuite implements Serializable {
+  private transient JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaLinearRegressionSuite");
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+    System.clearProperty("spark.driver.port");
+  }
+
+  int validatePrediction(List<LabeledPoint> validationData, LinearRegressionModel model) {
+    int numAccurate = 0;
+    for (LabeledPoint point: validationData) {
+      Double prediction = model.predict(point.features());
+      // A prediction is off if the prediction is more than 0.5 away from expected value.
+      if (Math.abs(prediction - point.label()) <= 0.5) {
+        numAccurate++;
+      }
+    }
+    return numAccurate;
+  }
+
+  @Test
+  public void runLinearRegressionUsingConstructor() {
+    int nPoints = 10000;
+    double A = 2.0;
+    double[] weights = {-1.5, 1.0e-2};
+
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearRegressionSuite.generateLinearRegressionInputAsList(A,
+        weights, nPoints, 42), 2).cache();
+    List<LabeledPoint> validationData =
+        LinearRegressionSuite.generateLinearRegressionInputAsList(A, weights, nPoints, 17);
+
+    LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD();
+    linSGDImpl.optimizer().setStepSize(1.0)
+        .setRegParam(0.01)
+        .setNumIterations(20);
+    LinearRegressionModel model = linSGDImpl.run(testRDD.rdd());
+
+    int numAccurate = validatePrediction(validationData, model);
+    Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+  }
+
+  @Test
+  public void runLinearRegressionUsingStaticMethods() {
+    int nPoints = 10000;
+    double A = 2.0;
+    double[] weights = {-1.5, 1.0e-2};
+
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearRegressionSuite.generateLinearRegressionInputAsList(A,
+        weights, nPoints, 42), 2).cache();
+    List<LabeledPoint> validationData =
+        LinearRegressionSuite.generateLinearRegressionInputAsList(A, weights, nPoints, 17);
+
+    LinearRegressionModel model = LinearRegressionWithSGD.train(testRDD.rdd(), 100, 1.0, 1.0);
+
+    int numAccurate = validatePrediction(validationData, model);
+    Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+  }
+
+}
diff --git a/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java
new file mode 100644
index 0000000000..4f379b51d5
--- /dev/null
+++ b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+
+public class JavaRidgeRegressionSuite implements Serializable {
+  private transient JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaRidgeRegressionSuite");
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+    System.clearProperty("spark.driver.port");
+  }
+
+  int validatePrediction(List<LabeledPoint> validationData, RidgeRegressionModel model) {
+    int numAccurate = 0;
+    for (LabeledPoint point: validationData) {
+      Double prediction = model.predict(point.features());
+      // A prediction is off if the prediction is more than 0.5 away from expected value.
+      if (Math.abs(prediction - point.label()) <= 0.5) {
+        numAccurate++;
+      }
+    }
+    return numAccurate;
+  }
+
+  @Test
+  public void runRidgeRegressionUsingConstructor() {
+    int nPoints = 10000;
+    double A = 2.0;
+    double[] weights = {-1.5, 1.0e-2};
+
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(RidgeRegressionSuite.generateRidgeRegressionInputAsList(A,
+        weights, nPoints, 42), 2).cache();
+    List<LabeledPoint> validationData =
+        RidgeRegressionSuite.generateRidgeRegressionInputAsList(A, weights, nPoints, 17);
+
+    RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD();
+    ridgeSGDImpl.optimizer().setStepSize(1.0)
+        .setRegParam(0.01)
+        .setNumIterations(20);
+    RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd());
+
+    int numAccurate = validatePrediction(validationData, model);
+    Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+  }
+
+  @Test
+  public void runRidgeRegressionUsingStaticMethods() {
+    int nPoints = 10000;
+    double A = 2.0;
+    double[] weights = {-1.5, 1.0e-2};
+
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(RidgeRegressionSuite.generateRidgeRegressionInputAsList(A,
+        weights, nPoints, 42), 2).cache();
+    List<LabeledPoint> validationData =
+        RidgeRegressionSuite.generateRidgeRegressionInputAsList(A, weights, nPoints, 17);
+
+    RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0);
+
+    int numAccurate = validatePrediction(validationData, model);
+    Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+  }
+
+}
diff --git a/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala
new file mode 100644
index 0000000000..c794c1cac5
--- /dev/null
+++ b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.regression + +import scala.collection.JavaConversions._ +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import spark.SparkContext +import spark.SparkContext._ +import spark.mllib.util.LinearRegressionDataGenerator +import spark.mllib.regression.LabeledPoint +import org.jblas.DoubleMatrix + +object LinearRegressionSuite { + + def generateLinearRegressionInputAsList( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateLinearRegressionInput(intercept, weights, nPoints, seed)) + } + + + // Generate noisy input of the form Y = x.dot(weights) + intercept + noise + def generateLinearRegressionInput( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) + val x = Array.fill[Array[Double]](nPoints)( + Array.fill[Double](weights.length)(rnd.nextGaussian())) + val y = x.map(xi => + (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian() + ) + y.zip(x).map(p => LabeledPoint(p._1, p._2)) + } + +} + +class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 when + // X1 and X2 are collinear. 
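+  // (Illustrative gloss, not part of the original test: the generator pins w(0) = w(1) = 10,
+  // and the test passes intercept = 3.0 with eps = 0.0, so the fit should recover
+  // Y = 3 + 10*X1 + 10*X2 nearly exactly; the assertions below allow 0.5 of slack on the
+  // intercept and 1.0 on each weight.)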
+ test("multi-collinear variables") { + val testRDD = LinearRegressionDataGenerator.generateLinearRDD(sc, 100, 2, 0.0, intercept=3.0).cache() + val linReg = new LinearRegressionWithSGD() + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + + assert(model.intercept >= 2.5 && model.intercept <= 3.5) + assert(model.weights.length === 2) + assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0) + assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0) + } +} diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala index e2b244894d..aaac083ad9 100644 --- a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala @@ -17,6 +17,7 @@ package spark.mllib.regression +import scala.collection.JavaConversions._ import scala.util.Random import org.scalatest.BeforeAndAfterAll @@ -24,6 +25,37 @@ import org.scalatest.FunSuite import spark.SparkContext import spark.SparkContext._ +import spark.mllib.util.RidgeRegressionDataGenerator +import org.jblas.DoubleMatrix + +object RidgeRegressionSuite { + + def generateRidgeRegressionInputAsList( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): java.util.List[LabeledPoint] = { + seqAsJavaList(generateRidgeRegressionInput(intercept, weights, nPoints, seed)) + } + + + // Generate noisy input of the form Y = x.dot(weights) + intercept + noise + def generateRidgeRegressionInput( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): Seq[LabeledPoint] = { + val rnd = new Random(seed) + val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) + val x = Array.fill[Array[Double]](nPoints)( + Array.fill[Double](weights.length)(rnd.nextGaussian())) + val y = x.map(xi => + (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian() + ) + y.zip(x).map(p => LabeledPoint(p._1, p._2)) + } + +} class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { @@ -38,31 +70,31 @@ class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { System.clearProperty("spark.driver.port") } - // Test if we can correctly learn Y = 3 + X1 + X2 when + // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 when // X1 and X2 are collinear. 
test("multi-collinear variables") { - val rnd = new Random(43) - val x1 = Array.fill[Double](20)(rnd.nextGaussian()) - - // Pick a mean close to mean of x1 - val rnd1 = new Random(42) //new NormalDistribution(0.1, 0.01) - val x2 = Array.fill[Double](20)(0.1 + rnd1.nextGaussian() * 0.01) + val testRDD = RidgeRegressionDataGenerator.generateRidgeRDD(sc, 100, 2, 0.0, intercept=3.0).cache() + val ridgeReg = new RidgeRegressionWithSGD() + ridgeReg.optimizer.setNumIterations(1000).setRegParam(0.0).setStepSize(1.0) - val xMat = (0 until 20).map(i => Array(x1(i), x2(i))).toArray + val model = ridgeReg.run(testRDD) - val y = xMat.map(i => 3 + i(0) + i(1)) - val testData = (0 until 20).map(i => LabeledPoint(y(i), xMat(i))).toArray + assert(model.intercept >= 2.5 && model.intercept <= 3.5) + assert(model.weights.length === 2) + assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0) + assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0) + } - val testRDD = sc.parallelize(testData, 2) - testRDD.cache() - val ridgeReg = new RidgeRegression().setLowLambda(0) - .setHighLambda(10) + test("multi-collinear variables with regularization") { + val testRDD = RidgeRegressionDataGenerator.generateRidgeRDD(sc, 100, 2, 0.0, intercept=3.0).cache() + val ridgeReg = new RidgeRegressionWithSGD() + ridgeReg.optimizer.setNumIterations(1000).setRegParam(1.0).setStepSize(1.0) - val model = ridgeReg.train(testRDD) + val model = ridgeReg.run(testRDD) - assert(model.intercept >= 2.9 && model.intercept <= 3.1) + assert(model.intercept <= 5.0) assert(model.weights.length === 2) - assert(model.weights.get(0) >= 0.9 && model.weights.get(0) <= 1.1) - assert(model.weights.get(1) >= 0.9 && model.weights.get(1) <= 1.1) + assert(model.weights(0) <= 3.0) + assert(model.weights(1) <= 3.0) } } -- cgit v1.2.3 From b291db712e73fdff0c02946bac96e330b089409d Mon Sep 17 00:00:00 2001 From: Evan Sparks Date: Fri, 16 Aug 2013 17:48:26 -0700 Subject: Centralizing linear data generator and mllib regression tests to use it. 
---
 .../spark/mllib/util/LassoDataGenerator.scala      |  48 --------
 .../spark/mllib/util/LinearDataGenerator.scala     | 136 +++++++++++++++
 .../mllib/util/LinearRegressionDataGenerator.scala |  98 ---------------
 .../mllib/util/RidgeRegressionDataGenerator.scala  |  98 ---------------
 .../spark/mllib/regression/JavaLassoSuite.java     |  11 +-
 .../regression/JavaLinearRegressionSuite.java      |   9 +-
 .../mllib/regression/JavaRidgeRegressionSuite.java |   9 +-
 .../scala/spark/mllib/regression/LassoSuite.scala  |  39 +-----
 .../mllib/regression/LinearRegressionSuite.scala   |  38 +-----
 .../mllib/regression/RidgeRegressionSuite.scala    |  38 +-----
 10 files changed, 163 insertions(+), 361 deletions(-)
 delete mode 100644 mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
 create mode 100644 mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala
 delete mode 100644 mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala
 delete mode 100644 mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala

diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
deleted file mode 100644
index eeb14fc4e3..0000000000
--- a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-package spark.mllib.util
-
-import scala.util.Random
-
-import org.jblas.DoubleMatrix
-
-import spark.{RDD, SparkContext}
-import spark.mllib.regression.LabeledPoint
-
-/**
- * Generate sample data used for Lasso Regression. This class generates uniform random values
- * for the features and adds Gaussian noise with weight 0.1 to generate response variables.
- */
-object LassoDataGenerator {
-
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      println("Usage: LassoGenerator <master> <output_dir>" +
-        " [num_examples] [num_features] [num_partitions]")
-      System.exit(1)
-    }
-
-    val sparkMaster: String = args(0)
-    val outputPath: String = args(1)
-    val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
-    val nfeatures: Int = if (args.length > 3) args(3).toInt else 2
-    val parts: Int = if (args.length > 4) args(4).toInt else 2
-
-    val sc = new SparkContext(sparkMaster, "LassoGenerator")
-
-    val globalRnd = new Random(94720)
-    val trueWeights = new DoubleMatrix(1, nfeatures+1,
-      Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
-
-    val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
-      val rnd = new Random(42 + idx)
-
-      val x = Array.fill[Double](nfeatures) {
-        rnd.nextDouble() * 2.0 - 1.0
-      }
-      val y = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1
-      LabeledPoint(y, x)
-    }
-
-    MLUtils.saveLabeledData(data, outputPath)
-    sc.stop()
-  }
-}
diff --git a/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala
new file mode 100644
index 0000000000..8fe3ab4754
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.util
+
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.regression.LabeledPoint
+
+/**
+ * Generate sample data for linear regression. This class generates
+ * Gaussian-distributed random values for every feature and adds Gaussian noise with standard
+ * deviation `eps` to the response variable `Y`.
+ *
+ */
+object LinearDataGenerator {
+
+  /**
+   * Return a Java List of synthetic data randomly generated according to a linear model.
+   * @param intercept Data intercept
+   * @param weights Weights to be applied.
+   * @param nPoints Number of points in sample.
+   * @param seed Random seed
+   * @return Java List of input.
+   */
+  def generateLinearInputAsList(
+      intercept: Double,
+      weights: Array[Double],
+      nPoints: Int,
+      seed: Int): java.util.List[LabeledPoint] = {
+    seqAsJavaList(generateLinearInput(intercept, weights, nPoints, seed))
+  }
+
+  /**
+   * Generate noisy input of the form Y = x.dot(weights) + intercept + noise.
+   *
+   * @param intercept Data intercept
+   * @param weights Weights to be applied.
+   * @param nPoints Number of points in sample.
+   * @param seed Random seed
+   * @param eps Epsilon scaling factor.
+   * @return Seq of LabeledPoint containing the generated input.
+   */
+  def generateLinearInput(
+      intercept: Double,
+      weights: Array[Double],
+      nPoints: Int,
+      seed: Int,
+      eps: Double = 0.1): Seq[LabeledPoint] = {
+
+    val rnd = new Random(seed)
+    val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
+    val x = Array.fill[Array[Double]](nPoints)(
+      Array.fill[Double](weights.length)(rnd.nextGaussian()))
+    val y = x.map(xi =>
+      (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + eps * rnd.nextGaussian()
+    )
+    y.zip(x).map(p => LabeledPoint(p._1, p._2))
+  }
+
+  /**
+   * Generate an RDD containing sample data for Linear Regression models - including Ridge, Lasso,
+   * and unregularized variants.
+   *
+   * @param sc SparkContext to be used for generating the RDD.
+   * @param nexamples Number of examples that will be contained in the RDD.
+   * @param nfeatures Number of features to generate for each example.
+   * @param eps Epsilon factor by which examples are scaled.
+   * @param weights Weights associated with the first weights.length features.
+   * @param nparts Number of partitions in the RDD. Default value is 2.
+   * @param intercept Intercept added to every generated label. Default value is 0.0.
+   *
+   * @return RDD of LabeledPoint containing sample data.
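+   *
+   * Call sketch (illustrative only; assumes a SparkContext `sc`):
+   * {{{
+   *   // 1000 examples with 3 features; the first two weights are pinned at 10.0.
+   *   val rdd = generateLinearRDD(sc, 1000, 3, 10.0, Array(10.0, 10.0), intercept = 3.0)
+   * }}}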
+   */
+  def generateLinearRDD(
+    sc: SparkContext,
+    nexamples: Int,
+    nfeatures: Int,
+    eps: Double,
+    weights: Array[Double] = Array[Double](),
+    nparts: Int = 2,
+    intercept: Double = 0.0) : RDD[LabeledPoint] = {
+    org.jblas.util.Random.seed(42)
+    // Random values distributed uniformly in [-0.5, 0.5]
+    val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
+
+    (0 until weights.length.min(nfeatures)).map(i => w.put(i, 0, weights(i)))
+
+    val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
+      val seed = 42 + p
+      val examplesInPartition = nexamples / nparts
+
+      generateLinearInput(intercept, w.toArray, examplesInPartition, seed, eps)
+    }
+    data
+  }
+
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("Usage: LinearDataGenerator <master> <output_dir>" +
+        " [num_examples] [num_features] [num_partitions]")
+      System.exit(1)
+    }
+
+    val sparkMaster: String = args(0)
+    val outputPath: String = args(1)
+    val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
+    val nfeatures: Int = if (args.length > 3) args(3).toInt else 100
+    val parts: Int = if (args.length > 4) args(4).toInt else 2
+    val eps = 10
+
+    val sc = new SparkContext(sparkMaster, "LinearDataGenerator")
+    val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts)
+
+    MLUtils.saveLabeledData(data, outputPath)
+    sc.stop()
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala
deleted file mode 100644
index 39e2a30b55..0000000000
--- a/mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.mllib.util
-
-import scala.util.Random
-
-import org.jblas.DoubleMatrix
-
-import spark.{RDD, SparkContext}
-import spark.mllib.regression.LabeledPoint
-
-/**
- * Generate sample data used for LinearRegression. This class generates
- * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the
- * response variable `Y`.
- *
- */
-object LinearRegressionDataGenerator {
-
-  /**
-   * Generate an RDD containing sample data for LinearRegression.
-   *
-   * @param sc SparkContext to be used for generating the RDD.
-   * @param nexamples Number of examples that will be contained in the RDD.
-   * @param nfeatures Number of features to generate for each example.
-   * @param eps Epsilon factor by which examples are scaled.
-   * @param nparts Number of partitions in the RDD. Default value is 2.
-   *
-   * @return RDD of LabeledPoint containing sample data.
- */ - def generateLinearRDD( - sc: SparkContext, - nexamples: Int, - nfeatures: Int, - eps: Double, - nparts: Int = 2, - intercept: Double = 0.0) : RDD[LabeledPoint] = { - org.jblas.util.Random.seed(42) - // Random values distributed uniformly in [-0.5, 0.5] - val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) - w.put(0, 0, 10) - w.put(1, 0, 10) - - val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p => - org.jblas.util.Random.seed(42 + p) - val examplesInPartition = nexamples / nparts - - val X = DoubleMatrix.rand(examplesInPartition, nfeatures) - val y = X.mmul(w).add(intercept) - - val rnd = new Random(42 + p) - - val normalValues = Array.fill[Double](examplesInPartition)(rnd.nextGaussian() * eps) - val yObs = new DoubleMatrix(normalValues).addi(y) - - Iterator.tabulate(examplesInPartition) { i => - LabeledPoint(yObs.get(i, 0), X.getRow(i).toArray) - } - } - data - } - - def main(args: Array[String]) { - if (args.length < 2) { - println("Usage: LinearRegressionGenerator " + - " [num_examples] [num_features] [num_partitions]") - System.exit(1) - } - - val sparkMaster: String = args(0) - val outputPath: String = args(1) - val nexamples: Int = if (args.length > 2) args(2).toInt else 1000 - val nfeatures: Int = if (args.length > 3) args(3).toInt else 100 - val parts: Int = if (args.length > 4) args(4).toInt else 2 - val eps = 10 - - val sc = new SparkContext(sparkMaster, "LinearRegressionDataGenerator") - val data = generateLinearRDD(sc, nexamples, nfeatures, eps, parts) - - MLUtils.saveLabeledData(data, outputPath) - sc.stop() - } -} diff --git a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala deleted file mode 100644 index 08dce723b8..0000000000 --- a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark.mllib.util - -import scala.util.Random - -import org.jblas.DoubleMatrix - -import spark.{RDD, SparkContext} -import spark.mllib.regression.LabeledPoint - -/** - * Generate sample data used for RidgeRegression. This class generates - * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the - * response variable `Y`. - * - */ -object RidgeRegressionDataGenerator { - - /** - * Generate an RDD containing sample data for RidgeRegression. - * - * @param sc SparkContext to be used for generating the RDD. - * @param nexamples Number of examples that will be contained in the RDD. - * @param nfeatures Number of features to generate for each example. - * @param eps Epsilon factor by which examples are scaled. - * @param nparts Number of partitions in the RDD. 
Default value is 2. - * - * @return RDD of LabeledPoint containing sample data. - */ - def generateRidgeRDD( - sc: SparkContext, - nexamples: Int, - nfeatures: Int, - eps: Double, - nparts: Int = 2, - intercept: Double = 0.0) : RDD[LabeledPoint] = { - org.jblas.util.Random.seed(42) - // Random values distributed uniformly in [-0.5, 0.5] - val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) - w.put(0, 0, 10) - w.put(1, 0, 10) - - val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p => - org.jblas.util.Random.seed(42 + p) - val examplesInPartition = nexamples / nparts - - val X = DoubleMatrix.rand(examplesInPartition, nfeatures) - val y = X.mmul(w).add(intercept) - - val rnd = new Random(42 + p) - - val normalValues = Array.fill[Double](examplesInPartition)(rnd.nextGaussian() * eps) - val yObs = new DoubleMatrix(normalValues).addi(y) - - Iterator.tabulate(examplesInPartition) { i => - LabeledPoint(yObs.get(i, 0), X.getRow(i).toArray) - } - } - data - } - - def main(args: Array[String]) { - if (args.length < 2) { - println("Usage: RidgeRegressionGenerator " + - " [num_examples] [num_features] [num_partitions]") - System.exit(1) - } - - val sparkMaster: String = args(0) - val outputPath: String = args(1) - val nexamples: Int = if (args.length > 2) args(2).toInt else 1000 - val nfeatures: Int = if (args.length > 3) args(3).toInt else 100 - val parts: Int = if (args.length > 4) args(4).toInt else 2 - val eps = 10 - - val sc = new SparkContext(sparkMaster, "RidgeRegressionDataGenerator") - val data = generateRidgeRDD(sc, nexamples, nfeatures, eps, parts) - - MLUtils.saveLabeledData(data, outputPath) - sc.stop() - } -} diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java index e26d7b385c..8d692c2d0d 100644 --- a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java +++ b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java @@ -27,6 +27,7 @@ import org.junit.Test; import spark.api.java.JavaRDD; import spark.api.java.JavaSparkContext; +import spark.mllib.util.LinearDataGenerator; public class JavaLassoSuite implements Serializable { private transient JavaSparkContext sc; @@ -61,10 +62,10 @@ public class JavaLassoSuite implements Serializable { double A = 2.0; double[] weights = {-1.5, 1.0e-2}; - JavaRDD testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A, - weights, nPoints, 42), 2).cache(); + JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, + weights, nPoints, 42), 2).cache(); List validationData = - LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17); + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); LassoWithSGD svmSGDImpl = new LassoWithSGD(); svmSGDImpl.optimizer().setStepSize(1.0) @@ -82,10 +83,10 @@ public class JavaLassoSuite implements Serializable { double A = 2.0; double[] weights = {-1.5, 1.0e-2}; - JavaRDD testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A, + JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42), 2).cache(); List validationData = - LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17); + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); LassoModel model = LassoWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0); diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java 
index 14d3d4ef39..d2d8a62980 100644 --- a/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java @@ -27,6 +27,7 @@ import org.junit.Test; import spark.api.java.JavaRDD; import spark.api.java.JavaSparkContext; +import spark.mllib.util.LinearDataGenerator; public class JavaLinearRegressionSuite implements Serializable { private transient JavaSparkContext sc; @@ -61,10 +62,10 @@ public class JavaLinearRegressionSuite implements Serializable { double A = 2.0; double[] weights = {-1.5, 1.0e-2}; - JavaRDD testRDD = sc.parallelize(LinearRegressionSuite.generateLinearRegressionInputAsList(A, + JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42), 2).cache(); List validationData = - LinearRegressionSuite.generateLinearRegressionInputAsList(A, weights, nPoints, 17); + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); LinearRegressionWithSGD svmSGDImpl = new LinearRegressionWithSGD(); svmSGDImpl.optimizer().setStepSize(1.0) @@ -82,10 +83,10 @@ public class JavaLinearRegressionSuite implements Serializable { double A = 2.0; double[] weights = {-1.5, 1.0e-2}; - JavaRDD testRDD = sc.parallelize(LinearRegressionSuite.generateLinearRegressionInputAsList(A, + JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42), 2).cache(); List validationData = - LinearRegressionSuite.generateLinearRegressionInputAsList(A, weights, nPoints, 17); + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); LinearRegressionModel model = LinearRegressionWithSGD.train(testRDD.rdd(), 100, 1.0, 1.0); diff --git a/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java index 4f379b51d5..72ab875985 100644 --- a/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java +++ b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java @@ -27,6 +27,7 @@ import org.junit.Test; import spark.api.java.JavaRDD; import spark.api.java.JavaSparkContext; +import spark.mllib.util.LinearDataGenerator; public class JavaRidgeRegressionSuite implements Serializable { private transient JavaSparkContext sc; @@ -61,10 +62,10 @@ public class JavaRidgeRegressionSuite implements Serializable { double A = 2.0; double[] weights = {-1.5, 1.0e-2}; - JavaRDD testRDD = sc.parallelize(RidgeRegressionSuite.generateRidgeRegressionInputAsList(A, + JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42), 2).cache(); List validationData = - RidgeRegressionSuite.generateRidgeRegressionInputAsList(A, weights, nPoints, 17); + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); RidgeRegressionWithSGD svmSGDImpl = new RidgeRegressionWithSGD(); svmSGDImpl.optimizer().setStepSize(1.0) @@ -82,10 +83,10 @@ public class JavaRidgeRegressionSuite implements Serializable { double A = 2.0; double[] weights = {-1.5, 1.0e-2}; - JavaRDD testRDD = sc.parallelize(RidgeRegressionSuite.generateRidgeRegressionInputAsList(A, + JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42), 2).cache(); List validationData = - RidgeRegressionSuite.generateRidgeRegressionInputAsList(A, weights, nPoints, 17); + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); RidgeRegressionModel model = 
RidgeRegressionWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0); diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala index 55a738f1e4..622dbbab7f 100644 --- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala @@ -24,37 +24,8 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import spark.SparkContext +import spark.mllib.util.LinearDataGenerator -import org.jblas.DoubleMatrix - -object LassoSuite { - - def generateLassoInputAsList( - intercept: Double, - weights: Array[Double], - nPoints: Int, - seed: Int): java.util.List[LabeledPoint] = { - seqAsJavaList(generateLassoInput(intercept, weights, nPoints, seed)) - } - - - // Generate noisy input of the form Y = x.dot(weights) + intercept + noise - def generateLassoInput( - intercept: Double, - weights: Array[Double], - nPoints: Int, - seed: Int): Seq[LabeledPoint] = { - val rnd = new Random(seed) - val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) - val x = Array.fill[Array[Double]](nPoints)( - Array.fill[Double](weights.length)(rnd.nextGaussian())) - val y = x.map(xi => - (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian() - ) - y.zip(x).map(p => LabeledPoint(p._1, p._2)) - } - -} class LassoSuite extends FunSuite with BeforeAndAfterAll { @transient private var sc: SparkContext = _ @@ -85,7 +56,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { val B = -1.5 val C = 1.0e-2 - val testData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 42) + val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42) val testRDD = sc.parallelize(testData, 2) testRDD.cache() @@ -101,7 +72,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") - val validationData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 17) + val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17) val validationRDD = sc.parallelize(validationData, 2) // Test prediction on RDD. @@ -118,7 +89,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { val B = -1.5 val C = 1.0e-2 - val testData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 42) + val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42) val initialB = -1.0 val initialC = -1.0 @@ -138,7 +109,7 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll { assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") - val validationData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 17) + val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17) val validationRDD = sc.parallelize(validationData,2) // Test prediction on RDD. 
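The generator deleted here was one of three near-identical copies (LassoSuite, LinearRegressionSuite and RidgeRegressionSuite each carried one); the patch moves that logic into the shared spark.mllib.util.LinearDataGenerator. Its core is just a seeded noisy linear model, sketched standalone below; the object and method names are illustrative, and LabeledPoint is replaced by a plain (label, features) pair to keep the sketch self-contained:

    import scala.util.Random

    object NoisyLinearData {
      // y = weights . x + intercept + 0.1 * gaussian noise; fixing the seed makes
      // the test and validation sets reproducible across runs.
      def generate(
          intercept: Double,
          weights: Array[Double],
          nPoints: Int,
          seed: Int): Seq[(Double, Array[Double])] = {
        val rnd = new Random(seed)
        (0 until nPoints).map { _ =>
          val x = Array.fill(weights.length)(rnd.nextGaussian())
          val dot = x.zip(weights).map(p => p._1 * p._2).sum
          (dot + intercept + 0.1 * rnd.nextGaussian(), x)
        }
      }
    }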
diff --git a/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala index c794c1cac5..3d22b7d385 100644 --- a/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala @@ -17,46 +17,12 @@ package spark.mllib.regression -import scala.collection.JavaConversions._ -import scala.util.Random - import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import spark.SparkContext import spark.SparkContext._ -import spark.mllib.util.LinearRegressionDataGenerator -import spark.mllib.regression.LabeledPoint -import org.jblas.DoubleMatrix - -object LinearRegressionSuite { - - def generateLinearRegressionInputAsList( - intercept: Double, - weights: Array[Double], - nPoints: Int, - seed: Int): java.util.List[LabeledPoint] = { - seqAsJavaList(generateLinearRegressionInput(intercept, weights, nPoints, seed)) - } - - - // Generate noisy input of the form Y = x.dot(weights) + intercept + noise - def generateLinearRegressionInput( - intercept: Double, - weights: Array[Double], - nPoints: Int, - seed: Int): Seq[LabeledPoint] = { - val rnd = new Random(seed) - val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) - val x = Array.fill[Array[Double]](nPoints)( - Array.fill[Double](weights.length)(rnd.nextGaussian())) - val y = x.map(xi => - (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian() - ) - y.zip(x).map(p => LabeledPoint(p._1, p._2)) - } - -} +import spark.mllib.util.LinearDataGenerator class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll { @transient private var sc: SparkContext = _ @@ -73,7 +39,7 @@ class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll { // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 when // X1 and X2 are collinear. 
test("multi-collinear variables") { - val testRDD = LinearRegressionDataGenerator.generateLinearRDD(sc, 100, 2, 0.0, intercept=3.0).cache() + val testRDD = LinearDataGenerator.generateLinearRDD(sc, 100, 2, 0.0, Array(10.0, 10.0), intercept=3.0).cache() val linReg = new LinearRegressionWithSGD() linReg.optimizer.setNumIterations(1000).setStepSize(1.0) diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala index aaac083ad9..0237ccdf87 100644 --- a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala @@ -25,37 +25,7 @@ import org.scalatest.FunSuite import spark.SparkContext import spark.SparkContext._ -import spark.mllib.util.RidgeRegressionDataGenerator -import org.jblas.DoubleMatrix - -object RidgeRegressionSuite { - - def generateRidgeRegressionInputAsList( - intercept: Double, - weights: Array[Double], - nPoints: Int, - seed: Int): java.util.List[LabeledPoint] = { - seqAsJavaList(generateRidgeRegressionInput(intercept, weights, nPoints, seed)) - } - - - // Generate noisy input of the form Y = x.dot(weights) + intercept + noise - def generateRidgeRegressionInput( - intercept: Double, - weights: Array[Double], - nPoints: Int, - seed: Int): Seq[LabeledPoint] = { - val rnd = new Random(seed) - val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) - val x = Array.fill[Array[Double]](nPoints)( - Array.fill[Double](weights.length)(rnd.nextGaussian())) - val y = x.map(xi => - (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian() - ) - y.zip(x).map(p => LabeledPoint(p._1, p._2)) - } - -} +import spark.mllib.util.LinearDataGenerator class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { @@ -73,7 +43,7 @@ class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 when // X1 and X2 are collinear. test("multi-collinear variables") { - val testRDD = RidgeRegressionDataGenerator.generateRidgeRDD(sc, 100, 2, 0.0, intercept=3.0).cache() + val testRDD = LinearDataGenerator.generateLinearRDD(sc, 100, 2, 0.0, Array(10.0, 10.0), intercept=3.0).cache() val ridgeReg = new RidgeRegressionWithSGD() ridgeReg.optimizer.setNumIterations(1000).setRegParam(0.0).setStepSize(1.0) @@ -86,7 +56,7 @@ class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { } test("multi-collinear variables with regularization") { - val testRDD = RidgeRegressionDataGenerator.generateRidgeRDD(sc, 100, 2, 0.0, intercept=3.0).cache() + val testRDD = LinearDataGenerator.generateLinearRDD(sc, 100, 2, 0.0, Array(10.0, 10.0), intercept=3.0).cache() val ridgeReg = new RidgeRegressionWithSGD() ridgeReg.optimizer.setNumIterations(1000).setRegParam(1.0).setStepSize(1.0) @@ -94,7 +64,7 @@ class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { assert(model.intercept <= 5.0) assert(model.weights.length === 2) - assert(model.weights(0) <= 3.0) + assert(model.weights(0) <= 4.0) assert(model.weights(1) <= 3.0) } } -- cgit v1.2.3 From 07fe910669b2ec15b6b5c1e5186df5036d05b9b1 Mon Sep 17 00:00:00 2001 From: Evan Sparks Date: Fri, 16 Aug 2013 18:00:20 -0700 Subject: Fixing typos in Java tests, and addressing alignment issues. 
--- .../main/scala/spark/mllib/util/LinearDataGenerator.scala | 14 +++++++------- .../test/java/spark/mllib/regression/JavaLassoSuite.java | 6 +++--- .../spark/mllib/regression/JavaLinearRegressionSuite.java | 6 +++--- .../spark/mllib/regression/JavaRidgeRegressionSuite.java | 6 +++--- 4 files changed, 16 insertions(+), 16 deletions(-) (limited to 'mllib/src') diff --git a/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala index 8fe3ab4754..20e1656beb 100644 --- a/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala @@ -91,13 +91,13 @@ object LinearDataGenerator { * @return RDD of LabeledPoint containing sample data. */ def generateLinearRDD( - sc: SparkContext, - nexamples: Int, - nfeatures: Int, - eps: Double, - weights: Array[Double] = Array[Double](), - nparts: Int = 2, - intercept: Double = 0.0) : RDD[LabeledPoint] = { + sc: SparkContext, + nexamples: Int, + nfeatures: Int, + eps: Double, + weights: Array[Double] = Array[Double](), + nparts: Int = 2, + intercept: Double = 0.0) : RDD[LabeledPoint] = { org.jblas.util.Random.seed(42) // Random values distributed uniformly in [-0.5, 0.5] val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java index 8d692c2d0d..428902e85c 100644 --- a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java +++ b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java @@ -67,11 +67,11 @@ public class JavaLassoSuite implements Serializable { List validationData = LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); - LassoWithSGD svmSGDImpl = new LassoWithSGD(); - svmSGDImpl.optimizer().setStepSize(1.0) + LassoWithSGD lassoSGDImpl = new LassoWithSGD(); + lassoSGDImpl.optimizer().setStepSize(1.0) .setRegParam(0.01) .setNumIterations(20); - LassoModel model = svmSGDImpl.run(testRDD.rdd()); + LassoModel model = lassoSGDImpl.run(testRDD.rdd()); int numAccurate = validatePrediction(validationData, model); Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java index d2d8a62980..9642e89844 100644 --- a/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java @@ -67,11 +67,11 @@ public class JavaLinearRegressionSuite implements Serializable { List validationData = LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); - LinearRegressionWithSGD svmSGDImpl = new LinearRegressionWithSGD(); - svmSGDImpl.optimizer().setStepSize(1.0) + LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD(); + linSGDImpl.optimizer().setStepSize(1.0) .setRegParam(0.01) .setNumIterations(20); - LinearRegressionModel model = svmSGDImpl.run(testRDD.rdd()); + LinearRegressionModel model = linSGDImpl.run(testRDD.rdd()); int numAccurate = validatePrediction(validationData, model); Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); diff --git a/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java index 72ab875985..5df6d8076d 100644 --- 
a/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java +++ b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java @@ -67,11 +67,11 @@ public class JavaRidgeRegressionSuite implements Serializable { List validationData = LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); - RidgeRegressionWithSGD svmSGDImpl = new RidgeRegressionWithSGD(); - svmSGDImpl.optimizer().setStepSize(1.0) + RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD(); + ridgeSGDImpl.optimizer().setStepSize(1.0) .setRegParam(0.01) .setNumIterations(20); - RidgeRegressionModel model = svmSGDImpl.run(testRDD.rdd()); + RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd()); int numAccurate = validatePrediction(validationData, model); Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); -- cgit v1.2.3 From b8c50a0642cf74c25fd70cc1e7d1be95ddafc5d8 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sun, 25 Aug 2013 22:24:27 -0700 Subject: Center & scale variables in Ridge, Lasso. Also add a unit test that checks if ridge regression lowers cross-validation error. --- .../main/scala/spark/mllib/regression/Lasso.scala | 45 ++++++- .../spark/mllib/regression/LinearRegression.scala | 44 +++---- .../spark/mllib/regression/RidgeRegression.scala | 95 +++++++++----- .../spark/mllib/util/LinearDataGenerator.scala | 24 ++-- .../src/main/scala/spark/mllib/util/MLUtils.scala | 10 +- .../spark/mllib/regression/JavaLassoSuite.java | 8 +- .../regression/JavaLinearRegressionSuite.java | 119 +++++++++--------- .../mllib/regression/JavaRidgeRegressionSuite.java | 139 +++++++++++---------- .../mllib/regression/LinearRegressionSuite.scala | 27 +++- .../mllib/regression/RidgeRegressionSuite.scala | 64 ++++++---- 10 files changed, 347 insertions(+), 228 deletions(-) (limited to 'mllib/src') diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala index 6bbc990a5a..929c36bd76 100644 --- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala @@ -55,10 +55,17 @@ class LassoWithSGD private ( val gradient = new SquaredGradient() val updater = new L1Updater() - val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) - .setNumIterations(numIterations) - .setRegParam(regParam) - .setMiniBatchFraction(miniBatchFraction) + @transient val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) + .setNumIterations(numIterations) + .setRegParam(regParam) + .setMiniBatchFraction(miniBatchFraction) + + // We don't want to penalize the intercept, so set this to false. 
+ setIntercept(false) + + var yMean = 0.0 + var xColMean: DoubleMatrix = _ + var xColSd: DoubleMatrix = _ /** * Construct a Lasso object with default parameters @@ -66,7 +73,35 @@ class LassoWithSGD private ( def this() = this(1.0, 100, 1.0, 1.0, true) def createModel(weights: Array[Double], intercept: Double) = { - new LassoModel(weights, intercept) + val weightsMat = new DoubleMatrix(weights.length + 1, 1, (Array(intercept) ++ weights):_*) + val weightsScaled = weightsMat.div(xColSd) + val interceptScaled = yMean - (weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)) + + new LassoModel(weightsScaled.data, interceptScaled) + } + + override def run( + input: RDD[LabeledPoint], + initialWeights: Array[Double]) + : LassoModel = + { + val nfeatures: Int = input.first.features.length + val nexamples: Long = input.count() + + // To avoid penalizing the intercept, we center and scale the data. + val stats = MLUtils.computeStats(input, nfeatures, nexamples) + yMean = stats._1 + xColMean = stats._2 + xColSd = stats._3 + + val normalizedData = input.map { point => + val yNormalized = point.label - yMean + val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*) + val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd) + LabeledPoint(yNormalized, featuresNormalized.toArray) + } + + super.run(normalizedData, initialWeights) } } diff --git a/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala index 0ea5348a1f..5b3743f2fa 100644 --- a/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala +++ b/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala @@ -45,10 +45,10 @@ class LinearRegressionModel( * Train a regression model with no regularization using Stochastic Gradient Descent. */ class LinearRegressionWithSGD private ( - var stepSize: Double, - var numIterations: Int, - var miniBatchFraction: Double, - var addIntercept: Boolean) + var stepSize: Double, + var numIterations: Int, + var miniBatchFraction: Double, + var addIntercept: Boolean) extends GeneralizedLinearAlgorithm[LinearRegressionModel] with Serializable { @@ -87,12 +87,12 @@ object LinearRegressionWithSGD { * the number of features in the data. */ def train( - input: RDD[LabeledPoint], - numIterations: Int, - stepSize: Double, - miniBatchFraction: Double, - initialWeights: Array[Double]) - : LinearRegressionModel = + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) + : LinearRegressionModel = { new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, true).run(input, initialWeights) @@ -109,11 +109,11 @@ object LinearRegressionWithSGD { * @param miniBatchFraction Fraction of data to be used per iteration. */ def train( - input: RDD[LabeledPoint], - numIterations: Int, - stepSize: Double, - miniBatchFraction: Double) - : LinearRegressionModel = + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double) + : LinearRegressionModel = { new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, true).run(input) } @@ -129,10 +129,10 @@ object LinearRegressionWithSGD { * @return a LinearRegressionModel which has the weights and offset from training. 
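The createModel override above is where the normalization gets undone. Writing mu and sigma for the column means and standard deviations, \bar{y} for the label mean, and \tilde{w} for the weights fitted on the centered, scaled data (with no fitted intercept, since setIntercept(false)), substituting the definitions back gives

    y - \bar{y} \approx \sum_i \tilde{w}_i (x_i - \mu_i) / \sigma_i
      \Longrightarrow
    y \approx \sum_i (\tilde{w}_i / \sigma_i) x_i + (\bar{y} - \sum_i \tilde{w}_i \mu_i / \sigma_i)

which is the pair of quantities weightsScaled and interceptScaled compute above (modulo the intercept entry prepended to weightsMat, which is zero here).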
*/ def train( - input: RDD[LabeledPoint], - numIterations: Int, - stepSize: Double) - : LinearRegressionModel = + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double) + : LinearRegressionModel = { train(input, numIterations, stepSize, 1.0) } @@ -147,9 +147,9 @@ object LinearRegressionWithSGD { * @return a LinearRegressionModel which has the weights and offset from training. */ def train( - input: RDD[LabeledPoint], - numIterations: Int) - : LinearRegressionModel = + input: RDD[LabeledPoint], + numIterations: Int) + : LinearRegressionModel = { train(input, numIterations, 1.0, 1.0) } diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala index addf8cd59e..ccf7364806 100644 --- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala +++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala @@ -55,18 +55,54 @@ class RidgeRegressionWithSGD private ( val gradient = new SquaredGradient() val updater = new SquaredL2Updater() - val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) + + @transient val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize) .setNumIterations(numIterations) .setRegParam(regParam) .setMiniBatchFraction(miniBatchFraction) + // We don't want to penalize the intercept in RidgeRegression, so set this to false. + setIntercept(false) + + var yMean = 0.0 + var xColMean: DoubleMatrix = _ + var xColSd: DoubleMatrix = _ + /** * Construct a RidgeRegression object with default parameters */ def this() = this(1.0, 100, 1.0, 1.0, true) def createModel(weights: Array[Double], intercept: Double) = { - new RidgeRegressionModel(weights, intercept) + val weightsMat = new DoubleMatrix(weights.length + 1, 1, (Array(intercept) ++ weights):_*) + val weightsScaled = weightsMat.div(xColSd) + val interceptScaled = yMean - (weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)) + + new RidgeRegressionModel(weightsScaled.data, interceptScaled) + } + + override def run( + input: RDD[LabeledPoint], + initialWeights: Array[Double]) + : RidgeRegressionModel = + { + val nfeatures: Int = input.first.features.length + val nexamples: Long = input.count() + + // To avoid penalizing the intercept, we center and scale the data. + val stats = MLUtils.computeStats(input, nfeatures, nexamples) + yMean = stats._1 + xColMean = stats._2 + xColSd = stats._3 + + val normalizedData = input.map { point => + val yNormalized = point.label - yMean + val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*) + val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd) + LabeledPoint(yNormalized, featuresNormalized.toArray) + } + + super.run(normalizedData, initialWeights) } } @@ -90,16 +126,16 @@ object RidgeRegressionWithSGD { * the number of features in the data. 
*/ def train( - input: RDD[LabeledPoint], - numIterations: Int, - stepSize: Double, - regParam: Double, - miniBatchFraction: Double, - initialWeights: Array[Double]) - : RidgeRegressionModel = + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) + : RidgeRegressionModel = { - new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input, - initialWeights) + new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run( + input, initialWeights) } /** @@ -114,14 +150,15 @@ object RidgeRegressionWithSGD { * @param miniBatchFraction Fraction of data to be used per iteration. */ def train( - input: RDD[LabeledPoint], - numIterations: Int, - stepSize: Double, - regParam: Double, - miniBatchFraction: Double) - : RidgeRegressionModel = + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + miniBatchFraction: Double) + : RidgeRegressionModel = { - new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input) + new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run( + input) } /** @@ -136,11 +173,11 @@ object RidgeRegressionWithSGD { * @return a RidgeRegressionModel which has the weights and offset from training. */ def train( - input: RDD[LabeledPoint], - numIterations: Int, - stepSize: Double, - regParam: Double) - : RidgeRegressionModel = + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double) + : RidgeRegressionModel = { train(input, numIterations, stepSize, regParam, 1.0) } @@ -155,21 +192,23 @@ object RidgeRegressionWithSGD { * @return a RidgeRegressionModel which has the weights and offset from training. */ def train( - input: RDD[LabeledPoint], - numIterations: Int) - : RidgeRegressionModel = + input: RDD[LabeledPoint], + numIterations: Int) + : RidgeRegressionModel = { train(input, numIterations, 1.0, 1.0, 1.0) } def main(args: Array[String]) { if (args.length != 5) { - println("Usage: RidgeRegression ") + println("Usage: RidgeRegression " + + " ") System.exit(1) } val sc = new SparkContext(args(0), "RidgeRegression") val data = MLUtils.loadLabeledData(sc, args(1)) - val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble, + args(3).toDouble) sc.stop() } diff --git a/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala index 20e1656beb..9f48477f84 100644 --- a/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala +++ b/mllib/src/main/scala/spark/mllib/util/LinearDataGenerator.scala @@ -17,20 +17,19 @@ package spark.mllib.util +import scala.collection.JavaConversions._ import scala.util.Random import org.jblas.DoubleMatrix import spark.{RDD, SparkContext} import spark.mllib.regression.LabeledPoint -import scala.collection.JavaConversions._ import spark.mllib.regression.LabeledPoint /** * Generate sample data used for Linear Data. This class generates * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the * response variable `Y`. 
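The overload ladder above just fills in defaults in turn (miniBatchFraction 1.0, then stepSize and regParam 1.0). A hypothetical driver against the fullest overload, using only calls that appear in this patch; the input path and parameter values are placeholders:

    import spark.SparkContext
    import spark.mllib.regression.RidgeRegressionWithSGD
    import spark.mllib.util.MLUtils

    object RidgeDriver {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "RidgeDriver")
        // Labeled data in the loadLabeledData text format; the path is a placeholder.
        val data = MLUtils.loadLabeledData(sc, "/tmp/ridge-input")
        // 200 iterations, step size 1.0, regParam 0.1, full-batch gradients.
        val model = RidgeRegressionWithSGD.train(data, 200, 1.0, 0.1, 1.0)
        println("intercept: " + model.intercept)
        sc.stop()
      }
    }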
- * */ object LinearDataGenerator { @@ -47,8 +46,9 @@ object LinearDataGenerator { intercept: Double, weights: Array[Double], nPoints: Int, - seed: Int): java.util.List[LabeledPoint] = { - seqAsJavaList(generateLinearInput(intercept, weights, nPoints, seed)) + seed: Int, + eps: Double): java.util.List[LabeledPoint] = { + seqAsJavaList(generateLinearInput(intercept, weights, nPoints, seed, eps)) } /** @@ -70,10 +70,10 @@ object LinearDataGenerator { val rnd = new Random(seed) val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) val x = Array.fill[Array[Double]](nPoints)( - Array.fill[Double](weights.length)(rnd.nextGaussian())) - val y = x.map(xi => + Array.fill[Double](weights.length)(2 * rnd.nextDouble - 1.0)) + val y = x.map { xi => (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + eps * rnd.nextGaussian() - ) + } y.zip(x).map(p => LabeledPoint(p._1, p._2)) } @@ -95,19 +95,15 @@ object LinearDataGenerator { nexamples: Int, nfeatures: Int, eps: Double, - weights: Array[Double] = Array[Double](), nparts: Int = 2, intercept: Double = 0.0) : RDD[LabeledPoint] = { org.jblas.util.Random.seed(42) // Random values distributed uniformly in [-0.5, 0.5] val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) - (0 until weights.length.max(nfeatures)).map(i => w.put(i, 0, weights(i))) - val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p => - val seed = 42+p + val seed = 42 + p val examplesInPartition = nexamples / nparts - generateLinearInput(intercept, w.toArray, examplesInPartition, seed, eps) } data @@ -115,7 +111,7 @@ object LinearDataGenerator { def main(args: Array[String]) { if (args.length < 2) { - println("Usage: RidgeRegressionGenerator " + + println("Usage: LinearDataGenerator " + " [num_examples] [num_features] [num_partitions]") System.exit(1) } @@ -127,7 +123,7 @@ object LinearDataGenerator { val parts: Int = if (args.length > 4) args(4).toInt else 2 val eps = 10 - val sc = new SparkContext(sparkMaster, "RidgeRegressionDataGenerator") + val sc = new SparkContext(sparkMaster, "LinearDataGenerator") val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts) MLUtils.saveLabeledData(data, outputPath) diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala index 4e030a81b4..a8e6ae9953 100644 --- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala @@ -72,16 +72,16 @@ object MLUtils { * xColMean - Row vector with mean for every column (or feature) of the input data * xColSd - Row vector standard deviation for every column (or feature) of the input data. */ - def computeStats(data: RDD[(Double, Array[Double])], nfeatures: Int, nexamples: Long): + def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long): (Double, DoubleMatrix, DoubleMatrix) = { - val yMean: Double = data.map { case (y, features) => y }.reduce(_ + _) / nexamples + val yMean: Double = data.map { labeledPoint => labeledPoint.label }.reduce(_ + _) / nexamples // NOTE: We shuffle X by column here to compute column sum and sum of squares. 
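// Concretely: every example emits one (i, (x_i, x_i^2)) pair per column i, and
// reduceByKey adds them up, leaving (sum, sumOfSquares) for each column. The
// column statistics then follow from mean_i = sum_i / n and
// sd_i = sqrt(sumSq_i / n - mean_i^2), the usual E[x^2] - E[x]^2 identity.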
- val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { case(y, features) => - val nCols = features.length + val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint => + val nCols = labeledPoint.features.length // Traverse over every column and emit (col, value, value^2) Iterator.tabulate(nCols) { i => - (i, (features(i), features(i)*features(i))) + (i, (labeledPoint.features(i), labeledPoint.features(i)*labeledPoint.features(i))) } }.reduceByKey { case(x1, x2) => (x1._1 + x2._1, x1._2 + x2._2) diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java index 428902e85c..5863140baf 100644 --- a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java +++ b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java @@ -63,9 +63,9 @@ public class JavaLassoSuite implements Serializable { double[] weights = {-1.5, 1.0e-2}; JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, - weights, nPoints, 42), 2).cache(); + weights, nPoints, 42, 0.1), 2).cache(); List validationData = - LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1); LassoWithSGD lassoSGDImpl = new LassoWithSGD(); lassoSGDImpl.optimizer().setStepSize(1.0) @@ -84,9 +84,9 @@ public class JavaLassoSuite implements Serializable { double[] weights = {-1.5, 1.0e-2}; JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, - weights, nPoints, 42), 2).cache(); + weights, nPoints, 42, 0.1), 2).cache(); List validationData = - LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1); LassoModel model = LassoWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0); diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java index 9642e89844..50716c7861 100644 --- a/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java @@ -30,68 +30,65 @@ import spark.api.java.JavaSparkContext; import spark.mllib.util.LinearDataGenerator; public class JavaLinearRegressionSuite implements Serializable { - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaLinearRegressionSuite"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - System.clearProperty("spark.driver.port"); - } - - int validatePrediction(List validationData, LinearRegressionModel model) { - int numAccurate = 0; - for (LabeledPoint point: validationData) { - Double prediction = model.predict(point.features()); - // A prediction is off if the prediction is more than 0.5 away from expected value. 
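// With the generator's noise sd at 0.1, this 0.5 margin is generous; the tests
// only require numAccurate > nPoints * 4.0 / 5.0, i.e. at least 80% of the
// held-out points predicted within the margin.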
- if (Math.abs(prediction - point.label()) <= 0.5) { - numAccurate++; - } + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaLinearRegressionSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + int validatePrediction(List validationData, LinearRegressionModel model) { + int numAccurate = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + // A prediction is off if the prediction is more than 0.5 away from expected value. + if (Math.abs(prediction - point.label()) <= 0.5) { + numAccurate++; } - return numAccurate; - } - - @Test - public void runLinearRegressionUsingConstructor() { - int nPoints = 10000; - double A = 2.0; - double[] weights = {-1.5, 1.0e-2}; - - JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, - weights, nPoints, 42), 2).cache(); - List validationData = - LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); - - LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD(); - linSGDImpl.optimizer().setStepSize(1.0) - .setRegParam(0.01) - .setNumIterations(20); - LinearRegressionModel model = linSGDImpl.run(testRDD.rdd()); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); - } - - @Test - public void runLinearRegressionUsingStaticMethods() { - int nPoints = 10000; - double A = 2.0; - double[] weights = {-1.5, 1.0e-2}; - - JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, - weights, nPoints, 42), 2).cache(); - List validationData = - LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); - - LinearRegressionModel model = LinearRegressionWithSGD.train(testRDD.rdd(), 100, 1.0, 1.0); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); } + return numAccurate; + } + + @Test + public void runLinearRegressionUsingConstructor() { + int nPoints = 100; + double A = 3.0; + double[] weights = {10, 10}; + + JavaRDD testRDD = sc.parallelize( + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42, 0.1), 2).cache(); + List validationData = + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1); + + LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD(); + LinearRegressionModel model = linSGDImpl.run(testRDD.rdd()); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } + + @Test + public void runLinearRegressionUsingStaticMethods() { + int nPoints = 100; + double A = 3.0; + double[] weights = {10, 10}; + + JavaRDD testRDD = sc.parallelize( + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42, 0.1), 2).cache(); + List validationData = + LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17, 0.1); + + LinearRegressionModel model = LinearRegressionWithSGD.train(testRDD.rdd(), 100); + + int numAccurate = validatePrediction(validationData, model); + Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + } } diff --git a/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java index 5df6d8076d..2c0aabad30 100644 --- a/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java +++ 
b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java @@ -25,73 +25,86 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.jblas.DoubleMatrix; + import spark.api.java.JavaRDD; import spark.api.java.JavaSparkContext; import spark.mllib.util.LinearDataGenerator; public class JavaRidgeRegressionSuite implements Serializable { - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaRidgeRegressionSuite"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - System.clearProperty("spark.driver.port"); - } - - int validatePrediction(List validationData, RidgeRegressionModel model) { - int numAccurate = 0; - for (LabeledPoint point: validationData) { - Double prediction = model.predict(point.features()); - // A prediction is off if the prediction is more than 0.5 away from expected value. - if (Math.abs(prediction - point.label()) <= 0.5) { - numAccurate++; - } - } - return numAccurate; - } - - @Test - public void runRidgeRegressionUsingConstructor() { - int nPoints = 10000; - double A = 2.0; - double[] weights = {-1.5, 1.0e-2}; - - JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, - weights, nPoints, 42), 2).cache(); - List validationData = - LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); - - RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD(); - ridgeSGDImpl.optimizer().setStepSize(1.0) - .setRegParam(0.01) - .setNumIterations(20); - RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd()); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaRidgeRegressionSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + double predictionError(List validationData, RidgeRegressionModel model) { + double errorSum = 0; + for (LabeledPoint point: validationData) { + Double prediction = model.predict(point.features()); + errorSum += (prediction - point.label()) * (prediction - point.label()); } - - @Test - public void runRidgeRegressionUsingStaticMethods() { - int nPoints = 10000; - double A = 2.0; - double[] weights = {-1.5, 1.0e-2}; - - JavaRDD testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A, - weights, nPoints, 42), 2).cache(); - List validationData = - LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 17); - - RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0); - - int numAccurate = validatePrediction(validationData, model); - Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0); - } - + return errorSum / validationData.size(); + } + + List generateRidgeData(int numPoints, int nfeatures, double eps) { + org.jblas.util.Random.seed(42); + // Pick weights as random values distributed uniformly in [-0.5, 0.5] + DoubleMatrix w = DoubleMatrix.rand(nfeatures, 1).subi(0.5); + // Set first two weights to eps + w.put(0, 0, eps); + w.put(1, 0, eps); + return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, numPoints, 42, eps); + } + + @Test + public void runRidgeRegressionUsingConstructor() { + int nexamples = 200; + int nfeatures = 20; + double eps = 10.0; + List data = generateRidgeData(2*nexamples, nfeatures, eps); + + JavaRDD testRDD = 
sc.parallelize(data.subList(0, nexamples)); + List validationData = data.subList(nexamples, 2*nexamples); + + RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD(); + ridgeSGDImpl.optimizer().setStepSize(1.0) + .setRegParam(0.0) + .setNumIterations(200); + RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd()); + double unRegularizedErr = predictionError(validationData, model); + + ridgeSGDImpl.optimizer().setRegParam(0.1); + model = ridgeSGDImpl.run(testRDD.rdd()); + double regularizedErr = predictionError(validationData, model); + + Assert.assertTrue(regularizedErr < unRegularizedErr); + } + + @Test + public void runRidgeRegressionUsingStaticMethods() { + int nexamples = 200; + int nfeatures = 20; + double eps = 10.0; + List data = generateRidgeData(2*nexamples, nfeatures, eps); + + JavaRDD testRDD = sc.parallelize(data.subList(0, nexamples)); + List validationData = data.subList(nexamples, 2*nexamples); + + RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 200, 1.0, 0.0); + double unRegularizedErr = predictionError(validationData, model); + + model = RidgeRegressionWithSGD.train(testRDD.rdd(), 200, 1.0, 0.1); + double regularizedErr = predictionError(validationData, model); + + Assert.assertTrue(regularizedErr < unRegularizedErr); + } } diff --git a/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala index 3d22b7d385..acc48a3283 100644 --- a/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala @@ -36,10 +36,19 @@ class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll { System.clearProperty("spark.driver.port") } - // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 when - // X1 and X2 are collinear. - test("multi-collinear variables") { - val testRDD = LinearDataGenerator.generateLinearRDD(sc, 100, 2, 0.0, Array(10.0, 10.0), intercept=3.0).cache() + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected.label) > 0.5 + }.size + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 + test("linear regression") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 42), 2).cache() val linReg = new LinearRegressionWithSGD() linReg.optimizer.setNumIterations(1000).setStepSize(1.0) @@ -49,5 +58,15 @@ class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll { assert(model.weights.length === 2) assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0) assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. 
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData) } } diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala index 0237ccdf87..c482035706 100644 --- a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala @@ -20,6 +20,7 @@ package spark.mllib.regression import scala.collection.JavaConversions._ import scala.util.Random +import org.jblas.DoubleMatrix import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite @@ -27,7 +28,6 @@ import spark.SparkContext import spark.SparkContext._ import spark.mllib.util.LinearDataGenerator - class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { @transient private var sc: SparkContext = _ @@ -40,31 +40,51 @@ class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { System.clearProperty("spark.driver.port") } - // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 when - // X1 and X2 are collinear. - test("multi-collinear variables") { - val testRDD = LinearDataGenerator.generateLinearRDD(sc, 100, 2, 0.0, Array(10.0, 10.0), intercept=3.0).cache() - val ridgeReg = new RidgeRegressionWithSGD() - ridgeReg.optimizer.setNumIterations(1000).setRegParam(0.0).setStepSize(1.0) + def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = { + predictions.zip(input).map { case (prediction, expected) => + (prediction - expected.label) * (prediction - expected.label) + }.reduceLeft(_ + _) / predictions.size + } - val model = ridgeReg.run(testRDD) + test("regularization with skewed weights") { + val nexamples = 200 + val nfeatures = 20 + val eps = 10 - assert(model.intercept >= 2.5 && model.intercept <= 3.5) - assert(model.weights.length === 2) - assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0) - assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0) - } + org.jblas.util.Random.seed(42) + // Pick weights as random values distributed uniformly in [-0.5, 0.5] + val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5) + // Set first two weights to eps + w.put(0, 0, eps) + w.put(1, 0, eps) - test("multi-collinear variables with regularization") { - val testRDD = LinearDataGenerator.generateLinearRDD(sc, 100, 2, 0.0, Array(10.0, 10.0), intercept=3.0).cache() - val ridgeReg = new RidgeRegressionWithSGD() - ridgeReg.optimizer.setNumIterations(1000).setRegParam(1.0).setStepSize(1.0) + // Use half of data for training and other half for validation + val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2*nexamples, 42, eps) + val testData = data.take(nexamples) + val validationData = data.takeRight(nexamples) - val model = ridgeReg.run(testRDD) + val testRDD = sc.parallelize(testData, 2).cache() + val validationRDD = sc.parallelize(validationData, 2).cache() + + // First run without regularization. 
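// The comparison below fits the same training half twice, once with an
// unregularized LinearRegressionWithSGD and once with RidgeRegressionWithSGD at
// regParam 0.1, then scores both on the held-out half. Since two weights were
// inflated to eps = 10 and the noise has the same scale, shrinking the noisy
// estimates should lower validation error.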
+ val linearReg = new LinearRegressionWithSGD() + linearReg.optimizer.setNumIterations(200) + .setStepSize(1.0) + + val linearModel = linearReg.run(testRDD) + val linearErr = predictionError( + linearModel.predict(validationRDD.map(_.features)).collect(), validationData) + + val ridgeReg = new RidgeRegressionWithSGD() + ridgeReg.optimizer.setNumIterations(200) + .setRegParam(0.1) + .setStepSize(1.0) + val ridgeModel = ridgeReg.run(testRDD) + val ridgeErr = predictionError( + ridgeModel.predict(validationRDD.map(_.features)).collect(), validationData) - assert(model.intercept <= 5.0) - assert(model.weights.length === 2) - assert(model.weights(0) <= 4.0) - assert(model.weights(1) <= 3.0) + // Ridge CV-error should be lower than linear regression + assert(ridgeErr < linearErr, + "ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")") } } -- cgit v1.2.3
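The predictionError helpers in both suites compute the same quantity, ordinary mean squared error over a validation set. A minimal standalone equivalent, assuming a model exposed as a function and a plain (label, features) pair encoding (both choices are ours, not MLlib API):

    // Mean squared error of a prediction function over labeled examples.
    def mse(predict: Array[Double] => Double,
            data: Seq[(Double, Array[Double])]): Double = {
      val sumSq = data.map { case (label, features) =>
        val diff = predict(features) - label
        diff * diff
      }.sum
      sumSq / data.size
    }

With models exposing predict(features: Array[Double]): Double, the suite's final assertion is then just mse(ridgeModel.predict, validation) < mse(linearModel.predict, validation).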