aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorEvan Sparks <sparks@cs.berkeley.edu>2013-08-14 16:24:23 -0700
committerShivaram Venkataraman <shivaram@eecs.berkeley.edu>2013-08-18 15:03:13 -0700
commitb659af83d3f91f0f339d874b2742ddca20a9f610 (patch)
tree77b2b98a2bdf1433fd49144632f2d29f1b53f803 /mllib
parent044a088c0db68220aae2dad425886b618bb0023f (diff)
downloadspark-b659af83d3f91f0f339d874b2742ddca20a9f610.tar.gz
spark-b659af83d3f91f0f339d874b2742ddca20a9f610.tar.bz2
spark-b659af83d3f91f0f339d874b2742ddca20a9f610.zip
Adding Linear Regression, and refactoring Ridge Regression.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala168
-rw-r--r--mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala271
-rw-r--r--mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala98
-rw-r--r--mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala5
-rw-r--r--mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java96
-rw-r--r--mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java96
-rw-r--r--mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala87
-rw-r--r--mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala68
8 files changed, 713 insertions, 176 deletions
diff --git a/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala
new file mode 100644
index 0000000000..0ea5348a1f
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/LinearRegression.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+/**
+ * Regression model trained using LinearRegression.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
+ */
+class LinearRegressionModel(
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with RegressionModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ dataMatrix.dot(weightMatrix) + intercept
+ }
+}
+
+/**
+ * Train a regression model with no regularization using Stochastic Gradient Descent.
+ */
+class LinearRegressionWithSGD private (
+ var stepSize: Double,
+ var numIterations: Int,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[LinearRegressionModel]
+ with Serializable {
+
+ val gradient = new SquaredGradient()
+ val updater = new SimpleUpdater()
+ val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setMiniBatchFraction(miniBatchFraction)
+
+ /**
+ * Construct a LinearRegression object with default parameters
+ */
+ def this() = this(1.0, 100, 1.0, true)
+
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new LinearRegressionModel(weights, intercept)
+ }
+}
+
+/**
+ * Top-level methods for calling LinearRegression.
+ */
+object LinearRegressionWithSGD {
+
+ /**
+ * Train a Linear Regression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : LinearRegressionModel =
+ {
+ new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, true).run(input,
+ initialWeights)
+ }
+
+ /**
+ * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double)
+ : LinearRegressionModel =
+ {
+ new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, true).run(input)
+ }
+
+ /**
+ * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. We use the entire data set to
+ * update the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param stepSize Step size to be used for each iteration of Gradient Descent.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a LinearRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double)
+ : LinearRegressionModel =
+ {
+ train(input, numIterations, stepSize, 1.0)
+ }
+
+ /**
+ * Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+ * update the gradient in each iteration.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a LinearRegressionModel which has the weights and offset from training.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int)
+ : LinearRegressionModel =
+ {
+ train(input, numIterations, 1.0, 1.0)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ println("Usage: LinearRegression <master> <input_dir> <step_size> <niters>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "LinearRegression")
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
+
+ sc.stop()
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
index b42d94af41..addf8cd59e 100644
--- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
@@ -18,200 +18,159 @@
package spark.mllib.regression
import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
-import org.jblas.Solve
-
-import scala.annotation.tailrec
-import scala.collection.mutable
/**
- * Ridge Regression from Joseph Gonzalez's implementation in MLBase
+ * Regression model trained using RidgeRegression.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
*/
class RidgeRegressionModel(
- val weights: DoubleMatrix,
- val intercept: Double,
- val lambdaOpt: Double,
- val lambdas: Seq[(Double, Double, DoubleMatrix)])
- extends RegressionModel {
-
- override def predict(testData: RDD[Array[Double]]): RDD[Double] = {
- // A small optimization to avoid serializing the entire model.
- val localIntercept = this.intercept
- val localWeights = this.weights
- testData.map { x =>
- (new DoubleMatrix(1, x.length, x:_*).mmul(localWeights)).get(0) + localIntercept
- }
- }
-
- override def predict(testData: Array[Double]): Double = {
- (new DoubleMatrix(1, testData.length, testData:_*).mmul(this.weights)).get(0) + this.intercept
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with RegressionModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ dataMatrix.dot(weightMatrix) + intercept
}
}
-class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double)
- extends Logging {
-
- def this() = this(0.0, 100.0)
+/**
+ * Train a regression model with L2-regularization using Stochastic Gradient Descent.
+ */
+class RidgeRegressionWithSGD private (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[RidgeRegressionModel]
+ with Serializable {
+
+ val gradient = new SquaredGradient()
+ val updater = new SquaredL2Updater()
+ val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
/**
- * Set the lower bound on binary search for lambda's. Default is 0.
+ * Construct a RidgeRegression object with default parameters
*/
- def setLowLambda(low: Double) = {
- this.lambdaLow = low
- this
+ def this() = this(1.0, 100, 1.0, 1.0, true)
+
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new RidgeRegressionModel(weights, intercept)
}
+}
+
+/**
+ * Top-level methods for calling RidgeRegression.
+ */
+object RidgeRegressionWithSGD {
/**
- * Set the upper bound on binary search for lambda's. Default is 100.0.
+ * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+ * gradient descent are initialized using the initial weights provided.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * the number of features in the data.
*/
- def setHighLambda(hi: Double) = {
- this.lambdaHigh = hi
- this
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double,
+ initialWeights: Array[Double])
+ : RidgeRegressionModel =
+ {
+ new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
+ initialWeights)
}
- def train(inputLabeled: RDD[LabeledPoint]): RidgeRegressionModel = {
- val input = inputLabeled.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
- val nfeatures: Int = input.take(1)(0)._2.length
- val nexamples: Long = input.count()
-
- val (yMean, xColMean, xColSd) = MLUtils.computeStats(input, nfeatures, nexamples)
-
- val data = input.map { case(y, features) =>
- val yNormalized = y - yMean
- val featuresMat = new DoubleMatrix(nfeatures, 1, features:_*)
- val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
- (yNormalized, featuresNormalized.toArray)
- }
-
- // Compute XtX - Size of XtX is nfeatures by nfeatures
- val XtX: DoubleMatrix = data.map { case (y, features) =>
- val x = new DoubleMatrix(1, features.length, features:_*)
- x.transpose().mmul(x)
- }.reduce(_.addi(_))
-
- // Compute Xt*y - Size of Xty is nfeatures by 1
- val Xty: DoubleMatrix = data.map { case (y, features) =>
- new DoubleMatrix(features.length, 1, features:_*).mul(y)
- }.reduce(_.addi(_))
-
- // Define a function to compute the leave one out cross validation error
- // for a single example
- def crossValidate(lambda: Double): (Double, Double, DoubleMatrix) = {
- // Compute the MLE ridge regression parameter value
-
- // Ridge Regression parameter = inv(XtX + \lambda*I) * Xty
- val XtXlambda = DoubleMatrix.eye(nfeatures).muli(lambda).addi(XtX)
- val w = Solve.solveSymmetric(XtXlambda, Xty)
-
- val invXtX = Solve.solveSymmetric(XtXlambda, DoubleMatrix.eye(nfeatures))
-
- // compute the generalized cross validation score
- val cverror = data.map {
- case (y, features) =>
- val x = new DoubleMatrix(features.length, 1, features:_*)
- val yhat = w.transpose().mmul(x).get(0)
- val H_ii = x.transpose().mmul(invXtX).mmul(x).get(0)
- val residual = (y - yhat) / (1.0 - H_ii)
- residual * residual
- }.reduce(_ + _) / nexamples
-
- (lambda, cverror, w)
- }
-
- // Binary search for the best assignment to lambda.
- def binSearch(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = {
- val buffer = mutable.ListBuffer.empty[(Double, Double, DoubleMatrix)]
-
- @tailrec
- def loop(low: Double, high: Double): Seq[(Double, Double, DoubleMatrix)] = {
- val mid = (high - low) / 2 + low
- val lowValue = crossValidate((mid - low) / 2 + low)
- val highValue = crossValidate((high - mid) / 2 + mid)
- val (newLow, newHigh) = if (lowValue._2 < highValue._2) {
- (low, mid + (high-low)/4)
- } else {
- (mid - (high-low)/4, high)
- }
- if (newHigh - newLow > 1.0E-7) {
- buffer += lowValue += highValue
- loop(newLow, newHigh)
- } else {
- buffer += lowValue += highValue
- buffer.result()
- }
- }
-
- loop(low, high)
- }
-
- // Actually compute the best lambda
- val lambdas = binSearch(lambdaLow, lambdaHigh).sortBy(_._1)
-
- // Find the best parameter set by taking the lowest cverror.
- val (lambdaOpt, cverror, weights) = lambdas.reduce((a, b) => if (a._2 < b._2) a else b)
-
- // Return the model which contains the solution
- val weightsScaled = weights.div(xColSd)
- val intercept = yMean - (weights.transpose().mmul(xColMean.div(xColSd)).get(0))
- val model = new RidgeRegressionModel(weightsScaled, intercept, lambdaOpt, lambdas)
-
- logInfo("RidgeRegression: optimal lambda " + model.lambdaOpt)
- logInfo("RidgeRegression: optimal weights " + model.weights)
- logInfo("RidgeRegression: optimal intercept " + model.intercept)
- logInfo("RidgeRegression: cross-validation error " + cverror)
-
- model
+ /**
+ * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. Each iteration uses
+ * `miniBatchFraction` fraction of the data to calculate the gradient.
+ *
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @param stepSize Step size to be used for each iteration of gradient descent.
+ * @param regParam Regularization parameter.
+ * @param miniBatchFraction Fraction of data to be used per iteration.
+ */
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double)
+ : RidgeRegressionModel =
+ {
+ new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
}
-}
-
-/**
- * Top-level methods for calling Ridge Regression.
- */
-object RidgeRegression {
- // NOTE(shivaram): We use multiple train methods instead of default arguments to support
- // Java programs.
/**
- * Train a ridge regression model given an RDD of (response, features) pairs.
- * We use the closed form solution to compute the cross-validation score for
- * a given lambda. The optimal lambda is computed by performing binary search
- * between the provided bounds of lambda.
+ * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using the specified step size. We use the entire data set to
+ * update the gradient in each iteration.
*
- * @param input RDD of (response, array of features) pairs.
- * @param lambdaLow lower bound used in binary search for lambda
- * @param lambdaHigh upper bound used in binary search for lambda
+ * @param input RDD of (label, array of features) pairs.
+ * @param stepSize Step size to be used for each iteration of Gradient Descent.
+ * @param regParam Regularization parameter.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a RidgeRegressionModel which has the weights and offset from training.
*/
def train(
- input: RDD[LabeledPoint],
- lambdaLow: Double,
- lambdaHigh: Double)
- : RidgeRegressionModel =
+ input: RDD[LabeledPoint],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double)
+ : RidgeRegressionModel =
{
- new RidgeRegression(lambdaLow, lambdaHigh).train(input)
+ train(input, numIterations, stepSize, regParam, 1.0)
}
/**
- * Train a ridge regression model given an RDD of (response, features) pairs.
- * We use the closed form solution to compute the cross-validation score for
- * a given lambda. The optimal lambda is computed by performing binary search
- * between lambda values of 0 and 100.
+ * Train a RidgeRegression model given an RDD of (label, features) pairs. We run a fixed number
+ * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+ * update the gradient in each iteration.
*
- * @param input RDD of (response, array of features) pairs.
+ * @param input RDD of (label, array of features) pairs.
+ * @param numIterations Number of iterations of gradient descent to run.
+ * @return a RidgeRegressionModel which has the weights and offset from training.
*/
- def train(input: RDD[LabeledPoint]) : RidgeRegressionModel = {
- train(input, 0.0, 100.0)
+ def train(
+ input: RDD[LabeledPoint],
+ numIterations: Int)
+ : RidgeRegressionModel =
+ {
+ train(input, numIterations, 1.0, 1.0, 1.0)
}
def main(args: Array[String]) {
- if (args.length != 2) {
- println("Usage: RidgeRegression <master> <input_dir>")
+ if (args.length != 5) {
+ println("Usage: RidgeRegression <master> <input_dir> <step_size> <regularization_parameter> <niters>")
System.exit(1)
}
val sc = new SparkContext(args(0), "RidgeRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
- val model = RidgeRegression.train(data, 0, 1000)
+ val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+
sc.stop()
}
}
diff --git a/mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala
new file mode 100644
index 0000000000..39e2a30b55
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/LinearRegressionDataGenerator.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.util
+
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.regression.LabeledPoint
+
+/**
+ * Generate sample data used for LinearRegression. This class generates
+ * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the
+ * response variable `Y`.
+ *
+ */
+object LinearRegressionDataGenerator {
+
+ /**
+ * Generate an RDD containing sample data for LinearRegression.
+ *
+ * @param sc SparkContext to be used for generating the RDD.
+ * @param nexamples Number of examples that will be contained in the RDD.
+ * @param nfeatures Number of features to generate for each example.
+ * @param eps Epsilon factor by which examples are scaled.
+ * @param nparts Number of partitions in the RDD. Default value is 2.
+ *
+ * @return RDD of LabeledPoint containing sample data.
+ */
+ def generateLinearRDD(
+ sc: SparkContext,
+ nexamples: Int,
+ nfeatures: Int,
+ eps: Double,
+ nparts: Int = 2,
+ intercept: Double = 0.0) : RDD[LabeledPoint] = {
+ org.jblas.util.Random.seed(42)
+ // Random values distributed uniformly in [-0.5, 0.5]
+ val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
+ w.put(0, 0, 10)
+ w.put(1, 0, 10)
+
+ val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
+ org.jblas.util.Random.seed(42 + p)
+ val examplesInPartition = nexamples / nparts
+
+ val X = DoubleMatrix.rand(examplesInPartition, nfeatures)
+ val y = X.mmul(w).add(intercept)
+
+ val rnd = new Random(42 + p)
+
+ val normalValues = Array.fill[Double](examplesInPartition)(rnd.nextGaussian() * eps)
+ val yObs = new DoubleMatrix(normalValues).addi(y)
+
+ Iterator.tabulate(examplesInPartition) { i =>
+ LabeledPoint(yObs.get(i, 0), X.getRow(i).toArray)
+ }
+ }
+ data
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ println("Usage: LinearRegressionGenerator " +
+ "<master> <output_dir> [num_examples] [num_features] [num_partitions]")
+ System.exit(1)
+ }
+
+ val sparkMaster: String = args(0)
+ val outputPath: String = args(1)
+ val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
+ val nfeatures: Int = if (args.length > 3) args(3).toInt else 100
+ val parts: Int = if (args.length > 4) args(4).toInt else 2
+ val eps = 10
+
+ val sc = new SparkContext(sparkMaster, "LinearRegressionDataGenerator")
+ val data = generateLinearRDD(sc, nexamples, nfeatures, eps, parts)
+
+ MLUtils.saveLabeledData(data, outputPath)
+ sc.stop()
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
index 4d329168be..08dce723b8 100644
--- a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
@@ -48,7 +48,8 @@ object RidgeRegressionDataGenerator {
nexamples: Int,
nfeatures: Int,
eps: Double,
- nparts: Int = 2) : RDD[LabeledPoint] = {
+ nparts: Int = 2,
+ intercept: Double = 0.0) : RDD[LabeledPoint] = {
org.jblas.util.Random.seed(42)
// Random values distributed uniformly in [-0.5, 0.5]
val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
@@ -60,7 +61,7 @@ object RidgeRegressionDataGenerator {
val examplesInPartition = nexamples / nparts
val X = DoubleMatrix.rand(examplesInPartition, nfeatures)
- val y = X.mmul(w)
+ val y = X.mmul(w).add(intercept)
val rnd = new Random(42 + p)
diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java
new file mode 100644
index 0000000000..14d3d4ef39
--- /dev/null
+++ b/mllib/src/test/java/spark/mllib/regression/JavaLinearRegressionSuite.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+
+public class JavaLinearRegressionSuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaLinearRegressionSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ }
+
+ int validatePrediction(List<LabeledPoint> validationData, LinearRegressionModel model) {
+ int numAccurate = 0;
+ for (LabeledPoint point: validationData) {
+ Double prediction = model.predict(point.features());
+ // A prediction is off if the prediction is more than 0.5 away from expected value.
+ if (Math.abs(prediction - point.label()) <= 0.5) {
+ numAccurate++;
+ }
+ }
+ return numAccurate;
+ }
+
+ @Test
+ public void runLinearRegressionUsingConstructor() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double[] weights = {-1.5, 1.0e-2};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearRegressionSuite.generateLinearRegressionInputAsList(A,
+ weights, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ LinearRegressionSuite.generateLinearRegressionInputAsList(A, weights, nPoints, 17);
+
+ LinearRegressionWithSGD svmSGDImpl = new LinearRegressionWithSGD();
+ svmSGDImpl.optimizer().setStepSize(1.0)
+ .setRegParam(0.01)
+ .setNumIterations(20);
+ LinearRegressionModel model = svmSGDImpl.run(testRDD.rdd());
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+ @Test
+ public void runLinearRegressionUsingStaticMethods() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double[] weights = {-1.5, 1.0e-2};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearRegressionSuite.generateLinearRegressionInputAsList(A,
+ weights, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ LinearRegressionSuite.generateLinearRegressionInputAsList(A, weights, nPoints, 17);
+
+ LinearRegressionModel model = LinearRegressionWithSGD.train(testRDD.rdd(), 100, 1.0, 1.0);
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+}
diff --git a/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java
new file mode 100644
index 0000000000..4f379b51d5
--- /dev/null
+++ b/mllib/src/test/java/spark/mllib/regression/JavaRidgeRegressionSuite.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+
+public class JavaRidgeRegressionSuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaRidgeRegressionSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ }
+
+ int validatePrediction(List<LabeledPoint> validationData, RidgeRegressionModel model) {
+ int numAccurate = 0;
+ for (LabeledPoint point: validationData) {
+ Double prediction = model.predict(point.features());
+ // A prediction is off if the prediction is more than 0.5 away from expected value.
+ if (Math.abs(prediction - point.label()) <= 0.5) {
+ numAccurate++;
+ }
+ }
+ return numAccurate;
+ }
+
+ @Test
+ public void runRidgeRegressionUsingConstructor() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double[] weights = {-1.5, 1.0e-2};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(RidgeRegressionSuite.generateRidgeRegressionInputAsList(A,
+ weights, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ RidgeRegressionSuite.generateRidgeRegressionInputAsList(A, weights, nPoints, 17);
+
+ RidgeRegressionWithSGD svmSGDImpl = new RidgeRegressionWithSGD();
+ svmSGDImpl.optimizer().setStepSize(1.0)
+ .setRegParam(0.01)
+ .setNumIterations(20);
+ RidgeRegressionModel model = svmSGDImpl.run(testRDD.rdd());
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+ @Test
+ public void runRidgeRegressionUsingStaticMethods() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double[] weights = {-1.5, 1.0e-2};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(RidgeRegressionSuite.generateRidgeRegressionInputAsList(A,
+ weights, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ RidgeRegressionSuite.generateRidgeRegressionInputAsList(A, weights, nPoints, 17);
+
+ RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0);
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+}
diff --git a/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala
new file mode 100644
index 0000000000..c794c1cac5
--- /dev/null
+++ b/mllib/src/test/scala/spark/mllib/regression/LinearRegressionSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import spark.SparkContext
+import spark.SparkContext._
+import spark.mllib.util.LinearRegressionDataGenerator
+import spark.mllib.regression.LabeledPoint
+import org.jblas.DoubleMatrix
+
+object LinearRegressionSuite {
+
+ def generateLinearRegressionInputAsList(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateLinearRegressionInput(intercept, weights, nPoints, seed))
+ }
+
+
+ // Generate noisy input of the form Y = x.dot(weights) + intercept + noise
+ def generateLinearRegressionInput(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): Seq[LabeledPoint] = {
+ val rnd = new Random(seed)
+ val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
+ val x = Array.fill[Array[Double]](nPoints)(
+ Array.fill[Double](weights.length)(rnd.nextGaussian()))
+ val y = x.map(xi =>
+ (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()
+ )
+ y.zip(x).map(p => LabeledPoint(p._1, p._2))
+ }
+
+}
+
+class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 when
+ // X1 and X2 are collinear.
+ test("multi-collinear variables") {
+ val testRDD = LinearRegressionDataGenerator.generateLinearRDD(sc, 100, 2, 0.0, intercept=3.0).cache()
+ val linReg = new LinearRegressionWithSGD()
+ linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+ val model = linReg.run(testRDD)
+
+ assert(model.intercept >= 2.5 && model.intercept <= 3.5)
+ assert(model.weights.length === 2)
+ assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
+ assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+ }
+}
diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
index e2b244894d..aaac083ad9 100644
--- a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -17,6 +17,7 @@
package spark.mllib.regression
+import scala.collection.JavaConversions._
import scala.util.Random
import org.scalatest.BeforeAndAfterAll
@@ -24,6 +25,37 @@ import org.scalatest.FunSuite
import spark.SparkContext
import spark.SparkContext._
+import spark.mllib.util.RidgeRegressionDataGenerator
+import org.jblas.DoubleMatrix
+
+object RidgeRegressionSuite {
+
+ def generateRidgeRegressionInputAsList(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateRidgeRegressionInput(intercept, weights, nPoints, seed))
+ }
+
+
+ // Generate noisy input of the form Y = x.dot(weights) + intercept + noise
+ def generateRidgeRegressionInput(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): Seq[LabeledPoint] = {
+ val rnd = new Random(seed)
+ val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
+ val x = Array.fill[Array[Double]](nPoints)(
+ Array.fill[Double](weights.length)(rnd.nextGaussian()))
+ val y = x.map(xi =>
+ (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()
+ )
+ y.zip(x).map(p => LabeledPoint(p._1, p._2))
+ }
+
+}
class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
@@ -38,31 +70,31 @@ class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
System.clearProperty("spark.driver.port")
}
- // Test if we can correctly learn Y = 3 + X1 + X2 when
+ // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 when
// X1 and X2 are collinear.
test("multi-collinear variables") {
- val rnd = new Random(43)
- val x1 = Array.fill[Double](20)(rnd.nextGaussian())
-
- // Pick a mean close to mean of x1
- val rnd1 = new Random(42) //new NormalDistribution(0.1, 0.01)
- val x2 = Array.fill[Double](20)(0.1 + rnd1.nextGaussian() * 0.01)
+ val testRDD = RidgeRegressionDataGenerator.generateRidgeRDD(sc, 100, 2, 0.0, intercept=3.0).cache()
+ val ridgeReg = new RidgeRegressionWithSGD()
+ ridgeReg.optimizer.setNumIterations(1000).setRegParam(0.0).setStepSize(1.0)
- val xMat = (0 until 20).map(i => Array(x1(i), x2(i))).toArray
+ val model = ridgeReg.run(testRDD)
- val y = xMat.map(i => 3 + i(0) + i(1))
- val testData = (0 until 20).map(i => LabeledPoint(y(i), xMat(i))).toArray
+ assert(model.intercept >= 2.5 && model.intercept <= 3.5)
+ assert(model.weights.length === 2)
+ assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
+ assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+ }
- val testRDD = sc.parallelize(testData, 2)
- testRDD.cache()
- val ridgeReg = new RidgeRegression().setLowLambda(0)
- .setHighLambda(10)
+ test("multi-collinear variables with regularization") {
+ val testRDD = RidgeRegressionDataGenerator.generateRidgeRDD(sc, 100, 2, 0.0, intercept=3.0).cache()
+ val ridgeReg = new RidgeRegressionWithSGD()
+ ridgeReg.optimizer.setNumIterations(1000).setRegParam(1.0).setStepSize(1.0)
- val model = ridgeReg.train(testRDD)
+ val model = ridgeReg.run(testRDD)
- assert(model.intercept >= 2.9 && model.intercept <= 3.1)
+ assert(model.intercept <= 5.0)
assert(model.weights.length === 2)
- assert(model.weights.get(0) >= 0.9 && model.weights.get(0) <= 1.1)
- assert(model.weights.get(1) >= 0.9 && model.weights.get(1) <= 1.1)
+ assert(model.weights(0) <= 3.0)
+ assert(model.weights(1) <= 3.0)
}
}