-rw-r--r--  core/src/main/scala/spark/SerializableWritable.scala | 4
-rw-r--r--  core/src/main/scala/spark/storage/BlockFetcherIterator.scala | 3
-rwxr-xr-x  docs/_layouts/global.html | 1
-rw-r--r--  docs/_plugins/copy_api_dirs.rb | 2
-rw-r--r--  docs/spark-simple-tutorial.md | 41
-rw-r--r--  examples/src/main/java/spark/mllib/examples/JavaLR.java | 85
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala | 4
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala | 152
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/SVM.scala | 151
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/Gradient.scala | 23
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala | 102
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala | 29
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/Updater.scala | 23
-rw-r--r--  mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala | 9
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala | 142
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala | 26
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/Lasso.scala | 147
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala | 11
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala | 10
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala | 19
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala | 13
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/MLUtils.scala | 30
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala | 23
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala | 21
-rw-r--r--  mllib/src/test/java/spark/mllib/classification/JavaLogisticRegressionSuite.java | 98
-rw-r--r--  mllib/src/test/java/spark/mllib/classification/JavaSVMSuite.java | 98
-rw-r--r--  mllib/src/test/java/spark/mllib/clustering/JavaKMeansSuite.java (renamed from mllib/src/test/scala/spark/mllib/clustering/JavaKMeansSuite.java) | 0
-rw-r--r--  mllib/src/test/java/spark/mllib/recommendation/JavaALSSuite.java (renamed from mllib/src/test/scala/spark/mllib/recommendation/JavaALSSuite.java) | 0
-rw-r--r--  mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java | 96
-rw-r--r--  mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala | 61
-rw-r--r--  mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala | 78
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala | 74
-rw-r--r--  mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala | 2
-rw-r--r--  pom.xml | 2
34 files changed, 1068 insertions(+), 512 deletions(-)
diff --git a/core/src/main/scala/spark/SerializableWritable.scala b/core/src/main/scala/spark/SerializableWritable.scala
index 0236611ef9..936d8e6241 100644
--- a/core/src/main/scala/spark/SerializableWritable.scala
+++ b/core/src/main/scala/spark/SerializableWritable.scala
@@ -21,7 +21,7 @@ import java.io._
import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.conf.Configuration
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
@@ -35,7 +35,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
val ow = new ObjectWritable()
- ow.setConf(new JobConf())
+ ow.setConf(new Configuration())
ow.readFields(in)
t = ow.get().asInstanceOf[T]
}
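
The change above only swaps the Hadoop JobConf for the more general Configuration when the wrapped Writable is deserialized. A minimal sketch of the round-trip behavior this relies on, assuming a Hadoop Text value; the roundTrip helper is illustrative and not part of the change:

    import java.io._
    import org.apache.hadoop.io.Text
    import spark.SerializableWritable

    // Illustrative helper: serialize and deserialize through Java serialization.
    def roundTrip[T <: java.io.Serializable](obj: T): T = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(obj)
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      in.readObject().asInstanceOf[T]
    }

    // The wrapped Writable is reconstructed via ObjectWritable with a plain
    // Configuration, so no mapred-specific JobConf is needed.
    val wrapped = new SerializableWritable(new Text("hello"))
    val copy = roundTrip(wrapped)
    assert(copy.value.toString == "hello")
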
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
index 1965c5bc19..07e3db30fe 100644
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
@@ -132,9 +132,10 @@ object BlockFetcherIterator {
"Unexpected message " + blockMessage.getType + " received from " + cmId)
}
val blockId = blockMessage.getId
+ val networkSize = blockMessage.getData.limit()
results.put(new FetchResult(blockId, sizeMap(blockId),
() => dataDeserialize(blockId, blockMessage.getData, serializer)))
- _remoteBytesRead += req.size
+ _remoteBytesRead += networkSize
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
}
}
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index f06ab2d5b0..a76346f428 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -74,6 +74,7 @@
<li><a href="api/core/index.html">Spark Java/Scala (Scaladoc)</a></li>
<li><a href="api/pyspark/index.html">Spark Python (Epydoc)</a></li>
<li><a href="api/streaming/index.html">Spark Streaming Java/Scala (Scaladoc) </a></li>
+ <li><a href="api/mllib/index.html">Spark ML Library (Scaladoc) </a></li>
</ul>
</li>
diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb
index 45ef4bba82..217254c59f 100644
--- a/docs/_plugins/copy_api_dirs.rb
+++ b/docs/_plugins/copy_api_dirs.rb
@@ -20,7 +20,7 @@ include FileUtils
if ENV['SKIP_API'] != '1'
# Build Scaladoc for Java/Scala
- projects = ["core", "examples", "repl", "bagel", "streaming"]
+ projects = ["core", "examples", "repl", "bagel", "streaming", "mllib"]
puts "Moving to project root and building scaladoc."
curr_dir = pwd
diff --git a/docs/spark-simple-tutorial.md b/docs/spark-simple-tutorial.md
deleted file mode 100644
index fbdbc7d19d..0000000000
--- a/docs/spark-simple-tutorial.md
+++ /dev/null
@@ -1,41 +0,0 @@
----
-layout: global
-title: Tutorial - Running a Simple Spark Application
----
-
-1. Create directory for spark demo:
-
- ~$ mkdir SparkTest
-
-2. Copy the sbt files in ~/spark/sbt directory:
-
- ~/SparkTest$ cp -r ../spark/sbt .
-
-3. Edit the ~/SparkTest/sbt/sbt file to look like this:
-
- #!/usr/bin/env bash
- java -Xmx800M -XX:MaxPermSize=150m -jar $(dirname $0)/sbt-launch-*.jar "$@"
-
-4. To build a Spark application, you need Spark and its dependencies in a single Java archive (JAR) file. Create this JAR in Spark's main directory with sbt as:
-
- ~/spark$ sbt/sbt assembly
-
-5. create a source file in ~/SparkTest/src/main/scala directory:
-
- ~/SparkTest/src/main/scala$ vi Test1.scala
-
-6. Make the contain of the Test1.scala file like this:
-
- import spark.SparkContext
- import spark.SparkContext._
- object Test1 {
- def main(args: Array[String]) {
- val sc = new SparkContext("local", "SparkTest")
- println(sc.parallelize(1 to 10).reduce(_ + _))
- System.exit(0)
- }
- }
-
-7. Run the Test1.scala file:
-
- ~/SparkTest$ sbt/sbt run
diff --git a/examples/src/main/java/spark/mllib/examples/JavaLR.java b/examples/src/main/java/spark/mllib/examples/JavaLR.java
new file mode 100644
index 0000000000..bf4aeaf40f
--- /dev/null
+++ b/examples/src/main/java/spark/mllib/examples/JavaLR.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.examples;
+
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.Function;
+
+import spark.mllib.classification.LogisticRegressionWithSGD;
+import spark.mllib.classification.LogisticRegressionModel;
+import spark.mllib.regression.LabeledPoint;
+
+import java.util.Arrays;
+import java.util.StringTokenizer;
+
+/**
+ * Logistic regression based classification using ML Lib.
+ */
+public class JavaLR {
+
+ static class ParsePoint extends Function<String, LabeledPoint> {
+ public LabeledPoint call(String line) {
+ String[] parts = line.split(",");
+ double y = Double.parseDouble(parts[0]);
+ StringTokenizer tok = new StringTokenizer(parts[1], " ");
+ int numTokens = tok.countTokens();
+ double[] x = new double[numTokens];
+ for (int i = 0; i < numTokens; ++i) {
+ x[i] = Double.parseDouble(tok.nextToken());
+ }
+ return new LabeledPoint(y, x);
+ }
+ }
+
+ public static void printWeights(double[] a) {
+ System.out.println(Arrays.toString(a));
+ }
+
+ public static void main(String[] args) {
+ if (args.length != 4) {
+ System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
+ System.exit(1);
+ }
+
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ JavaRDD<String> lines = sc.textFile(args[1]);
+ JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
+ double stepSize = Double.parseDouble(args[2]);
+ int iterations = Integer.parseInt(args[3]);
+
+ // Another way to configure LogisticRegression
+ //
+ // LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
+ // lr.optimizer().setNumIterations(iterations)
+ // .setStepSize(stepSize)
+ // .setMiniBatchFraction(1.0);
+ // lr.setIntercept(true);
+ // LogisticRegressionModel model = lr.train(points.rdd());
+
+ LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
+ iterations, stepSize);
+
+ System.out.print("Final w: ");
+ printWeights(model.weights());
+
+ System.exit(0);
+ }
+}
diff --git a/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
index d6154b66ae..70fae8c15a 100644
--- a/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
@@ -9,7 +9,7 @@ trait ClassificationModel extends Serializable {
* @param testData RDD representing data points to be predicted
* @return RDD[Int] where each entry contains the corresponding prediction
*/
- def predict(testData: RDD[Array[Double]]): RDD[Int]
+ def predict(testData: RDD[Array[Double]]): RDD[Double]
/**
* Predict values for a single data point using the model trained.
@@ -17,5 +17,5 @@ trait ClassificationModel extends Serializable {
* @param testData array representing a single data point
* @return Int prediction from the trained model
*/
- def predict(testData: Array[Double]): Int
+ def predict(testData: Array[Double]): Double
}
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 203aa8fdd4..30ee0ab0ff 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -19,6 +19,7 @@ package spark.mllib.classification
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
+import spark.mllib.regression._
import spark.mllib.util.MLUtils
import scala.math.round
@@ -26,113 +27,58 @@ import scala.math.round
import org.jblas.DoubleMatrix
/**
- * Logistic Regression using Stochastic Gradient Descent.
- * Based on Matlab code written by John Duchi.
+ * Classification model trained using Logistic Regression.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
*/
class LogisticRegressionModel(
- val weights: Array[Double],
- val intercept: Double,
- val stochasticLosses: Array[Double]) extends ClassificationModel {
-
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
- override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
- // A small optimization to avoid serializing the entire model. Only the weightsMatrix
- // and intercept is needed.
- val localWeights = weightsMatrix
- val localIntercept = intercept
- testData.map { x =>
- val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
- round(1.0/ (1.0 + math.exp(margin * -1))).toInt
- }
- }
-
- override def predict(testData: Array[Double]): Int = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
- round(1.0/ (1.0 + math.exp(margin * -1))).toInt
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with ClassificationModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
+ round(1.0/ (1.0 + math.exp(margin * -1)))
}
}
-class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double,
- var numIters: Int)
- extends Logging {
-
+/**
+ * Train a classification model for Logistic Regression using Stochastic Gradient Descent.
+ */
+class LogisticRegressionWithSGD private (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[LogisticRegressionModel]
+ with Serializable {
+
+ val gradient = new LogisticGradient()
+ val updater = new SimpleUpdater()
+ val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
/**
* Construct a LogisticRegression object with default parameters
*/
- def this() = this(1.0, 1.0, 100)
-
- /**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
- */
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
-
- def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
- val nfeatures: Int = input.take(1)(0)._2.length
- val initialWeights = Array.fill(nfeatures)(1.0)
- train(input, initialWeights)
- }
-
- def train(
- input: RDD[(Int, Array[Double])],
- initialWeights: Array[Double]): LogisticRegressionModel = {
-
- // Add a extra variable consisting of all 1.0's for the intercept.
- val data = input.map { case (y, features) =>
- (y.toDouble, Array(1.0, features:_*))
- }
-
- val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
- val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
- data,
- new LogisticGradient(),
- new SimpleUpdater(),
- stepSize,
- numIters,
- 0.0,
- initalWeightsWithIntercept,
- miniBatchFraction)
-
- val intercept = weights(0)
- val weightsScaled = weights.tail
-
- val model = new LogisticRegressionModel(weightsScaled, intercept, stochasticLosses)
+ def this() = this(1.0, 100, 0.0, 1.0, true)
- logInfo("Final model weights " + model.weights.mkString(","))
- logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 stochastic losses " + model.stochasticLosses.takeRight(10).mkString(", "))
- model
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new LogisticRegressionModel(weights, intercept)
}
}
/**
* Top-level methods for calling Logistic Regression.
- * NOTE(shivaram): We use multiple train methods instead of default arguments to support
- * Java programs.
*/
-object LogisticRegressionLocalRandomSGD {
+object LogisticRegressionWithSGD {
+ // NOTE(shivaram): We use multiple train methods instead of default arguments to support
+ // Java programs.
/**
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
@@ -148,14 +94,14 @@ object LogisticRegressionLocalRandomSGD {
* the number of features in the data.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Array[Double])
: LogisticRegressionModel =
{
- new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
input, initialWeights)
}
@@ -171,13 +117,14 @@ object LogisticRegressionLocalRandomSGD {
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double)
: LogisticRegressionModel =
{
- new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input)
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction, true).run(
+ input)
}
/**
@@ -192,7 +139,7 @@ object LogisticRegressionLocalRandomSGD {
* @return a LogisticRegressionModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double)
: LogisticRegressionModel =
@@ -210,7 +157,7 @@ object LogisticRegressionLocalRandomSGD {
* @return a LogisticRegressionModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int)
: LogisticRegressionModel =
{
@@ -218,15 +165,14 @@ object LogisticRegressionLocalRandomSGD {
}
def main(args: Array[String]) {
- if (args.length != 5) {
+ if (args.length != 4) {
println("Usage: LogisticRegression <master> <input_dir> <step_size> " +
- "<regularization_parameter> <niters>")
+ "<niters>")
System.exit(1)
}
val sc = new SparkContext(args(0), "LogisticRegression")
- val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
- val model = LogisticRegressionLocalRandomSGD.train(
- data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
sc.stop()
}
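
After this refactoring the static train methods take an RDD[LabeledPoint] and predictions are Doubles. A minimal Scala sketch of the new API, assuming a local master; the input path is illustrative and the data must be in the <label>,<f1> <f2> ... format read by MLUtils.loadLabeledData:

    import spark.SparkContext
    import spark.mllib.classification.LogisticRegressionWithSGD
    import spark.mllib.util.MLUtils

    val sc = new SparkContext("local", "LRExample")
    // Each line: "<label>,<f1> <f2> ..."; the path is hypothetical.
    val points = MLUtils.loadLabeledData(sc, "data/lr-sample.txt").cache()

    // Train with 100 iterations and step size 1.0; an intercept is added by default.
    val model = LogisticRegressionWithSGD.train(points, 100, 1.0)
    println("weights: " + model.weights.mkString(",") + " intercept: " + model.intercept)

    // Predictions are now Doubles (0.0 or 1.0) rather than Ints.
    val predictions = model.predict(points.map(_.features))
    sc.stop()
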
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index 3a6a12814a..f799cb2829 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -20,125 +20,60 @@ package spark.mllib.classification
import scala.math.signum
import spark.{Logging, RDD, SparkContext}
import spark.mllib.optimization._
+import spark.mllib.regression._
import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
/**
- * SVM using Stochastic Gradient Descent.
+ * Model built using SVM.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
*/
class SVMModel(
- val weights: Array[Double],
- val intercept: Double,
- val stochasticLosses: Array[Double]) extends ClassificationModel {
-
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
- override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
- // A small optimization to avoid serializing the entire model. Only the weightsMatrix
- // and intercept is needed.
- val localWeights = weightsMatrix
- val localIntercept = intercept
- testData.map { x =>
- signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept).toInt
- }
- }
-
- override def predict(testData: Array[Double]): Int = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- signum(dataMat.dot(weightsMatrix) + this.intercept).toInt
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with ClassificationModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ signum(dataMatrix.dot(weightMatrix) + intercept)
}
}
-
-
-class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double,
- var miniBatchFraction: Double, var numIters: Int)
- extends Logging {
-
+/**
+ * Train an SVM using Stochastic Gradient Descent.
+ */
+class SVMWithSGD private (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {
+
+ val gradient = new HingeGradient()
+ val updater = new SquaredL2Updater()
+ val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
/**
* Construct a SVM object with default parameters
*/
- def this() = this(1.0, 1.0, 1.0, 100)
-
- /**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
+ def this() = this(1.0, 100, 1.0, 1.0, true)
- /**
- * Set the regularization parameter. Default 1.0.
- */
- def setRegParam(param: Double) = {
- this.regParam = param
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
- */
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
-
- def train(input: RDD[(Int, Array[Double])]): SVMModel = {
- val nfeatures: Int = input.take(1)(0)._2.length
- val initialWeights = Array.fill(nfeatures)(1.0)
- train(input, initialWeights)
- }
-
- def train(
- input: RDD[(Int, Array[Double])],
- initialWeights: Array[Double]): SVMModel = {
-
- // Add a extra variable consisting of all 1.0's for the intercept.
- val data = input.map { case (y, features) =>
- (y.toDouble, Array(1.0, features:_*))
- }
-
- val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
- val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
- data,
- new HingeGradient(),
- new SquaredL2Updater(),
- stepSize,
- numIters,
- regParam,
- initalWeightsWithIntercept,
- miniBatchFraction)
-
- val intercept = weights(0)
- val weightsScaled = weights.tail
-
- val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
-
- logInfo("Final model weights " + model.weights.mkString(","))
- logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
- model
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new SVMModel(weights, intercept)
}
}
/**
* Top-level methods for calling SVM.
-
-
*/
-object SVMLocalRandomSGD {
+object SVMWithSGD {
/**
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
@@ -155,7 +90,7 @@ object SVMLocalRandomSGD {
* the number of features in the data.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
@@ -163,8 +98,8 @@ object SVMLocalRandomSGD {
initialWeights: Array[Double])
: SVMModel =
{
- new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
- input, initialWeights)
+ new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
+ initialWeights)
}
/**
@@ -179,14 +114,14 @@ object SVMLocalRandomSGD {
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double)
: SVMModel =
{
- new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+ new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
}
/**
@@ -201,7 +136,7 @@ object SVMLocalRandomSGD {
* @return a SVMModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double)
@@ -220,7 +155,7 @@ object SVMLocalRandomSGD {
* @return a SVMModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Int, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int)
: SVMModel =
{
@@ -233,8 +168,8 @@ object SVMLocalRandomSGD {
System.exit(1)
}
val sc = new SparkContext(args(0), "SVM")
- val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
- val model = SVMLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
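
As with logistic regression above, SVMWithSGD can also be configured through its exposed optimizer rather than the static train helpers, mirroring the alternative shown in the JavaLR comment. A sketch under the assumption that the no-arg constructor and the GradientDescent setters behave as defined in this diff; the input path is illustrative:

    import spark.SparkContext
    import spark.mllib.classification.SVMWithSGD
    import spark.mllib.util.MLUtils

    val sc = new SparkContext("local", "SVMExample")
    val points = MLUtils.loadLabeledData(sc, "data/svm-sample.txt").cache()  // path is hypothetical

    // Configure the optimizer directly instead of calling SVMWithSGD.train(...).
    val svm = new SVMWithSGD()
    svm.optimizer.setNumIterations(200)
      .setStepSize(1.0)
      .setRegParam(0.1)
      .setMiniBatchFraction(1.0)
    val model = svm.run(points)

    // predictPoint returns signum(w.x + b), so predictions are -1.0 or 1.0 (0.0 on the boundary).
    val predictions = points.map(p => (p.label, model.predict(p.features)))
    sc.stop()
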
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
index 22b2ec5ed6..e72b8b3a92 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
@@ -19,18 +19,29 @@ package spark.mllib.optimization
import org.jblas.DoubleMatrix
+/**
+ * Class used to compute the gradient for a loss function, given a single data point.
+ */
abstract class Gradient extends Serializable {
/**
- * Compute the gradient for a given row of data.
+ * Compute the gradient and loss given features of a single data point.
*
- * @param data - One row of data. Row matrix of size 1xn where n is the number of features.
+ * @param data - Feature values for one data point. Column matrix of size nx1
+ * where n is the number of features.
* @param label - Label for this data item.
* @param weights - Column matrix containing weights for every feature.
+ *
+ * @return A tuple of 2 elements. The first element is a column matrix containing the computed
+ * gradient and the second element is the loss computed at this data point.
+ *
*/
def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double)
}
+/**
+ * Compute gradient and loss for a logistic loss function.
+ */
class LogisticGradient extends Gradient {
override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double) = {
@@ -49,7 +60,9 @@ class LogisticGradient extends Gradient {
}
}
-
+/**
+ * Compute gradient and loss for a least-squares loss function.
+ */
class SquaredGradient extends Gradient {
override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double) = {
@@ -62,7 +75,9 @@ class SquaredGradient extends Gradient {
}
}
-
+/**
+ * Compute gradient and loss for a Hinge loss function.
+ */
class HingeGradient extends Gradient {
override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double) = {
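
The Gradient abstraction takes a feature column vector, a label and the current weights and returns a (gradient, loss) pair. A hedged sketch of a custom subclass for an absolute-error loss, written with jblas in the same style as the built-in gradients; AbsoluteGradient is a hypothetical example and not part of this change:

    import org.jblas.DoubleMatrix
    import spark.mllib.optimization.Gradient

    // Hypothetical example: loss = |w.x - y|, gradient = sign(w.x - y) * x.
    class AbsoluteGradient extends Gradient {
      override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
          (DoubleMatrix, Double) = {
        val diff = data.dot(weights) - label
        val sign = if (diff > 0) 1.0 else if (diff < 0) -1.0 else 0.0
        // Gradient is a column vector with the same shape as the feature vector.
        (data.mul(sign), math.abs(diff))
      }
    }
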
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index 19cda26446..31917df7e8 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -24,18 +24,94 @@ import org.jblas.DoubleMatrix
import scala.collection.mutable.ArrayBuffer
+/**
+ * Class used to solve an optimization problem using Gradient Descent.
+ * @param gradient Gradient function to be used.
+ * @param updater Updater to be used to update weights after every iteration.
+ */
+class GradientDescent(var gradient: Gradient, var updater: Updater) extends Optimizer {
+
+ private var stepSize: Double = 1.0
+ private var numIterations: Int = 100
+ private var regParam: Double = 0.0
+ private var miniBatchFraction: Double = 1.0
+
+ /**
+ * Set the step size per-iteration of SGD. Default 1.0.
+ */
+ def setStepSize(step: Double): this.type = {
+ this.stepSize = step
+ this
+ }
+
+ /**
+ * Set fraction of data to be used for each SGD iteration. Default 1.0.
+ */
+ def setMiniBatchFraction(fraction: Double): this.type = {
+ this.miniBatchFraction = fraction
+ this
+ }
+
+ /**
+ * Set the number of iterations for SGD. Default 100.
+ */
+ def setNumIterations(iters: Int): this.type = {
+ this.numIterations = iters
+ this
+ }
+
+ /**
+ * Set the regularization parameter used for SGD. Default 0.0.
+ */
+ def setRegParam(regParam: Double): this.type = {
+ this.regParam = regParam
+ this
+ }
+
+ /**
+ * Set the gradient function to be used for SGD.
+ */
+ def setGradient(gradient: Gradient): this.type = {
+ this.gradient = gradient
+ this
+ }
+
+
+ /**
+ * Set the updater function to be used for SGD.
+ */
+ def setUpdater(updater: Updater): this.type = {
+ this.updater = updater
+ this
+ }
+
+ def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
+ : Array[Double] = {
-object GradientDescent {
+ val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
+ data,
+ gradient,
+ updater,
+ stepSize,
+ numIterations,
+ regParam,
+ miniBatchFraction,
+ initialWeights)
+ weights
+ }
+
+}
+// Top-level method to run gradient descent.
+object GradientDescent extends Logging {
/**
* Run gradient descent in parallel using mini batches.
- * Based on Matlab code written by John Duchi.
*
* @param data - Input data for SGD. RDD of form (label, [feature values]).
* @param gradient - Gradient object that will be used to compute the gradient.
* @param updater - Updater object that will be used to update the model.
* @param stepSize - stepSize to be used during update.
- * @param numIters - number of iterations that SGD should be run.
+ * @param numIterations - number of iterations that SGD should be run.
* @param regParam - regularization parameter
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
@@ -49,12 +125,12 @@ object GradientDescent {
gradient: Gradient,
updater: Updater,
stepSize: Double,
- numIters: Int,
+ numIterations: Int,
regParam: Double,
- initialWeights: Array[Double],
- miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = {
+ miniBatchFraction: Double,
+ initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
- val stochasticLossHistory = new ArrayBuffer[Double](numIters)
+ val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
val nexamples: Long = data.count()
val miniBatchSize = nexamples * miniBatchFraction
@@ -63,11 +139,11 @@ object GradientDescent {
var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
var regVal = 0.0
- for (i <- 1 to numIters) {
+ for (i <- 1 to numIterations) {
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
case (y, features) =>
- val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
- val (grad, loss) = gradient.compute(featuresRow, y, weights)
+ val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
+ val (grad, loss) = gradient.compute(featuresCol, y, weights)
(grad, loss)
}.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
@@ -76,11 +152,15 @@ object GradientDescent {
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
- val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
+ val update = updater.compute(
+ weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
weights = update._1
regVal = update._2
}
+ logInfo("GradientDescent finished. Last 10 stochastic losses %s".format(
+ stochasticLossHistory.takeRight(10).mkString(", ")))
+
(weights.toArray, stochasticLossHistory.toArray)
}
}
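
GradientDescent now implements Optimizer and carries its hyperparameters via chained setters, while the static runMiniBatchSGD keeps the parallel mini-batch logic (note that miniBatchFraction now comes before initialWeights in its argument list). A minimal sketch of driving the optimizer on a plain (label, features) RDD with a squared loss and no regularization; the toy dataset is illustrative:

    import spark.SparkContext
    import spark.mllib.optimization.{GradientDescent, SimpleUpdater, SquaredGradient}

    val sc = new SparkContext("local", "GDExample")
    // Toy data roughly following y = 2*x0 + 3*x1.
    val data = sc.parallelize(Seq(
      (5.0, Array(1.0, 1.0)),
      (8.0, Array(1.0, 2.0)),
      (13.0, Array(2.0, 3.0))))

    val gd = new GradientDescent(new SquaredGradient(), new SimpleUpdater())
      .setStepSize(0.1)
      .setNumIterations(500)
      .setMiniBatchFraction(1.0)

    // optimize returns the final weight vector; the loss history stays internal here.
    val weights = gd.optimize(data, Array(0.0, 0.0))
    println("learned weights: " + weights.mkString(","))
    sc.stop()
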
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
new file mode 100644
index 0000000000..76a519c338
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/optimization/Optimizer.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.optimization
+
+import spark.RDD
+
+trait Optimizer {
+
+ /**
+ * Solve the provided convex optimization problem.
+ */
+ def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
+
+}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
index 3ebc1409b6..db67d6b0bc 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
@@ -20,10 +20,14 @@ package spark.mllib.optimization
import scala.math._
import org.jblas.DoubleMatrix
+/**
+ * Class used to update weights used in Gradient Descent.
+ */
abstract class Updater extends Serializable {
/**
- * Compute an updated value for weights given the gradient, stepSize and iteration number.
- * Also returns the regularization value computed using the *updated* weights.
+ * Compute an updated value for weights given the gradient, stepSize, iteration number and
+ * regularization parameter. Also returns the regularization value computed using the
+ * *updated* weights.
*
* @param weightsOld - Column matrix of size nx1 where n is the number of features.
* @param gradient - Column matrix of size nx1 where n is the number of features.
@@ -38,6 +42,10 @@ abstract class Updater extends Serializable {
regParam: Double): (DoubleMatrix, Double)
}
+/**
+ * A simple updater that adaptively adjusts the learning rate using the
+ * square root of the number of iterations. Does not perform any regularization.
+ */
class SimpleUpdater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
@@ -48,11 +56,15 @@ class SimpleUpdater extends Updater {
}
/**
- * L1 regularization -- corresponding proximal operator is the soft-thresholding function
- * That is, each weight component is shrunk towards 0 by shrinkageVal
+ * Updater that adjusts learning rate and performs L1 regularization.
+ *
+ * The corresponding proximal operator used is the soft-thresholding function.
+ * That is, each weight component is shrunk towards 0 by shrinkageVal.
+ *
* If w > shrinkageVal, set weight component to w-shrinkageVal.
* If w < -shrinkageVal, set weight component to w+shrinkageVal.
* If -shrinkageVal < w < shrinkageVal, set weight component to 0.
+ *
* Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
*/
class L1Updater extends Updater {
@@ -72,6 +84,9 @@ class L1Updater extends Updater {
}
}
+/**
+ * Updater that adjusts the learning rate and performs L2 regularization
+ */
class SquaredL2Updater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
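
The L1Updater comment describes a soft-thresholding update. A small standalone sketch of that rule using jblas, assuming the per-iteration step size decays as stepSize / sqrt(iter) as the SimpleUpdater documentation suggests; l1Step is an illustrative function, not the actual L1Updater code:

    import org.jblas.DoubleMatrix
    import scala.math.{abs, max, signum, sqrt}

    // One soft-thresholding step of L1-regularized SGD (illustrative).
    def l1Step(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
               stepSize: Double, iter: Int, regParam: Double): DoubleMatrix = {
      val thisIterStepSize = stepSize / sqrt(iter)
      val newWeights = weightsOld.sub(gradient.mul(thisIterStepSize))
      val shrinkageVal = regParam * thisIterStepSize
      // Shrink each weight component towards zero by shrinkageVal.
      (0 until newWeights.length).foreach { i =>
        val w = newWeights.get(i)
        newWeights.put(i, signum(w) * max(0.0, abs(w) - shrinkageVal))
      }
      newWeights
    }
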
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 38637b3dd1..5e21717da5 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -22,6 +22,15 @@ import spark.SparkContext._
import org.jblas._
+/**
+ * Model representing the result of matrix factorization.
+ *
+ * @param rank Rank for the features in this model.
+ * @param userFeatures RDD of tuples where each tuple represents the userId and
+ * the features computed for this user.
+ * @param productFeatures RDD of tuples where each tuple represents the productId
+ * and the features computed for this product.
+ */
class MatrixFactorizationModel(
val rank: Int,
val userFeatures: RDD[(Int, Array[Double])],
diff --git a/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
new file mode 100644
index 0000000000..4ecafff08b
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD}
+import spark.mllib.optimization._
+
+import org.jblas.DoubleMatrix
+
+/**
+ * GeneralizedLinearModel (GLM) represents a model trained using
+ * GeneralizedLinearAlgorithm. GLMs consist of a weight vector and
+ * an intercept.
+ *
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
+ */
+abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double)
+ extends Serializable {
+
+ // Create a column vector that can be used for predictions
+ private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+ /**
+ * Predict the result given a data point and the weights learned.
+ *
+ * @param dataMatrix Row vector containing the features for this data point
+ * @param weightMatrix Column vector containing the weights of the model
+ * @param intercept Intercept of the model.
+ */
+ def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double): Double
+
+ /**
+ * Predict values for the given data set using the model trained.
+ *
+ * @param testData RDD representing data points to be predicted
+ * @return RDD[Double] where each entry contains the corresponding prediction
+ */
+ def predict(testData: spark.RDD[Array[Double]]): RDD[Double] = {
+ // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+ // and intercept is needed.
+ val localWeights = weightsMatrix
+ val localIntercept = intercept
+
+ testData.map { x =>
+ val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
+ predictPoint(dataMatrix, localWeights, localIntercept)
+ }
+ }
+
+ /**
+ * Predict values for a single data point using the model trained.
+ *
+ * @param testData array representing a single data point
+ * @return Double prediction from the trained model
+ */
+ def predict(testData: Array[Double]): Double = {
+ val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+ predictPoint(dataMat, weightsMatrix, intercept)
+ }
+}
+
+/**
+ * GeneralizedLinearAlgorithm implements methods to train a Generalized Linear Model (GLM).
+ * This class should be extended with an Optimizer to create a new GLM.
+ */
+abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
+ extends Logging with Serializable {
+
+ val optimizer: Optimizer
+
+ /**
+ * Create a model given the weights and intercept
+ */
+ protected def createModel(weights: Array[Double], intercept: Double): M
+
+ protected var addIntercept: Boolean
+
+ /**
+ * Set if the algorithm should add an intercept. Default true.
+ */
+ def setIntercept(addIntercept: Boolean): this.type = {
+ this.addIntercept = addIntercept
+ this
+ }
+
+ /**
+ * Run the algorithm with the configured parameters on an input
+ * RDD of LabeledPoint entries.
+ */
+ def run(input: RDD[LabeledPoint]) : M = {
+ val nfeatures: Int = input.first().features.length
+ val initialWeights = Array.fill(nfeatures)(1.0)
+ run(input, initialWeights)
+ }
+
+ /**
+ * Run the algorithm with the configured parameters on an input RDD
+ * of LabeledPoint entries starting from the initial weights provided.
+ */
+ def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = {
+
+ // Add an extra variable consisting of all 1.0's for the intercept.
+ val data = if (addIntercept) {
+ input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*)))
+ } else {
+ input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
+ }
+
+ val initialWeightsWithIntercept = if (addIntercept) {
+ Array(1.0, initialWeights:_*)
+ } else {
+ initialWeights
+ }
+
+ val weights = optimizer.optimize(data, initialWeightsWithIntercept)
+ val intercept = weights(0)
+ val weightsScaled = weights.tail
+
+ val model = createModel(weightsScaled, intercept)
+
+ logInfo("Final model weights " + model.weights.mkString(","))
+ logInfo("Final model intercept " + model.intercept)
+ model
+ }
+}
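
A new GLM only needs a model class with a predictPoint, an algorithm class that wires up a Gradient, Updater and Optimizer, and a createModel factory. A hedged sketch of a hypothetical plain linear-regression GLM following the same pattern as LassoWithSGD; the names LinearRegressionModel and LinearRegressionWithSGD are illustrative and not part of this change:

    import org.jblas.DoubleMatrix
    import spark.mllib.optimization.{GradientDescent, SimpleUpdater, SquaredGradient}
    import spark.mllib.regression.{GeneralizedLinearAlgorithm, GeneralizedLinearModel, RegressionModel}

    // Hypothetical model: least-squares prediction w.x + intercept.
    class LinearRegressionModel(override val weights: Array[Double],
                                override val intercept: Double)
      extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

      override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
          intercept: Double) = {
        dataMatrix.dot(weightMatrix) + intercept
      }
    }

    // Hypothetical algorithm: squared loss, no regularization.
    class LinearRegressionWithSGD(var stepSize: Double, var numIterations: Int,
                                  var addIntercept: Boolean)
      extends GeneralizedLinearAlgorithm[LinearRegressionModel] with Serializable {

      val gradient = new SquaredGradient()
      val updater = new SimpleUpdater()
      val optimizer = new GradientDescent(gradient, updater)
        .setStepSize(stepSize)
        .setNumIterations(numIterations)

      def createModel(weights: Array[Double], intercept: Double) = {
        new LinearRegressionModel(weights, intercept)
      }
    }
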
diff --git a/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala
new file mode 100644
index 0000000000..3de60482c5
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/LabeledPoint.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+/**
+ * Class that represents the features and labels of a data point.
+ *
+ * @param label Label for this data point.
+ * @param features List of features for this data point.
+ */
+case class LabeledPoint(val label: Double, val features: Array[Double])
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index e8b1ed8a48..6bbc990a5a 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -24,121 +24,56 @@ import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
/**
- * Lasso using Stochastic Gradient Descent.
+ * Regression model trained using Lasso.
*
+ * @param weights Weights computed for every feature.
+ * @param intercept Intercept computed for this model.
*/
class LassoModel(
- val weights: Array[Double],
- val intercept: Double,
- val stochasticLosses: Array[Double]) extends RegressionModel {
-
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
- override def predict(testData: spark.RDD[Array[Double]]) = {
- // A small optimization to avoid serializing the entire model. Only the weightsMatrix
- // and intercept is needed.
- val localWeights = weightsMatrix
- val localIntercept = intercept
- testData.map { x =>
- new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
- }
- }
-
-
- override def predict(testData: Array[Double]): Double = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- dataMat.dot(weightsMatrix) + this.intercept
+ override val weights: Array[Double],
+ override val intercept: Double)
+ extends GeneralizedLinearModel(weights, intercept)
+ with RegressionModel with Serializable {
+
+ override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ intercept: Double) = {
+ dataMatrix.dot(weightMatrix) + intercept
}
}
-
-class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double,
- var miniBatchFraction: Double, var numIters: Int)
- extends Logging {
+/**
+ * Train a regression model with L1-regularization using Stochastic Gradient Descent.
+ */
+class LassoWithSGD private (
+ var stepSize: Double,
+ var numIterations: Int,
+ var regParam: Double,
+ var miniBatchFraction: Double,
+ var addIntercept: Boolean)
+ extends GeneralizedLinearAlgorithm[LassoModel]
+ with Serializable {
+
+ val gradient = new SquaredGradient()
+ val updater = new L1Updater()
+ val optimizer = new GradientDescent(gradient, updater).setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
/**
* Construct a Lasso object with default parameters
*/
- def this() = this(1.0, 1.0, 1.0, 100)
-
- /**
- * Set the step size per-iteration of SGD. Default 1.0.
- */
- def setStepSize(step: Double) = {
- this.stepSize = step
- this
- }
+ def this() = this(1.0, 100, 1.0, 1.0, true)
- /**
- * Set the regularization parameter. Default 1.0.
- */
- def setRegParam(param: Double) = {
- this.regParam = param
- this
- }
-
- /**
- * Set fraction of data to be used for each SGD iteration. Default 1.0.
- */
- def setMiniBatchFraction(fraction: Double) = {
- this.miniBatchFraction = fraction
- this
- }
-
- /**
- * Set the number of iterations for SGD. Default 100.
- */
- def setNumIterations(iters: Int) = {
- this.numIters = iters
- this
- }
-
- def train(input: RDD[(Double, Array[Double])]): LassoModel = {
- val nfeatures: Int = input.take(1)(0)._2.length
- val initialWeights = Array.fill(nfeatures)(1.0)
- train(input, initialWeights)
- }
-
- def train(
- input: RDD[(Double, Array[Double])],
- initialWeights: Array[Double]): LassoModel = {
-
- // Add a extra variable consisting of all 1.0's for the intercept.
- val data = input.map { case (y, features) =>
- (y, Array(1.0, features:_*))
- }
-
- val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
-
- val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
- data,
- new SquaredGradient(),
- new L1Updater(),
- stepSize,
- numIters,
- regParam,
- initalWeightsWithIntercept,
- miniBatchFraction)
-
- val intercept = weights(0)
- val weightsScaled = weights.tail
-
- val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
-
- logInfo("Final model weights " + model.weights.mkString(","))
- logInfo("Final model intercept " + model.intercept)
- logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
- model
+ def createModel(weights: Array[Double], intercept: Double) = {
+ new LassoModel(weights, intercept)
}
}
/**
* Top-level methods for calling Lasso.
- *
- *
*/
-object LassoLocalRandomSGD {
+object LassoWithSGD {
/**
* Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
@@ -155,7 +90,7 @@ object LassoLocalRandomSGD {
* the number of features in the data.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
@@ -163,8 +98,8 @@ object LassoLocalRandomSGD {
initialWeights: Array[Double])
: LassoModel =
{
- new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
- input, initialWeights)
+ new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input,
+ initialWeights)
}
/**
@@ -179,14 +114,14 @@ object LassoLocalRandomSGD {
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
miniBatchFraction: Double)
: LassoModel =
{
- new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+ new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction, true).run(input)
}
/**
@@ -201,7 +136,7 @@ object LassoLocalRandomSGD {
* @return a LassoModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double)
@@ -220,7 +155,7 @@ object LassoLocalRandomSGD {
* @return a LassoModel which has the weights and offset from training.
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
numIterations: Int)
: LassoModel =
{
@@ -234,7 +169,7 @@ object LassoLocalRandomSGD {
}
val sc = new SparkContext(args(0), "Lasso")
val data = MLUtils.loadLabeledData(sc, args(1))
- val model = LassoLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
diff --git a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
index 6ba141e8fb..b42d94af41 100644
--- a/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/RidgeRegression.scala
@@ -71,7 +71,8 @@ class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double)
this
}
- def train(input: RDD[(Double, Array[Double])]): RidgeRegressionModel = {
+ def train(inputLabeled: RDD[LabeledPoint]): RidgeRegressionModel = {
+ val input = inputLabeled.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
val nfeatures: Int = input.take(1)(0)._2.length
val nexamples: Long = input.count()
@@ -167,10 +168,10 @@ class RidgeRegression private (var lambdaLow: Double, var lambdaHigh: Double)
/**
* Top-level methods for calling Ridge Regression.
- * NOTE(shivaram): We use multiple train methods instead of default arguments to support
- * Java programs.
*/
object RidgeRegression {
+ // NOTE(shivaram): We use multiple train methods instead of default arguments to support
+ // Java programs.
/**
* Train a ridge regression model given an RDD of (response, features) pairs.
@@ -183,7 +184,7 @@ object RidgeRegression {
* @param lambdaHigh upper bound used in binary search for lambda
*/
def train(
- input: RDD[(Double, Array[Double])],
+ input: RDD[LabeledPoint],
lambdaLow: Double,
lambdaHigh: Double)
: RidgeRegressionModel =
@@ -199,7 +200,7 @@ object RidgeRegression {
*
* @param input RDD of (response, array of features) pairs.
*/
- def train(input: RDD[(Double, Array[Double])]) : RidgeRegressionModel = {
+ def train(input: RDD[LabeledPoint]) : RidgeRegressionModel = {
train(input, 0.0, 100.0)
}
diff --git a/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala
index c89e5dd738..672b63f65a 100644
--- a/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala
@@ -21,12 +21,16 @@ import scala.util.Random
import spark.{RDD, SparkContext}
+/**
+ * Generate test data for KMeans. This class first chooses k cluster centers
+ * from a d-dimensional Gaussian distribution scaled by factor r and then creates a Gaussian
+ * cluster with scale 1 around each center.
+ */
+
object KMeansDataGenerator {
/**
- * Generate an RDD containing test data for KMeans. This function chooses k cluster centers
- * from a d-dimensional Gaussian distribution scaled by factor r, then creates a Gaussian
- * cluster with scale 1 around each center.
+ * Generate an RDD containing test data for KMeans.
*
* @param sc SparkContext to use for creating the RDD
* @param numPoints Number of points that will be contained in the RDD
diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
index ef4f42a494..eeb14fc4e3 100644
--- a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
@@ -1,18 +1,22 @@
-package spark.mllib.regression
+package spark.mllib.util
import scala.util.Random
import org.jblas.DoubleMatrix
import spark.{RDD, SparkContext}
-import spark.mllib.util.MLUtils
+import spark.mllib.regression.LabeledPoint
-object LassoGenerator {
+/**
+ * Generate sample data used for Lasso Regression. This class generates uniform random values
+ * for the features and adds Gaussian noise with weight 0.1 to generate response variables.
+ */
+object LassoDataGenerator {
def main(args: Array[String]) {
- if (args.length != 5) {
+ if (args.length < 2) {
println("Usage: LassoGenerator " +
- "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
+ "<master> <output_dir> [num_examples] [num_features] [num_partitions]")
System.exit(1)
}
@@ -21,7 +25,6 @@ object LassoGenerator {
val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
val nfeatures: Int = if (args.length > 3) args(3).toInt else 2
val parts: Int = if (args.length > 4) args(4).toInt else 2
- val eps = 3
val sc = new SparkContext(sparkMaster, "LassoGenerator")
@@ -29,14 +32,14 @@ object LassoGenerator {
val trueWeights = new DoubleMatrix(1, nfeatures+1,
Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
- val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+ val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
val rnd = new Random(42 + idx)
val x = Array.fill[Double](nfeatures) {
rnd.nextDouble() * 2.0 - 1.0
}
val y = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1
- (y, x)
+ LabeledPoint(y, x)
}
MLUtils.saveLabeledData(data, outputPath)
diff --git a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala
index 8d659cd97c..d6402f23e2 100644
--- a/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -20,12 +20,17 @@ package spark.mllib.util
import scala.util.Random
import spark.{RDD, SparkContext}
+import spark.mllib.regression.LabeledPoint
+
+/**
+ * Generate test data for LogisticRegression. This class chooses positive labels
+ * with probability `probOne` and scales features for positive examples by `eps`.
+ */
object LogisticRegressionDataGenerator {
/**
- * Generate an RDD containing test data for LogisticRegression. This function chooses
- * positive labels with probability `probOne` and scales positive examples by `eps`.
+ * Generate an RDD containing test data for LogisticRegression.
*
* @param sc SparkContext to use for creating the RDD.
* @param nexamples Number of examples that will be contained in the RDD.
@@ -40,7 +45,7 @@ object LogisticRegressionDataGenerator {
nfeatures: Int,
eps: Double,
nparts: Int = 2,
- probOne: Double = 0.5): RDD[(Double, Array[Double])] = {
+ probOne: Double = 0.5): RDD[LabeledPoint] = {
val data = sc.parallelize(0 until nexamples, nparts).map { idx =>
val rnd = new Random(42 + idx)
@@ -48,7 +53,7 @@ object LogisticRegressionDataGenerator {
val x = Array.fill[Double](nfeatures) {
rnd.nextGaussian() + (y * eps)
}
- (y, x)
+ LabeledPoint(y, x)
}
data
}
diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
index 25d9673004..4e030a81b4 100644
--- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
@@ -21,32 +21,42 @@ import spark.{RDD, SparkContext}
import spark.SparkContext._
import org.jblas.DoubleMatrix
+import spark.mllib.regression.LabeledPoint
/**
- * Helper methods to load and save data
- * Data format:
- * <l>, <f1> <f2> ...
- * where <f1>, <f2> are feature values in Double and <l> is the corresponding label as Double.
+ * Helper methods to load, save and pre-process data used in MLlib.
*/
object MLUtils {
/**
+ * Load labeled data from a file. The data format used here is
+ * <L>, <f1> <f2> ...
+ * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
+ *
* @param sc SparkContext
* @param dir Directory to the input data files.
- * @return An RDD of tuples. For each tuple, the first element is the label, and the second
- * element represents the feature values (an array of Double).
+ * @return An RDD of LabeledPoint. Each LabeledPoint holds the label and the corresponding
+ * feature values (an array of Double).
*/
- def loadLabeledData(sc: SparkContext, dir: String): RDD[(Double, Array[Double])] = {
+ def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
sc.textFile(dir).map { line =>
val parts = line.split(',')
val label = parts(0).toDouble
val features = parts(1).trim().split(' ').map(_.toDouble)
- (label, features)
+ LabeledPoint(label, features)
}
}
- def saveLabeledData(data: RDD[(Double, Array[Double])], dir: String) {
- val dataStr = data.map(x => x._1 + "," + x._2.mkString(" "))
+ /**
+ * Save labeled data to a file. The data format used here is
+ * <L>, <f1> <f2> ...
+ * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
+ *
+ * @param data An RDD of LabeledPoints containing data to be saved.
+ * @param dir Directory to save the data.
+ */
+ def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
+ val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
dataStr.saveAsTextFile(dir)
}
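A minimal round-trip sketch for the new signatures (not part of the patch; the output path and master URL are illustrative):

    import spark.SparkContext
    import spark.mllib.util.MLUtils
    import spark.mllib.regression.LabeledPoint

    val sc = new SparkContext("local", "MLUtilsExample")
    val points = sc.parallelize(Seq(
      LabeledPoint(1.0, Array(0.5, -0.3)),
      LabeledPoint(0.0, Array(1.2, 0.7))))
    // saveLabeledData writes lines of the form "<label>,<f1> <f2> ..."
    MLUtils.saveLabeledData(points, "/tmp/labeled-data")   // output path is illustrative
    val reloaded = MLUtils.loadLabeledData(sc, "/tmp/labeled-data")
    reloaded.collect().foreach(p => println(p.label + " : " + p.features.mkString(" ")))
    sc.stop()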
diff --git a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
index c5b8a29942..4d329168be 100644
--- a/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/RidgeRegressionDataGenerator.scala
@@ -22,33 +22,40 @@ import scala.util.Random
import org.jblas.DoubleMatrix
import spark.{RDD, SparkContext}
+import spark.mllib.regression.LabeledPoint
+/**
+ * Generate sample data used for RidgeRegression. This class generates
+ * uniformly random values for every feature and adds Gaussian noise scaled by `eps` to the
+ * response variable `Y`.
+ *
+ */
object RidgeRegressionDataGenerator {
/**
- * Generate an RDD containing test data used for RidgeRegression. This function generates
- * uniformly random values for every feature and adds Gaussian noise with mean `eps` to the
- * response variable `Y`.
+ * Generate an RDD containing sample data for RidgeRegression.
*
* @param sc SparkContext to be used for generating the RDD.
* @param nexamples Number of examples that will be contained in the RDD.
* @param nfeatures Number of features to generate for each example.
* @param eps Epsilon factor by which examples are scaled.
* @param nparts Number of partitions in the RDD. Default value is 2.
+ *
+ * @return RDD of LabeledPoint containing sample data.
*/
def generateRidgeRDD(
sc: SparkContext,
nexamples: Int,
nfeatures: Int,
eps: Double,
- nparts: Int = 2) : RDD[(Double, Array[Double])] = {
+ nparts: Int = 2) : RDD[LabeledPoint] = {
org.jblas.util.Random.seed(42)
// Random values distributed uniformly in [-0.5, 0.5]
val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
w.put(0, 0, 10)
w.put(1, 0, 10)
- val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
+ val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
org.jblas.util.Random.seed(42 + p)
val examplesInPartition = nexamples / nparts
@@ -61,16 +68,16 @@ object RidgeRegressionDataGenerator {
val yObs = new DoubleMatrix(normalValues).addi(y)
Iterator.tabulate(examplesInPartition) { i =>
- (yObs.get(i, 0), X.getRow(i).toArray)
+ LabeledPoint(yObs.get(i, 0), X.getRow(i).toArray)
}
}
data
}
def main(args: Array[String]) {
- if (args.length != 5) {
+ if (args.length < 2) {
println("Usage: RidgeRegressionGenerator " +
- "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
+ "<master> <output_dir> [num_examples] [num_features] [num_partitions]")
System.exit(1)
}
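A usage sketch for generateRidgeRDD as declared above (not part of the patch; the sizes and noise factor are illustrative):

    import spark.SparkContext
    import spark.mllib.util.RidgeRegressionDataGenerator

    val sc = new SparkContext("local", "RidgeDataGenExample")
    // 1,000 examples, 10 features, noise factor eps = 0.1, default 2 partitions.
    val data = RidgeRegressionDataGenerator.generateRidgeRDD(sc, 1000, 10, 0.1)
    println("Generated " + data.count() + " labeled points")
    sc.stop()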
diff --git a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
index 00a54d9a70..e02bd190f6 100644
--- a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
@@ -1,21 +1,23 @@
-package spark.mllib.classification
+package spark.mllib.util
import scala.util.Random
import scala.math.signum
-import org.jblas.DoubleMatrix
-
import spark.{RDD, SparkContext}
-import spark.mllib.util.MLUtils
import org.jblas.DoubleMatrix
+import spark.mllib.regression.LabeledPoint
-object SVMGenerator {
+/**
+ * Generate sample data used for SVM. This class generates uniformly random values
+ * for the features and adds Gaussian noise (scaled by 0.1) before taking the sign to generate labels.
+ */
+object SVMDataGenerator {
def main(args: Array[String]) {
- if (args.length != 5) {
+ if (args.length < 2) {
println("Usage: SVMGenerator " +
- "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
+ "<master> <output_dir> [num_examples] [num_features] [num_partitions]")
System.exit(1)
}
@@ -24,7 +26,6 @@ object SVMGenerator {
val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
val nfeatures: Int = if (args.length > 3) args(3).toInt else 2
val parts: Int = if (args.length > 4) args(4).toInt else 2
- val eps = 3
val sc = new SparkContext(sparkMaster, "SVMGenerator")
@@ -32,14 +33,14 @@ object SVMGenerator {
val trueWeights = new DoubleMatrix(1, nfeatures+1,
Array.fill[Double](nfeatures + 1) { globalRnd.nextGaussian() }:_*)
- val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+ val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
val rnd = new Random(42 + idx)
val x = Array.fill[Double](nfeatures) {
rnd.nextDouble() * 2.0 - 1.0
}
val y = signum((new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1)
- (y, x)
+ LabeledPoint(y, x)
}
MLUtils.saveLabeledData(data, outputPath)
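The renamed generator can also be driven programmatically through its main() entry point; a sketch under illustrative argument values ("local" master and a /tmp output path):

    import spark.mllib.util.SVMDataGenerator

    // Only <master> and <output_dir> are required after this change; the trailing
    // arguments default to 1000 examples, 2 features and 2 partitions.
    SVMDataGenerator.main(Array("local", "/tmp/svm-data", "1000", "2", "2"))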
diff --git a/mllib/src/test/java/spark/mllib/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/spark/mllib/classification/JavaLogisticRegressionSuite.java
new file mode 100644
index 0000000000..e0ebd45cd8
--- /dev/null
+++ b/mllib/src/test/java/spark/mllib/classification/JavaLogisticRegressionSuite.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.classification;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+
+import spark.mllib.regression.LabeledPoint;
+
+public class JavaLogisticRegressionSuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaLogisticRegressionSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ }
+
+ int validatePrediction(List<LabeledPoint> validationData, LogisticRegressionModel model) {
+ int numAccurate = 0;
+ for (LabeledPoint point: validationData) {
+ Double prediction = model.predict(point.features());
+ if (prediction == point.label()) {
+ numAccurate++;
+ }
+ }
+ return numAccurate;
+ }
+
+ @Test
+ public void runLRUsingConstructor() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double B = -1.5;
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(
+ LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17);
+
+ LogisticRegressionWithSGD lrImpl = new LogisticRegressionWithSGD();
+ lrImpl.optimizer().setStepSize(1.0)
+ .setRegParam(1.0)
+ .setNumIterations(100);
+ LogisticRegressionModel model = lrImpl.run(testRDD.rdd());
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+ @Test
+ public void runLRUsingStaticMethods() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double B = -1.5;
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(
+ LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ LogisticRegressionSuite.generateLogisticInputAsList(A, B, nPoints, 17);
+
+ LogisticRegressionModel model = LogisticRegressionWithSGD.train(
+ testRDD.rdd(), 100, 1.0, 1.0);
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+}
diff --git a/mllib/src/test/java/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/spark/mllib/classification/JavaSVMSuite.java
new file mode 100644
index 0000000000..7881b3c38f
--- /dev/null
+++ b/mllib/src/test/java/spark/mllib/classification/JavaSVMSuite.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.classification;
+
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+
+import spark.mllib.regression.LabeledPoint;
+
+public class JavaSVMSuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaSVMSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ }
+
+ int validatePrediction(List<LabeledPoint> validationData, SVMModel model) {
+ int numAccurate = 0;
+ for (LabeledPoint point: validationData) {
+ Double prediction = model.predict(point.features());
+ if (prediction == point.label()) {
+ numAccurate++;
+ }
+ }
+ return numAccurate;
+ }
+
+ @Test
+ public void runSVMUsingConstructor() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double[] weights = {-1.5, 1.0};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A,
+ weights, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17);
+
+ SVMWithSGD svmSGDImpl = new SVMWithSGD();
+ svmSGDImpl.optimizer().setStepSize(1.0)
+ .setRegParam(1.0)
+ .setNumIterations(100);
+ SVMModel model = svmSGDImpl.run(testRDD.rdd());
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+ @Test
+ public void runSVMUsingStaticMethods() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double[] weights = {-1.5, 1.0};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(SVMSuite.generateSVMInputAsList(A,
+ weights, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ SVMSuite.generateSVMInputAsList(A, weights, nPoints, 17);
+
+ SVMModel model = SVMWithSGD.train(testRDD.rdd(), 100, 1.0, 1.0, 1.0);
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+}
diff --git a/mllib/src/test/scala/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/spark/mllib/clustering/JavaKMeansSuite.java
index 3f2d82bfb4..3f2d82bfb4 100644
--- a/mllib/src/test/scala/spark/mllib/clustering/JavaKMeansSuite.java
+++ b/mllib/src/test/java/spark/mllib/clustering/JavaKMeansSuite.java
diff --git a/mllib/src/test/scala/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/spark/mllib/recommendation/JavaALSSuite.java
index 7993629a6d..7993629a6d 100644
--- a/mllib/src/test/scala/spark/mllib/recommendation/JavaALSSuite.java
+++ b/mllib/src/test/java/spark/mllib/recommendation/JavaALSSuite.java
diff --git a/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java
new file mode 100644
index 0000000000..e26d7b385c
--- /dev/null
+++ b/mllib/src/test/java/spark/mllib/regression/JavaLassoSuite.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+
+public class JavaLassoSuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaLassoSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ }
+
+ int validatePrediction(List<LabeledPoint> validationData, LassoModel model) {
+ int numAccurate = 0;
+ for (LabeledPoint point: validationData) {
+ Double prediction = model.predict(point.features());
+ // A prediction counts as accurate if it is within 0.5 of the expected value.
+ if (Math.abs(prediction - point.label()) <= 0.5) {
+ numAccurate++;
+ }
+ }
+ return numAccurate;
+ }
+
+ @Test
+ public void runLassoUsingConstructor() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double[] weights = {-1.5, 1.0e-2};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A,
+ weights, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17);
+
+ LassoWithSGD lassoImpl = new LassoWithSGD();
+ lassoImpl.optimizer().setStepSize(1.0)
+ .setRegParam(0.01)
+ .setNumIterations(20);
+ LassoModel model = lassoImpl.run(testRDD.rdd());
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+ @Test
+ public void runLassoUsingStaticMethods() {
+ int nPoints = 10000;
+ double A = 2.0;
+ double[] weights = {-1.5, 1.0e-2};
+
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(LassoSuite.generateLassoInputAsList(A,
+ weights, nPoints, 42), 2).cache();
+ List<LabeledPoint> validationData =
+ LassoSuite.generateLassoInputAsList(A, weights, nPoints, 17);
+
+ LassoModel model = LassoWithSGD.train(testRDD.rdd(), 100, 1.0, 0.01, 1.0);
+
+ int numAccurate = validatePrediction(validationData, model);
+ Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
+ }
+
+}
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 8664263935..16bd2c6b38 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -18,20 +18,23 @@
package spark.mllib.classification
import scala.util.Random
+import scala.collection.JavaConversions._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
import spark.SparkContext
+import spark.mllib.regression._
+object LogisticRegressionSuite {
-class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
- val sc = new SparkContext("local", "test")
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
+ def generateLogisticInputAsList(
+ offset: Double,
+ scale: Double,
+ nPoints: Int,
+ seed: Int): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateLogisticInput(offset, scale, nPoints, seed))
}
// Generate input of the form Y = logistic(offset + scale*X)
@@ -39,7 +42,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
offset: Double,
scale: Double,
nPoints: Int,
- seed: Int): Seq[(Int, Array[Double])] = {
+ seed: Int): Seq[LabeledPoint] = {
val rnd = new Random(seed)
val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
@@ -57,13 +60,23 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
if (yVal > 0) 1 else 0
}
- val testData = (0 until nPoints).map(i => (y(i), Array(x1(i))))
+ val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
testData
}
- def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) {
- val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
- (prediction != expected)
+}
+
+class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
+ val sc = new SparkContext("local", "test")
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) =>
+ (prediction != expected.label)
}.size
// At least 83% of the predictions should be on.
((input.length - numOffPredictions).toDouble / input.length) should be > 0.83
@@ -75,26 +88,27 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
val A = 2.0
val B = -1.5
- val testData = generateLogisticInput(A, B, nPoints, 42)
+ val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(20)
+ val lr = new LogisticRegressionWithSGD()
+ lr.optimizer.setStepSize(10.0).setNumIterations(20)
- val model = lr.train(testRDD)
+ val model = lr.run(testRDD)
// Test the weights
val weight0 = model.weights(0)
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
- val validationData = generateLogisticInput(A, B, nPoints, 17)
+ val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17)
val validationRDD = sc.parallelize(validationData, 2)
// Test prediction on RDD.
- validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
// Test prediction on Array.
- validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
test("logistic regression with initial weights") {
@@ -102,7 +116,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
val A = 2.0
val B = -1.5
- val testData = generateLogisticInput(A, B, nPoints, 42)
+ val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42)
val initialB = -1.0
val initialWeights = Array(initialB)
@@ -111,20 +125,21 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with Shoul
testRDD.cache()
// Use half as many iterations as the previous test.
- val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(10)
+ val lr = new LogisticRegressionWithSGD()
+ lr.optimizer.setStepSize(10.0).setNumIterations(10)
- val model = lr.train(testRDD, initialWeights)
+ val model = lr.run(testRDD, initialWeights)
val weight0 = model.weights(0)
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
- val validationData = generateLogisticInput(A, B, nPoints, 17)
+ val validationData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 17)
val validationRDD = sc.parallelize(validationData, 2)
// Test prediction on RDD.
- validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
// Test prediction on Array.
- validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
}
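The test changes above amount to a calling-convention switch; sketched in isolation (testRDD stands for any cached RDD[LabeledPoint], as in the suite; the step size and iteration count mirror the test values):

    // Removed entry point exercised by the old test:
    //   val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(20)
    //   val model = lr.train(testRDD)
    // Entry point exercised by the updated test:
    val lr = new LogisticRegressionWithSGD()
    lr.optimizer.setStepSize(10.0).setNumIterations(20)
    val model = lr.run(testRDD)                               // testRDD: RDD[LabeledPoint]
    val predictions = model.predict(testRDD.map(_.features))  // RDD[Double]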
diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
index d546e0729e..9e0970812d 100644
--- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
@@ -19,20 +19,24 @@ package spark.mllib.classification
import scala.util.Random
import scala.math.signum
+import scala.collection.JavaConversions._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import spark.SparkContext
+import spark.mllib.regression._
import org.jblas.DoubleMatrix
-class SVMSuite extends FunSuite with BeforeAndAfterAll {
- val sc = new SparkContext("local", "test")
+object SVMSuite {
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
+ def generateSVMInputAsList(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateSVMInput(intercept, weights, nPoints, seed))
}
// Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise)
@@ -40,58 +44,75 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
intercept: Double,
weights: Array[Double],
nPoints: Int,
- seed: Int): Seq[(Int, Array[Double])] = {
+ seed: Int): Seq[LabeledPoint] = {
val rnd = new Random(seed)
val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
- val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
- val y = x.map(xi =>
- signum((new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()).toInt
- )
- y zip x
+ val x = Array.fill[Array[Double]](nPoints)(
+ Array.fill[Double](weights.length)(rnd.nextGaussian()))
+ val y = x.map { xi =>
+ signum(
+ (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) +
+ intercept +
+ 0.1 * rnd.nextGaussian()
+ ).toInt
+ }
+ y.zip(x).map(p => LabeledPoint(p._1, p._2))
}
- def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) {
- val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
- (prediction != expected)
+}
+
+class SVMSuite extends FunSuite with BeforeAndAfterAll {
+ val sc = new SparkContext("local", "test")
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) =>
+ (prediction != expected.label)
}.size
// At least 80% of the predictions should be on.
assert(numOffPredictions < input.length / 5)
}
- test("SVMLocalRandomSGD") {
+
+ test("SVM using local random SGD") {
val nPoints = 10000
val A = 2.0
val B = -1.5
val C = 1.0
- val testData = generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+ val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+ val svm = new SVMWithSGD()
+ svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
- val model = svm.train(testRDD)
+ val model = svm.run(testRDD)
- val validationData = generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+ val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17)
val validationRDD = sc.parallelize(validationData,2)
// Test prediction on RDD.
- validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
// Test prediction on Array.
- validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
- test("SVMLocalRandomSGD with initial weights") {
+ test("SVM local random SGD with initial weights") {
val nPoints = 10000
val A = 2.0
val B = -1.5
val C = 1.0
- val testData = generateSVMInput(A, Array[Double](B,C), nPoints, 42)
+ val testData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 42)
val initialB = -1.0
val initialC = -1.0
@@ -100,17 +121,18 @@ class SVMSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+ val svm = new SVMWithSGD()
+ svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
- val model = svm.train(testRDD, initialWeights)
+ val model = svm.run(testRDD, initialWeights)
- val validationData = generateSVMInput(A, Array[Double](B,C), nPoints, 17)
+ val validationData = SVMSuite.generateSVMInput(A, Array[Double](B,C), nPoints, 17)
val validationRDD = sc.parallelize(validationData,2)
// Test prediction on RDD.
- validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
// Test prediction on Array.
- validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
}
diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
index cf2b067d40..b9ada2b1ec 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
@@ -17,6 +17,7 @@
package spark.mllib.regression
+import scala.collection.JavaConversions._
import scala.util.Random
import org.scalatest.BeforeAndAfterAll
@@ -26,53 +27,68 @@ import spark.SparkContext
import org.jblas.DoubleMatrix
+object LassoSuite {
-class LassoSuite extends FunSuite with BeforeAndAfterAll {
- val sc = new SparkContext("local", "test")
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
+ def generateLassoInputAsList(
+ intercept: Double,
+ weights: Array[Double],
+ nPoints: Int,
+ seed: Int): java.util.List[LabeledPoint] = {
+ seqAsJavaList(generateLassoInput(intercept, weights, nPoints, seed))
}
+
// Generate noisy input of the form Y = x.dot(weights) + intercept + noise
def generateLassoInput(
intercept: Double,
weights: Array[Double],
nPoints: Int,
- seed: Int): Seq[(Double, Array[Double])] = {
+ seed: Int): Seq[LabeledPoint] = {
val rnd = new Random(seed)
val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
- val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian()))
+ val x = Array.fill[Array[Double]](nPoints)(
+ Array.fill[Double](weights.length)(rnd.nextGaussian()))
val y = x.map(xi =>
(new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()
- )
- y zip x
+ )
+ y.zip(x).map(p => LabeledPoint(p._1, p._2))
}
- def validatePrediction(predictions: Seq[Double], input: Seq[(Double, Array[Double])]) {
- val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
+}
+
+class LassoSuite extends FunSuite with BeforeAndAfterAll {
+ val sc = new SparkContext("local", "test")
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
+ val numOffPredictions = predictions.zip(input).filter { case (prediction, expected) =>
// A prediction is off if the prediction is more than 0.5 away from expected value.
- math.abs(prediction - expected) > 0.5
+ math.abs(prediction - expected.label) > 0.5
}.size
// At least 80% of the predictions should be on.
assert(numOffPredictions < input.length / 5)
}
- test("LassoLocalRandomSGD") {
+ test("Lasso local random SGD") {
val nPoints = 10000
val A = 2.0
val B = -1.5
val C = 1.0e-2
- val testData = generateLassoInput(A, Array[Double](B,C), nPoints, 42)
+ val testData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 42)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
- val model = ls.train(testRDD)
+ val ls = new LassoWithSGD()
+ ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+
+ val model = ls.run(testRDD)
val weight0 = model.weights(0)
val weight1 = model.weights(1)
@@ -80,24 +96,24 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
- val validationData = generateLassoInput(A, Array[Double](B,C), nPoints, 17)
- val validationRDD = sc.parallelize(validationData,2)
+ val validationData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 17)
+ val validationRDD = sc.parallelize(validationData, 2)
// Test prediction on RDD.
- validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
// Test prediction on Array.
- validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
- test("LassoLocalRandomSGD with initial weights") {
+ test("Lasso local random SGD with initial weights") {
val nPoints = 10000
val A = 2.0
val B = -1.5
val C = 1.0e-2
- val testData = generateLassoInput(A, Array[Double](B,C), nPoints, 42)
+ val testData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 42)
val initialB = -1.0
val initialC = -1.0
@@ -105,9 +121,11 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
- val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
- val model = ls.train(testRDD, initialWeights)
+ val ls = new LassoWithSGD()
+ ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+
+ val model = ls.run(testRDD, initialWeights)
val weight0 = model.weights(0)
val weight1 = model.weights(1)
@@ -115,13 +133,13 @@ class LassoSuite extends FunSuite with BeforeAndAfterAll {
assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
- val validationData = generateLassoInput(A, Array[Double](B,C), nPoints, 17)
+ val validationData = LassoSuite.generateLassoInput(A, Array[Double](B,C), nPoints, 17)
val validationRDD = sc.parallelize(validationData,2)
// Test prediction on RDD.
- validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData)
+ validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData)
// Test prediction on Array.
- validatePrediction(validationData.map(row => model.predict(row._2)), validationData)
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
}
diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
index 3c588c6162..4c4900658f 100644
--- a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -47,7 +47,7 @@ class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
val xMat = (0 until 20).map(i => Array(x1(i), x2(i))).toArray
val y = xMat.map(i => 3 + i(0) + i(1))
- val testData = (0 until 20).map(i => (y(i), xMat(i))).toArray
+ val testData = (0 until 20).map(i => LabeledPoint(y(i), xMat(i))).toArray
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
diff --git a/pom.xml b/pom.xml
index 7d96185775..1d0cb6a2f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -585,7 +585,7 @@
<hadoop.major.version>2</hadoop.major.version>
<!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
<!-- <yarn.version>0.23.7</yarn.version> -->
- <yarn.version>2.0.2-alpha</yarn.version>
+ <yarn.version>2.0.5-alpha</yarn.version>
</properties>
<repositories>