aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2014-04-02 14:01:12 -0700
committerMatei Zaharia <matei@databricks.com>2014-04-02 14:01:12 -0700
commit9c65fa76f9d413e311a80f29d35d3ff7722e9476 (patch)
treeb98bac526f3d1bb5954c187745a7e4112b0fbf05
parented730c95026d322f4b24d3d9fe92050ffa74cf4a (diff)
downloadspark-9c65fa76f9d413e311a80f29d35d3ff7722e9476.tar.gz
spark-9c65fa76f9d413e311a80f29d35d3ff7722e9476.tar.bz2
spark-9c65fa76f9d413e311a80f29d35d3ff7722e9476.zip
[SPARK-1212, Part II] Support sparse data in MLlib
In PR https://github.com/apache/spark/pull/117, we added dense/sparse vector data model and updated KMeans to support sparse input. This PR is to replace all other `Array[Double]` usage by `Vector` in generalized linear models (GLMs) and Naive Bayes. Major changes: 1. `LabeledPoint` becomes `LabeledPoint(Double, Vector)`. 2. Methods that accept `RDD[Array[Double]]` now accept `RDD[Vector]`. We cannot support both in an elegant way because of type erasure. 3. Mark 'createModel' and 'predictPoint' protected because they are not for end users. 4. Add libSVMFile to MLContext. 5. NaiveBayes can accept arbitrary labels (introducing a breaking change to Python's `NaiveBayesModel`). 6. Gradient computation no longer creates temp vectors. 7. Column normalization and centering are removed from Lasso and Ridge because the operation will densify the data. Simple feature transformation can be done before training. TODO: 1. ~~Use axpy when possible.~~ 2. ~~Optimize Naive Bayes.~~ Author: Xiangrui Meng <meng@databricks.com> Closes #245 from mengxr/vector and squashes the following commits: eb6e793 [Xiangrui Meng] move libSVMFile to MLUtils and rename to loadLibSVMData c26c4fc [Xiangrui Meng] update DecisionTree to use RDD[Vector] 11999c7 [Xiangrui Meng] Merge branch 'master' into vector f7da54b [Xiangrui Meng] add minSplits to libSVMFile da25e24 [Xiangrui Meng] revert the change to default addIntercept because it might change the behavior of existing code without warning 493f26f [Xiangrui Meng] Merge branch 'master' into vector 7c1bc01 [Xiangrui Meng] add a TODO to NB b9b7ef7 [Xiangrui Meng] change default value of addIntercept to false b01df54 [Xiangrui Meng] allow to change or clear threshold in LR and SVM 4addc50 [Xiangrui Meng] merge master 4ca5b1b [Xiangrui Meng] remove normalization from Lasso and update tests f04fe8a [Xiangrui Meng] remove normalization from RidgeRegression and update tests d088552 [Xiangrui Meng] use static constructor for MLContext 6f59eed [Xiangrui Meng] update libSVMFile to determine number of features automatically 3432e84 [Xiangrui Meng] update NaiveBayes to support sparse data 0f8759b [Xiangrui Meng] minor updates to NB b11659c [Xiangrui Meng] style update 78c4671 [Xiangrui Meng] add libSVMFile to MLContext f0fe616 [Xiangrui Meng] add a test for sparse linear regression 44733e1 [Xiangrui Meng] use in-place gradient computation e981396 [Xiangrui Meng] use axpy in Updater db808a1 [Xiangrui Meng] update JavaLR example befa592 [Xiangrui Meng] passed scala/java tests 75c83a4 [Xiangrui Meng] passed test compile 1859701 [Xiangrui Meng] passed compile 834ada2 [Xiangrui Meng] optimized MLUtils.computeStats update some ml algorithms to use Vector (cont.) 135ab72 [Xiangrui Meng] merge glm 0e57aa4 [Xiangrui Meng] update Lasso and RidgeRegression to parse the weights correctly from GLM mark createModel protected mark predictPoint protected d7f629f [Xiangrui Meng] fix a bug in GLM when intercept is not used 3f346ba [Xiangrui Meng] update some ml algorithms to use Vector
-rw-r--r--examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java14
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala154
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala13
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala84
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala116
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala69
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala3
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala8
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala117
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala58
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala7
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala69
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala69
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala79
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala37
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala5
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala75
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala9
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala5
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala7
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala3
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala3
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala193
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala3
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java13
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java3
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java6
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java4
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java38
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala7
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala4
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala10
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala14
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala51
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala54
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala27
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala9
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala59
-rw-r--r--python/pyspark/mllib/classification.py12
40 files changed, 926 insertions, 591 deletions
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 667c72f379..cd8879ff88 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -17,6 +17,7 @@
package org.apache.spark.mllib.examples;
+import java.util.regex.Pattern;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -24,11 +25,9 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
+import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
/**
* Logistic regression based classification using ML Lib.
*/
@@ -47,14 +46,10 @@ public final class JavaLR {
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
- return new LabeledPoint(y, x);
+ return new LabeledPoint(y, Vectors.dense(x));
}
}
- public static void printWeights(double[] a) {
- System.out.println(Arrays.toString(a));
- }
-
public static void main(String[] args) {
if (args.length != 4) {
System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
@@ -80,8 +75,7 @@ public final class JavaLR {
LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
iterations, stepSize);
- System.out.print("Final w: ");
- printWeights(model.weights());
+ System.out.print("Final w: " + model.weights());
System.exit(0);
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 3449c698da..2df5b0d02b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -110,16 +110,16 @@ class PythonMLLibAPI extends Serializable {
private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
- dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
- java.util.LinkedList[java.lang.Object] = {
+ dataBytesJRDD: JavaRDD[Array[Byte]],
+ initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
- LabeledPoint(x(0), x.slice(1, x.length))
+ LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val initialWeights = deserializeDoubleVector(initialWeightsBA)
val model = trainFunc(data, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
- ret.add(serializeDoubleVector(model.weights))
+ ret.add(serializeDoubleVector(model.weights.toArray))
ret.add(model.intercept: java.lang.Double)
ret
}
@@ -127,75 +127,127 @@ class PythonMLLibAPI extends Serializable {
/**
* Java stub for Python mllib LinearRegressionWithSGD.train()
*/
- def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
- numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+ def trainLinearRegressionModelWithSGD(
+ dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- trainRegressionModel((data, initialWeights) =>
- LinearRegressionWithSGD.train(data, numIterations, stepSize,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
+ trainRegressionModel(
+ (data, initialWeights) =>
+ LinearRegressionWithSGD.train(
+ data,
+ numIterations,
+ stepSize,
+ miniBatchFraction,
+ Vectors.dense(initialWeights)),
+ dataBytesJRDD,
+ initialWeightsBA)
}
/**
* Java stub for Python mllib LassoWithSGD.train()
*/
- def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
- stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ def trainLassoModelWithSGD(
+ dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- trainRegressionModel((data, initialWeights) =>
- LassoWithSGD.train(data, numIterations, stepSize, regParam,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
+ trainRegressionModel(
+ (data, initialWeights) =>
+ LassoWithSGD.train(
+ data,
+ numIterations,
+ stepSize,
+ regParam,
+ miniBatchFraction,
+ Vectors.dense(initialWeights)),
+ dataBytesJRDD,
+ initialWeightsBA)
}
/**
* Java stub for Python mllib RidgeRegressionWithSGD.train()
*/
- def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
- stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ def trainRidgeModelWithSGD(
+ dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- trainRegressionModel((data, initialWeights) =>
- RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
+ trainRegressionModel(
+ (data, initialWeights) =>
+ RidgeRegressionWithSGD.train(
+ data,
+ numIterations,
+ stepSize,
+ regParam,
+ miniBatchFraction,
+ Vectors.dense(initialWeights)),
+ dataBytesJRDD,
+ initialWeightsBA)
}
/**
* Java stub for Python mllib SVMWithSGD.train()
*/
- def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
- stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ def trainSVMModelWithSGD(
+ dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int,
+ stepSize: Double,
+ regParam: Double,
+ miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- trainRegressionModel((data, initialWeights) =>
- SVMWithSGD.train(data, numIterations, stepSize, regParam,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
+ trainRegressionModel(
+ (data, initialWeights) =>
+ SVMWithSGD.train(
+ data,
+ numIterations,
+ stepSize,
+ regParam,
+ miniBatchFraction,
+ Vectors.dense(initialWeights)),
+ dataBytesJRDD,
+ initialWeightsBA)
}
/**
* Java stub for Python mllib LogisticRegressionWithSGD.train()
*/
- def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
- numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+ def trainLogisticRegressionModelWithSGD(
+ dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int,
+ stepSize: Double,
+ miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- trainRegressionModel((data, initialWeights) =>
- LogisticRegressionWithSGD.train(data, numIterations, stepSize,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
+ trainRegressionModel(
+ (data, initialWeights) =>
+ LogisticRegressionWithSGD.train(
+ data,
+ numIterations,
+ stepSize,
+ miniBatchFraction,
+ Vectors.dense(initialWeights)),
+ dataBytesJRDD,
+ initialWeightsBA)
}
/**
* Java stub for NaiveBayes.train()
*/
- def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double)
- : java.util.List[java.lang.Object] =
- {
+ def trainNaiveBayes(
+ dataBytesJRDD: JavaRDD[Array[Byte]],
+ lambda: Double): java.util.List[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
- LabeledPoint(x(0), x.slice(1, x.length))
+ LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val model = NaiveBayes.train(data, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
+ ret.add(serializeDoubleVector(model.labels))
ret.add(serializeDoubleVector(model.pi))
ret.add(serializeDoubleMatrix(model.theta))
ret
@@ -204,9 +256,12 @@ class PythonMLLibAPI extends Serializable {
/**
* Java stub for Python mllib KMeans.train()
*/
- def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
- maxIterations: Int, runs: Int, initializationMode: String):
- java.util.List[java.lang.Object] = {
+ def trainKMeansModel(
+ dataBytesJRDD: JavaRDD[Array[Byte]],
+ k: Int,
+ maxIterations: Int,
+ runs: Int,
+ initializationMode: String): java.util.List[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
val ret = new java.util.LinkedList[java.lang.Object]()
@@ -259,8 +314,12 @@ class PythonMLLibAPI extends Serializable {
* needs to be taken in the Python code to ensure it gets freed on exit; see
* the Py4J documentation.
*/
- def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
- iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+ def trainALSModel(
+ ratingsBytesJRDD: JavaRDD[Array[Byte]],
+ rank: Int,
+ iterations: Int,
+ lambda: Double,
+ blocks: Int): MatrixFactorizationModel = {
val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
ALS.train(ratings, rank, iterations, lambda, blocks)
}
@@ -271,8 +330,13 @@ class PythonMLLibAPI extends Serializable {
* Extra care needs to be taken in the Python code to ensure it gets freed on
* exit; see the Py4J documentation.
*/
- def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
- iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+ def trainImplicitALSModel(
+ ratingsBytesJRDD: JavaRDD[Array[Byte]],
+ rank: Int,
+ iterations: Int,
+ lambda: Double,
+ blocks: Int,
+ alpha: Double): MatrixFactorizationModel = {
val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
index 391f5b9b7a..bd10e2e9e1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
@@ -17,22 +17,27 @@
package org.apache.spark.mllib.classification
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
+/**
+ * Represents a classification model that predicts to which of a set of categories an example
+ * belongs. The categories are represented by double values: 0.0, 1.0, 2.0, etc.
+ */
trait ClassificationModel extends Serializable {
/**
* Predict values for the given data set using the model trained.
*
* @param testData RDD representing data points to be predicted
- * @return RDD[Int] where each entry contains the corresponding prediction
+ * @return an RDD[Double] where each entry contains the corresponding prediction
*/
- def predict(testData: RDD[Array[Double]]): RDD[Double]
+ def predict(testData: RDD[Vector]): RDD[Double]
/**
* Predict values for a single data point using the model trained.
*
* @param testData array representing a single data point
- * @return Int prediction from the trained model
+ * @return predicted category from the trained model
*/
- def predict(testData: Array[Double]): Double
+ def predict(testData: Vector): Double
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index a481f52276..798f3a5c94 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -17,16 +17,12 @@
package org.apache.spark.mllib.classification
-import scala.math.round
-
import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.util.DataValidators
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.util.{DataValidators, MLUtils}
+import org.apache.spark.rdd.RDD
/**
* Classification model trained using Logistic Regression.
@@ -35,15 +31,38 @@ import org.jblas.DoubleMatrix
* @param intercept Intercept computed for this model.
*/
class LogisticRegressionModel(
- override val weights: Array[Double],
+ override val weights: Vector,
override val intercept: Double)
- extends GeneralizedLinearModel(weights, intercept)
- with ClassificationModel with Serializable {
+ extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
+
+ private var threshold: Option[Double] = Some(0.5)
+
+ /**
+ * Sets the threshold that separates positive predictions from negative predictions. An example
+ * with prediction score greater than or equal to this threshold is identified as an positive,
+ * and negative otherwise. The default value is 0.5.
+ */
+ def setThreshold(threshold: Double): this.type = {
+ this.threshold = Some(threshold)
+ this
+ }
- override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ /**
+ * Clears the threshold so that `predict` will output raw prediction scores.
+ */
+ def clearThreshold(): this.type = {
+ threshold = None
+ this
+ }
+
+ override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
intercept: Double) = {
- val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
- round(1.0/ (1.0 + math.exp(margin * -1)))
+ val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
+ val score = 1.0/ (1.0 + math.exp(-margin))
+ threshold match {
+ case Some(t) => if (score < t) 0.0 else 1.0
+ case None => score
+ }
}
}
@@ -56,16 +75,15 @@ class LogisticRegressionWithSGD private (
var numIterations: Int,
var regParam: Double,
var miniBatchFraction: Double)
- extends GeneralizedLinearAlgorithm[LogisticRegressionModel]
- with Serializable {
+ extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
val gradient = new LogisticGradient()
val updater = new SimpleUpdater()
override val optimizer = new GradientDescent(gradient, updater)
- .setStepSize(stepSize)
- .setNumIterations(numIterations)
- .setRegParam(regParam)
- .setMiniBatchFraction(miniBatchFraction)
+ .setStepSize(stepSize)
+ .setNumIterations(numIterations)
+ .setRegParam(regParam)
+ .setMiniBatchFraction(miniBatchFraction)
override val validators = List(DataValidators.classificationLabels)
/**
@@ -73,7 +91,7 @@ class LogisticRegressionWithSGD private (
*/
def this() = this(1.0, 100, 0.0, 1.0)
- def createModel(weights: Array[Double], intercept: Double) = {
+ def createModel(weights: Vector, intercept: Double) = {
new LogisticRegressionModel(weights, intercept)
}
}
@@ -105,11 +123,9 @@ object LogisticRegressionWithSGD {
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
- initialWeights: Array[Double])
- : LogisticRegressionModel =
- {
- new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
- input, initialWeights)
+ initialWeights: Vector): LogisticRegressionModel = {
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+ .run(input, initialWeights)
}
/**
@@ -128,11 +144,9 @@ object LogisticRegressionWithSGD {
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
- miniBatchFraction: Double)
- : LogisticRegressionModel =
- {
- new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
- input)
+ miniBatchFraction: Double): LogisticRegressionModel = {
+ new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+ .run(input)
}
/**
@@ -150,9 +164,7 @@ object LogisticRegressionWithSGD {
def train(
input: RDD[LabeledPoint],
numIterations: Int,
- stepSize: Double)
- : LogisticRegressionModel =
- {
+ stepSize: Double): LogisticRegressionModel = {
train(input, numIterations, stepSize, 1.0)
}
@@ -168,9 +180,7 @@ object LogisticRegressionWithSGD {
*/
def train(
input: RDD[LabeledPoint],
- numIterations: Int)
- : LogisticRegressionModel =
- {
+ numIterations: Int): LogisticRegressionModel = {
train(input, numIterations, 1.0, 1.0)
}
@@ -183,7 +193,7 @@ object LogisticRegressionWithSGD {
val sc = new SparkContext(args(0), "LogisticRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
- println("Weights: " + model.weights.mkString("[", ", ", "]"))
+ println("Weights: " + model.weights)
println("Intercept: " + model.intercept)
sc.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 6539b2f339..e956185319 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -17,14 +17,14 @@
package org.apache.spark.mllib.classification
-import scala.collection.mutable
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
-import org.jblas.DoubleMatrix
-
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
/**
* Model for Naive Bayes Classifiers.
@@ -32,19 +32,28 @@ import org.apache.spark.mllib.util.MLUtils
* @param pi Log of class priors, whose dimension is C.
* @param theta Log of class conditional probabilities, whose dimension is CxD.
*/
-class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
- extends ClassificationModel with Serializable {
-
- // Create a column vector that can be used for predictions
- private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
- private val _theta = new DoubleMatrix(theta)
+class NaiveBayesModel(
+ val labels: Array[Double],
+ val pi: Array[Double],
+ val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
+
+ private val brzPi = new BDV[Double](pi)
+ private val brzTheta = new BDM[Double](theta.length, theta(0).length)
+
+ var i = 0
+ while (i < theta.length) {
+ var j = 0
+ while (j < theta(i).length) {
+ brzTheta(i, j) = theta(i)(j)
+ j += 1
+ }
+ i += 1
+ }
- def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
+ override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
- def predict(testData: Array[Double]): Double = {
- val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
- val result = _pi.add(_theta.mmul(dataMatrix))
- result.argmax()
+ override def predict(testData: Vector): Double = {
+ labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
}
}
@@ -56,9 +65,8 @@ class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
* document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*/
-class NaiveBayes private (var lambda: Double)
- extends Serializable with Logging
-{
+class NaiveBayes private (var lambda: Double) extends Serializable with Logging {
+
def this() = this(1.0)
/** Set the smoothing parameter. Default: 1.0. */
@@ -70,45 +78,42 @@ class NaiveBayes private (var lambda: Double)
/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
*
- * @param data RDD of (label, array of features) pairs.
+ * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
def run(data: RDD[LabeledPoint]) = {
- // Aggregates all sample points to driver side to get sample count and summed feature vector
- // for each label. The shape of `zeroCombiner` & `aggregated` is:
- //
- // label: Int -> (count: Int, featuresSum: DoubleMatrix)
- val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
- val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
- point match {
- case LabeledPoint(label, features) =>
- val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
- val fs = new DoubleMatrix(features.length, 1, features: _*)
- combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
- }
- }, { (lhs, rhs) =>
- for ((label, (c, fs)) <- rhs) {
- val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1)))
- lhs(label) = (count + c, featuresSum.addi(fs))
+ // Aggregates term frequencies per label.
+ // TODO: Calling combineByKey and collect creates two stages, we can implement something
+ // TODO: similar to reduceByKeyLocally to save one stage.
+ val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])](
+ createCombiner = (v: Vector) => (1L, v.toBreeze.toDenseVector),
+ mergeValue = (c: (Long, BDV[Double]), v: Vector) => (c._1 + 1L, c._2 += v.toBreeze),
+ mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
+ (c1._1 + c2._1, c1._2 += c2._2)
+ ).collect()
+ val numLabels = aggregated.length
+ var numDocuments = 0L
+ aggregated.foreach { case (_, (n, _)) =>
+ numDocuments += n
+ }
+ val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
+ val labels = new Array[Double](numLabels)
+ val pi = new Array[Double](numLabels)
+ val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
+ val piLogDenom = math.log(numDocuments + numLabels * lambda)
+ var i = 0
+ aggregated.foreach { case (label, (n, sumTermFreqs)) =>
+ labels(i) = label
+ val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
+ pi(i) = math.log(n + lambda) - piLogDenom
+ var j = 0
+ while (j < numFeatures) {
+ theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
+ j += 1
}
- lhs
- })
-
- // Kinds of label
- val C = aggregated.size
- // Total sample count
- val N = aggregated.values.map(_._1).sum
-
- val pi = new Array[Double](C)
- val theta = new Array[Array[Double]](C)
- val piLogDenom = math.log(N + C * lambda)
-
- for ((label, (count, fs)) <- aggregated) {
- val thetaLogDenom = math.log(fs.sum() + fs.length * lambda)
- pi(label) = math.log(count + lambda) - piLogDenom
- theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom)
+ i += 1
}
- new NaiveBayesModel(pi, theta)
+ new NaiveBayesModel(labels, pi, theta)
}
}
@@ -158,8 +163,9 @@ object NaiveBayes {
} else {
NaiveBayes.train(data, args(2).toDouble)
}
- println("Pi: " + model.pi.mkString("[", ", ", "]"))
- println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]"))
+
+ println("Pi\n: " + model.pi)
+ println("Theta:\n" + model.theta)
sc.stop()
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index 6dff29dfb4..e31a08899f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -18,13 +18,11 @@
package org.apache.spark.mllib.classification
import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.util.DataValidators
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.util.{DataValidators, MLUtils}
+import org.apache.spark.rdd.RDD
/**
* Model for Support Vector Machines (SVMs).
@@ -33,15 +31,37 @@ import org.jblas.DoubleMatrix
* @param intercept Intercept computed for this model.
*/
class SVMModel(
- override val weights: Array[Double],
+ override val weights: Vector,
override val intercept: Double)
- extends GeneralizedLinearModel(weights, intercept)
- with ClassificationModel with Serializable {
+ extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
+
+ private var threshold: Option[Double] = Some(0.0)
+
+ /**
+ * Sets the threshold that separates positive predictions from negative predictions. An example
+ * with prediction score greater than or equal to this threshold is identified as an positive,
+ * and negative otherwise. The default value is 0.0.
+ */
+ def setThreshold(threshold: Double): this.type = {
+ this.threshold = Some(threshold)
+ this
+ }
- override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+ /**
+ * Clears the threshold so that `predict` will output raw prediction scores.
+ */
+ def clearThreshold(): this.type = {
+ threshold = None
+ this
+ }
+
+ override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
intercept: Double) = {
- val margin = dataMatrix.dot(weightMatrix) + intercept
- if (margin < 0) 0.0 else 1.0
+ val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
+ threshold match {
+ case Some(t) => if (margin < 0) 0.0 else 1.0
+ case None => margin
+ }
}
}
@@ -71,7 +91,7 @@ class SVMWithSGD private (
*/
def this() = this(1.0, 100, 1.0, 1.0)
- def createModel(weights: Array[Double], intercept: Double) = {
+ def createModel(weights: Vector, intercept: Double) = {
new SVMModel(weights, intercept)
}
}
@@ -103,11 +123,9 @@ object SVMWithSGD {
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
- initialWeights: Array[Double])
- : SVMModel =
- {
- new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
- initialWeights)
+ initialWeights: Vector): SVMModel = {
+ new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction)
+ .run(input, initialWeights)
}
/**
@@ -127,9 +145,7 @@ object SVMWithSGD {
numIterations: Int,
stepSize: Double,
regParam: Double,
- miniBatchFraction: Double)
- : SVMModel =
- {
+ miniBatchFraction: Double): SVMModel = {
new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
}
@@ -149,9 +165,7 @@ object SVMWithSGD {
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
- regParam: Double)
- : SVMModel =
- {
+ regParam: Double): SVMModel = {
train(input, numIterations, stepSize, regParam, 1.0)
}
@@ -165,11 +179,7 @@ object SVMWithSGD {
* @param numIterations Number of iterations of gradient descent to run.
* @return a SVMModel which has the weights and offset from training.
*/
- def train(
- input: RDD[LabeledPoint],
- numIterations: Int)
- : SVMModel =
- {
+ def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel = {
train(input, numIterations, 1.0, 1.0, 1.0)
}
@@ -181,7 +191,8 @@ object SVMWithSGD {
val sc = new SparkContext(args(0), "SVM")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
- println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+ println("Weights: " + model.weights)
println("Intercept: " + model.intercept)
sc.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index b412738e3f..a78503df31 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -42,8 +42,7 @@ class KMeans private (
var runs: Int,
var initializationMode: String,
var initializationSteps: Int,
- var epsilon: Double)
- extends Serializable with Logging {
+ var epsilon: Double) extends Serializable with Logging {
def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
/** Set the number of clusters to create (k). Default: 2. */
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 01c1501548..2cea58cd3f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -54,6 +54,12 @@ trait Vector extends Serializable {
* Converts the instance to a breeze vector.
*/
private[mllib] def toBreeze: BV[Double]
+
+ /**
+ * Gets the value of the ith element.
+ * @param i index
+ */
+ private[mllib] def apply(i: Int): Double = toBreeze(i)
}
/**
@@ -145,6 +151,8 @@ class DenseVector(val values: Array[Double]) extends Vector {
override def toArray: Array[Double] = values
private[mllib] override def toBreeze: BV[Double] = new BDV[Double](values)
+
+ override def apply(i: Int) = values(i)
}
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
index 82124703da..2065428496 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -17,7 +17,9 @@
package org.apache.spark.mllib.optimization
-import org.jblas.DoubleMatrix
+import breeze.linalg.{axpy => brzAxpy}
+
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
/**
* Class used to compute the gradient for a loss function, given a single data point.
@@ -26,17 +28,26 @@ abstract class Gradient extends Serializable {
/**
* Compute the gradient and loss given the features of a single data point.
*
- * @param data - Feature values for one data point. Column matrix of size dx1
- * where d is the number of features.
- * @param label - Label for this data item.
- * @param weights - Column matrix containing weights for every feature.
+ * @param data features for one data point
+ * @param label label for this data point
+ * @param weights weights/coefficients corresponding to features
*
- * @return A tuple of 2 elements. The first element is a column matrix containing the computed
- * gradient and the second element is the loss computed at this data point.
+ * @return (gradient: Vector, loss: Double)
+ */
+ def compute(data: Vector, label: Double, weights: Vector): (Vector, Double)
+
+ /**
+ * Compute the gradient and loss given the features of a single data point,
+ * add the gradient to a provided vector to avoid creating new objects, and return loss.
*
+ * @param data features for one data point
+ * @param label label for this data point
+ * @param weights weights/coefficients corresponding to features
+ * @param cumGradient the computed gradient will be added to this vector
+ *
+ * @return loss
*/
- def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
- (DoubleMatrix, Double)
+ def compute(data: Vector, label: Double, weights: Vector, cumGradient: Vector): Double
}
/**
@@ -44,12 +55,12 @@ abstract class Gradient extends Serializable {
* See also the documentation for the precise formulation.
*/
class LogisticGradient extends Gradient {
- override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
- (DoubleMatrix, Double) = {
- val margin: Double = -1.0 * data.dot(weights)
+ override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+ val brzData = data.toBreeze
+ val brzWeights = weights.toBreeze
+ val margin: Double = -1.0 * brzWeights.dot(brzData)
val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
-
- val gradient = data.mul(gradientMultiplier)
+ val gradient = brzData * gradientMultiplier
val loss =
if (label > 0) {
math.log(1 + math.exp(margin))
@@ -57,7 +68,26 @@ class LogisticGradient extends Gradient {
math.log(1 + math.exp(margin)) - margin
}
- (gradient, loss)
+ (Vectors.fromBreeze(gradient), loss)
+ }
+
+ override def compute(
+ data: Vector,
+ label: Double,
+ weights: Vector,
+ cumGradient: Vector): Double = {
+ val brzData = data.toBreeze
+ val brzWeights = weights.toBreeze
+ val margin: Double = -1.0 * brzWeights.dot(brzData)
+ val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
+
+ brzAxpy(gradientMultiplier, brzData, cumGradient.toBreeze)
+
+ if (label > 0) {
+ math.log(1 + math.exp(margin))
+ } else {
+ math.log(1 + math.exp(margin)) - margin
+ }
}
}
@@ -68,14 +98,28 @@ class LogisticGradient extends Gradient {
* See also the documentation for the precise formulation.
*/
class LeastSquaresGradient extends Gradient {
- override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
- (DoubleMatrix, Double) = {
- val diff: Double = data.dot(weights) - label
-
+ override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+ val brzData = data.toBreeze
+ val brzWeights = weights.toBreeze
+ val diff = brzWeights.dot(brzData) - label
val loss = diff * diff
- val gradient = data.mul(2.0 * diff)
+ val gradient = brzData * (2.0 * diff)
- (gradient, loss)
+ (Vectors.fromBreeze(gradient), loss)
+ }
+
+ override def compute(
+ data: Vector,
+ label: Double,
+ weights: Vector,
+ cumGradient: Vector): Double = {
+ val brzData = data.toBreeze
+ val brzWeights = weights.toBreeze
+ val diff = brzWeights.dot(brzData) - label
+
+ brzAxpy(2.0 * diff, brzData, cumGradient.toBreeze)
+
+ diff * diff
}
}
@@ -85,19 +129,40 @@ class LeastSquaresGradient extends Gradient {
* NOTE: This assumes that the labels are {0,1}
*/
class HingeGradient extends Gradient {
- override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
- (DoubleMatrix, Double) = {
+ override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+ val brzData = data.toBreeze
+ val brzWeights = weights.toBreeze
+ val dotProduct = brzWeights.dot(brzData)
+
+ // Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x)))
+ // Therefore the gradient is -(2y - 1)*x
+ val labelScaled = 2 * label - 1.0
+
+ if (1.0 > labelScaled * dotProduct) {
+ (Vectors.fromBreeze(brzData * (-labelScaled)), 1.0 - labelScaled * dotProduct)
+ } else {
+ (Vectors.dense(new Array[Double](weights.size)), 0.0)
+ }
+ }
- val dotProduct = data.dot(weights)
+ override def compute(
+ data: Vector,
+ label: Double,
+ weights: Vector,
+ cumGradient: Vector): Double = {
+ val brzData = data.toBreeze
+ val brzWeights = weights.toBreeze
+ val dotProduct = brzWeights.dot(brzData)
// Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x)))
// Therefore the gradient is -(2y - 1)*x
val labelScaled = 2 * label - 1.0
if (1.0 > labelScaled * dotProduct) {
- (data.mul(-labelScaled), 1.0 - labelScaled * dotProduct)
+ brzAxpy(-labelScaled, brzData, cumGradient.toBreeze)
+ 1.0 - labelScaled * dotProduct
} else {
- (DoubleMatrix.zeros(1, weights.length), 0.0)
+ 0.0
}
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index b967b22e81..d0777ffd63 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -17,12 +17,13 @@
package org.apache.spark.mllib.optimization
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
+import scala.collection.mutable.ArrayBuffer
-import org.jblas.DoubleMatrix
+import breeze.linalg.{Vector => BV, DenseVector => BDV}
-import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
/**
* Class used to solve an optimization problem using Gradient Descent.
@@ -91,18 +92,16 @@ class GradientDescent(var gradient: Gradient, var updater: Updater)
this
}
- def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
- : Array[Double] = {
-
- val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
- data,
- gradient,
- updater,
- stepSize,
- numIterations,
- regParam,
- miniBatchFraction,
- initialWeights)
+ def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
+ val (weights, _) = GradientDescent.runMiniBatchSGD(
+ data,
+ gradient,
+ updater,
+ stepSize,
+ numIterations,
+ regParam,
+ miniBatchFraction,
+ initialWeights)
weights
}
@@ -133,14 +132,14 @@ object GradientDescent extends Logging {
* stochastic loss computed for every iteration.
*/
def runMiniBatchSGD(
- data: RDD[(Double, Array[Double])],
+ data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
stepSize: Double,
numIterations: Int,
regParam: Double,
miniBatchFraction: Double,
- initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
+ initialWeights: Vector): (Vector, Array[Double]) = {
val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
@@ -148,24 +147,27 @@ object GradientDescent extends Logging {
val miniBatchSize = nexamples * miniBatchFraction
// Initialize weights as a column vector
- var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
+ var weights = Vectors.dense(initialWeights.toArray)
/**
* For the first iteration, the regVal will be initialized as sum of sqrt of
* weights if it's L2 update; for L1 update; the same logic is followed.
*/
var regVal = updater.compute(
- weights, new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2
+ weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
for (i <- 1 to numIterations) {
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
- val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i).map {
- case (y, features) =>
- val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
- val (grad, loss) = gradient.compute(featuresCol, y, weights)
- (grad, loss)
- }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
+ val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
+ .aggregate((BDV.zeros[Double](weights.size), 0.0))(
+ seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
+ val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
+ (grad, loss + l)
+ },
+ combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
+ (grad1 += grad2, loss1 + loss2)
+ })
/**
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
@@ -173,7 +175,7 @@ object GradientDescent extends Logging {
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
- weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
+ weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
weights = update._1
regVal = update._2
}
@@ -181,6 +183,6 @@ object GradientDescent extends Logging {
logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))
- (weights.toArray, stochasticLossHistory.toArray)
+ (weights, stochasticLossHistory.toArray)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
index 94d30b56f2..f9ce908a5f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
@@ -19,11 +19,12 @@ package org.apache.spark.mllib.optimization
import org.apache.spark.rdd.RDD
-trait Optimizer {
+import org.apache.spark.mllib.linalg.Vector
+
+trait Optimizer extends Serializable {
/**
* Solve the provided convex optimization problem.
*/
- def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
-
+ def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
index bf8f731459..3b7754cd7a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
@@ -18,7 +18,10 @@
package org.apache.spark.mllib.optimization
import scala.math._
-import org.jblas.DoubleMatrix
+
+import breeze.linalg.{norm => brzNorm, axpy => brzAxpy, Vector => BV}
+
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
/**
* Class used to perform steps (weight update) using Gradient Descent methods.
@@ -47,8 +50,12 @@ abstract class Updater extends Serializable {
* @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
* and the second element is the regularization value computed using updated weights.
*/
- def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int,
- regParam: Double): (DoubleMatrix, Double)
+ def compute(
+ weightsOld: Vector,
+ gradient: Vector,
+ stepSize: Double,
+ iter: Int,
+ regParam: Double): (Vector, Double)
}
/**
@@ -56,11 +63,17 @@ abstract class Updater extends Serializable {
* Uses a step-size decreasing with the square root of the number of iterations.
*/
class SimpleUpdater extends Updater {
- override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
- stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+ override def compute(
+ weightsOld: Vector,
+ gradient: Vector,
+ stepSize: Double,
+ iter: Int,
+ regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
- val step = gradient.mul(thisIterStepSize)
- (weightsOld.sub(step), 0)
+ val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
+ brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
+
+ (Vectors.fromBreeze(brzWeights), 0)
}
}
@@ -83,19 +96,26 @@ class SimpleUpdater extends Updater {
* Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
*/
class L1Updater extends Updater {
- override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
- stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+ override def compute(
+ weightsOld: Vector,
+ gradient: Vector,
+ stepSize: Double,
+ iter: Int,
+ regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
- val step = gradient.mul(thisIterStepSize)
// Take gradient step
- val newWeights = weightsOld.sub(step)
+ val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
+ brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
// Apply proximal operator (soft thresholding)
val shrinkageVal = regParam * thisIterStepSize
- (0 until newWeights.length).foreach { i =>
- val wi = newWeights.get(i)
- newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal))
+ var i = 0
+ while (i < brzWeights.length) {
+ val wi = brzWeights(i)
+ brzWeights(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal)
+ i += 1
}
- (newWeights, newWeights.norm1 * regParam)
+
+ (Vectors.fromBreeze(brzWeights), brzNorm(brzWeights, 1.0) * regParam)
}
}
@@ -105,16 +125,23 @@ class L1Updater extends Updater {
* Uses a step-size decreasing with the square root of the number of iterations.
*/
class SquaredL2Updater extends Updater {
- override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
- stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
- val thisIterStepSize = stepSize / math.sqrt(iter)
- val step = gradient.mul(thisIterStepSize)
+ override def compute(
+ weightsOld: Vector,
+ gradient: Vector,
+ stepSize: Double,
+ iter: Int,
+ regParam: Double): (Vector, Double) = {
// add up both updates from the gradient of the loss (= step) as well as
// the gradient of the regularizer (= regParam * weightsOld)
// w' = w - thisIterStepSize * (gradient + regParam * w)
// w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
- val newWeights = weightsOld.mul(1.0 - thisIterStepSize * regParam).sub(step)
- (newWeights, 0.5 * pow(newWeights.norm2, 2.0) * regParam)
+ val thisIterStepSize = stepSize / math.sqrt(iter)
+ val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
+ brzWeights :*= (1.0 - thisIterStepSize * regParam)
+ brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
+ val norm = brzNorm(brzWeights, 2.0)
+
+ (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 3e1ed91bf6..80dc0f12ff 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -17,11 +17,12 @@
package org.apache.spark.mllib.regression
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
+
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
/**
* GeneralizedLinearModel (GLM) represents a model trained using
@@ -31,12 +32,9 @@ import org.jblas.DoubleMatrix
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
-abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double)
+abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double)
extends Serializable {
- // Create a column vector that can be used for predictions
- private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
/**
* Predict the result given a data point and the weights learned.
*
@@ -44,8 +42,7 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
* @param weightMatrix Column vector containing the weights of the model
* @param intercept Intercept of the model.
*/
- def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
- intercept: Double): Double
+ protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
/**
* Predict values for the given data set using the model trained.
@@ -53,16 +50,13 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
* @param testData RDD representing data points to be predicted
* @return RDD[Double] where each entry contains the corresponding prediction
*/
- def predict(testData: RDD[Array[Double]]): RDD[Double] = {
+ def predict(testData: RDD[Vector]): RDD[Double] = {
// A small optimization to avoid serializing the entire model. Only the weightsMatrix
// and intercept is needed.
- val localWeights = weightsMatrix
+ val localWeights = weights
val localIntercept = intercept
- testData.map { x =>
- val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
- predictPoint(dataMatrix, localWeights, localIntercept)
- }
+ testData.map(v => predictPoint(v, localWeights, localIntercept))
}
/**
@@ -71,14 +65,13 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
* @param testData array representing a single data point
* @return Double prediction from the trained model
*/
- def predict(testData: Array[Double]): Double = {
- val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
- predictPoint(dataMat, weightsMatrix, intercept)
+ def predict(testData: Vector): Double = {
+ predictPoint(testData, weights, intercept)
}
}
/**
- * GeneralizedLinearAlgorithm implements methods to train a Genearalized Linear Model (GLM).
+ * GeneralizedLinearAlgorithm implements methods to train a Generalized Linear Model (GLM).
* This class should be extended with an Optimizer to create a new GLM.
*/
abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
@@ -88,6 +81,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
val optimizer: Optimizer
+ /** Whether to add intercept (default: true). */
protected var addIntercept: Boolean = true
protected var validateData: Boolean = true
@@ -95,7 +89,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
/**
* Create a model given the weights and intercept
*/
- protected def createModel(weights: Array[Double], intercept: Double): M
+ protected def createModel(weights: Vector, intercept: Double): M
/**
* Set if the algorithm should add an intercept. Default true.
@@ -117,17 +111,27 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
* Run the algorithm with the configured parameters on an input
* RDD of LabeledPoint entries.
*/
- def run(input: RDD[LabeledPoint]) : M = {
- val nfeatures: Int = input.first().features.length
- val initialWeights = new Array[Double](nfeatures)
+ def run(input: RDD[LabeledPoint]): M = {
+ val numFeatures: Int = input.first().features.size
+ val initialWeights = Vectors.dense(new Array[Double](numFeatures))
run(input, initialWeights)
}
+ /** Prepends one to the input vector. */
+ private def prependOne(vector: Vector): Vector = {
+ val vector1 = vector.toBreeze match {
+ case dv: BDV[Double] => BDV.vertcat(BDV.ones[Double](1), dv)
+ case sv: BSV[Double] => BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv)
+ case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+ }
+ Vectors.fromBreeze(vector1)
+ }
+
/**
* Run the algorithm with the configured parameters on an input RDD
* of LabeledPoint entries starting from the initial weights provided.
*/
- def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = {
+ def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
// Check the data properties before running the optimizer
if (validateData && !validators.forall(func => func(input))) {
@@ -136,27 +140,26 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
// Prepend an extra variable consisting of all 1.0's for the intercept.
val data = if (addIntercept) {
- input.map(labeledPoint => (labeledPoint.label, 1.0 +: labeledPoint.features))
+ input.map(labeledPoint => (labeledPoint.label, prependOne(labeledPoint.features)))
} else {
input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
}
val initialWeightsWithIntercept = if (addIntercept) {
- 0.0 +: initialWeights
+ prependOne(initialWeights)
} else {
initialWeights
}
val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
- val (intercept, weights) = if (addIntercept) {
- (weightsWithIntercept(0), weightsWithIntercept.tail)
- } else {
- (0.0, weightsWithIntercept)
- }
-
- logInfo("Final weights " + weights.mkString(","))
- logInfo("Final intercept " + intercept)
+ val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0
+ val weights =
+ if (addIntercept) {
+ Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size))
+ } else {
+ weightsWithIntercept
+ }
createModel(weights, intercept)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 1a18292fe3..3deab1ab78 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -17,14 +17,16 @@
package org.apache.spark.mllib.regression
+import org.apache.spark.mllib.linalg.Vector
+
/**
* Class that represents the features and labels of a data point.
*
* @param label Label for this data point.
* @param features List of features for this data point.
*/
-case class LabeledPoint(label: Double, features: Array[Double]) {
+case class LabeledPoint(label: Double, features: Vector) {
override def toString: String = {
- "LabeledPoint(%s, %s)".format(label, features.mkString("[", ", ", "]"))
+ "LabeledPoint(%s, %s)".format(label, features)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
index be63ce8538..25920d0dc9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
@@ -17,12 +17,11 @@
package org.apache.spark.mllib.regression
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.util.MLUtils
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.rdd.RDD
/**
* Regression model trained using Lasso.
@@ -31,16 +30,16 @@ import org.jblas.DoubleMatrix
* @param intercept Intercept computed for this model.
*/
class LassoModel(
- override val weights: Array[Double],
+ override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept)
with RegressionModel with Serializable {
- override def predictPoint(
- dataMatrix: DoubleMatrix,
- weightMatrix: DoubleMatrix,
+ override protected def predictPoint(
+ dataMatrix: Vector,
+ weightMatrix: Vector,
intercept: Double): Double = {
- dataMatrix.dot(weightMatrix) + intercept
+ weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
}
}
@@ -57,8 +56,7 @@ class LassoWithSGD private (
var numIterations: Int,
var regParam: Double,
var miniBatchFraction: Double)
- extends GeneralizedLinearAlgorithm[LassoModel]
- with Serializable {
+ extends GeneralizedLinearAlgorithm[LassoModel] with Serializable {
val gradient = new LeastSquaresGradient()
val updater = new L1Updater()
@@ -70,10 +68,6 @@ class LassoWithSGD private (
// We don't want to penalize the intercept, so set this to false.
super.setIntercept(false)
- var yMean = 0.0
- var xColMean: DoubleMatrix = _
- var xColSd: DoubleMatrix = _
-
/**
* Construct a Lasso object with default parameters
*/
@@ -85,36 +79,8 @@ class LassoWithSGD private (
this
}
- override def createModel(weights: Array[Double], intercept: Double) = {
- val weightsMat = new DoubleMatrix(weights.length, 1, weights: _*)
- val weightsScaled = weightsMat.div(xColSd)
- val interceptScaled = yMean - weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)
-
- new LassoModel(weightsScaled.data, interceptScaled)
- }
-
- override def run(
- input: RDD[LabeledPoint],
- initialWeights: Array[Double])
- : LassoModel =
- {
- val nfeatures: Int = input.first.features.length
- val nexamples: Long = input.count()
-
- // To avoid penalizing the intercept, we center and scale the data.
- val stats = MLUtils.computeStats(input, nfeatures, nexamples)
- yMean = stats._1
- xColMean = stats._2
- xColSd = stats._3
-
- val normalizedData = input.map { point =>
- val yNormalized = point.label - yMean
- val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
- val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
- LabeledPoint(yNormalized, featuresNormalized.toArray)
- }
-
- super.run(normalizedData, initialWeights)
+ override protected def createModel(weights: Vector, intercept: Double) = {
+ new LassoModel(weights, intercept)
}
}
@@ -144,11 +110,9 @@ object LassoWithSGD {
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
- initialWeights: Array[Double])
- : LassoModel =
- {
- new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
- initialWeights)
+ initialWeights: Vector): LassoModel = {
+ new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction)
+ .run(input, initialWeights)
}
/**
@@ -168,9 +132,7 @@ object LassoWithSGD {
numIterations: Int,
stepSize: Double,
regParam: Double,
- miniBatchFraction: Double)
- : LassoModel =
- {
+ miniBatchFraction: Double): LassoModel = {
new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
}
@@ -190,9 +152,7 @@ object LassoWithSGD {
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
- regParam: Double)
- : LassoModel =
- {
+ regParam: Double): LassoModel = {
train(input, numIterations, stepSize, regParam, 1.0)
}
@@ -208,9 +168,7 @@ object LassoWithSGD {
*/
def train(
input: RDD[LabeledPoint],
- numIterations: Int)
- : LassoModel =
- {
+ numIterations: Int): LassoModel = {
train(input, numIterations, 1.0, 1.0, 1.0)
}
@@ -222,7 +180,8 @@ object LassoWithSGD {
val sc = new SparkContext(args(0), "Lasso")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
- println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+ println("Weights: " + model.weights)
println("Intercept: " + model.intercept)
sc.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
index f5f15d1a33..9ed927994e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
@@ -19,11 +19,10 @@ package org.apache.spark.mllib.regression
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.util.MLUtils
-import org.jblas.DoubleMatrix
-
/**
* Regression model trained using LinearRegression.
*
@@ -31,15 +30,15 @@ import org.jblas.DoubleMatrix
* @param intercept Intercept computed for this model.
*/
class LinearRegressionModel(
- override val weights: Array[Double],
+ override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
- override def predictPoint(
- dataMatrix: DoubleMatrix,
- weightMatrix: DoubleMatrix,
+ override protected def predictPoint(
+ dataMatrix: Vector,
+ weightMatrix: Vector,
intercept: Double): Double = {
- dataMatrix.dot(weightMatrix) + intercept
+ weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
}
}
@@ -69,7 +68,7 @@ class LinearRegressionWithSGD private (
*/
def this() = this(1.0, 100, 1.0)
- override def createModel(weights: Array[Double], intercept: Double) = {
+ override protected def createModel(weights: Vector, intercept: Double) = {
new LinearRegressionModel(weights, intercept)
}
}
@@ -98,11 +97,9 @@ object LinearRegressionWithSGD {
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
- initialWeights: Array[Double])
- : LinearRegressionModel =
- {
- new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input,
- initialWeights)
+ initialWeights: Vector): LinearRegressionModel = {
+ new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
+ .run(input, initialWeights)
}
/**
@@ -120,9 +117,7 @@ object LinearRegressionWithSGD {
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
- miniBatchFraction: Double)
- : LinearRegressionModel =
- {
+ miniBatchFraction: Double): LinearRegressionModel = {
new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
}
@@ -140,9 +135,7 @@ object LinearRegressionWithSGD {
def train(
input: RDD[LabeledPoint],
numIterations: Int,
- stepSize: Double)
- : LinearRegressionModel =
- {
+ stepSize: Double): LinearRegressionModel = {
train(input, numIterations, stepSize, 1.0)
}
@@ -158,9 +151,7 @@ object LinearRegressionWithSGD {
*/
def train(
input: RDD[LabeledPoint],
- numIterations: Int)
- : LinearRegressionModel =
- {
+ numIterations: Int): LinearRegressionModel = {
train(input, numIterations, 1.0, 1.0)
}
@@ -172,7 +163,7 @@ object LinearRegressionWithSGD {
val sc = new SparkContext(args(0), "LinearRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
- println("Weights: " + model.weights.mkString("[", ", ", "]"))
+ println("Weights: " + model.weights)
println("Intercept: " + model.intercept)
sc.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
index 423afc32d6..5e4b8a345b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.regression
import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
trait RegressionModel extends Serializable {
/**
@@ -26,7 +27,7 @@ trait RegressionModel extends Serializable {
* @param testData RDD representing data points to be predicted
* @return RDD[Double] where each entry contains the corresponding prediction
*/
- def predict(testData: RDD[Array[Double]]): RDD[Double]
+ def predict(testData: RDD[Vector]): RDD[Double]
/**
* Predict values for a single data point using the model trained.
@@ -34,5 +35,5 @@ trait RegressionModel extends Serializable {
* @param testData array representing a single data point
* @return Double prediction from the trained model
*/
- def predict(testData: Array[Double]): Double
+ def predict(testData: Vector): Double
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
index feb100f218..1f17d2107f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
@@ -21,8 +21,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.util.MLUtils
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.linalg.Vector
/**
* Regression model trained using RidgeRegression.
@@ -31,16 +30,16 @@ import org.jblas.DoubleMatrix
* @param intercept Intercept computed for this model.
*/
class RidgeRegressionModel(
- override val weights: Array[Double],
+ override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept)
with RegressionModel with Serializable {
- override def predictPoint(
- dataMatrix: DoubleMatrix,
- weightMatrix: DoubleMatrix,
+ override protected def predictPoint(
+ dataMatrix: Vector,
+ weightMatrix: Vector,
intercept: Double): Double = {
- dataMatrix.dot(weightMatrix) + intercept
+ weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
}
}
@@ -57,8 +56,7 @@ class RidgeRegressionWithSGD private (
var numIterations: Int,
var regParam: Double,
var miniBatchFraction: Double)
- extends GeneralizedLinearAlgorithm[RidgeRegressionModel]
- with Serializable {
+ extends GeneralizedLinearAlgorithm[RidgeRegressionModel] with Serializable {
val gradient = new LeastSquaresGradient()
val updater = new SquaredL2Updater()
@@ -71,10 +69,6 @@ class RidgeRegressionWithSGD private (
// We don't want to penalize the intercept in RidgeRegression, so set this to false.
super.setIntercept(false)
- var yMean = 0.0
- var xColMean: DoubleMatrix = _
- var xColSd: DoubleMatrix = _
-
/**
* Construct a RidgeRegression object with default parameters
*/
@@ -86,36 +80,8 @@ class RidgeRegressionWithSGD private (
this
}
- override def createModel(weights: Array[Double], intercept: Double) = {
- val weightsMat = new DoubleMatrix(weights.length, 1, weights: _*)
- val weightsScaled = weightsMat.div(xColSd)
- val interceptScaled = yMean - weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)
-
- new RidgeRegressionModel(weightsScaled.data, interceptScaled)
- }
-
- override def run(
- input: RDD[LabeledPoint],
- initialWeights: Array[Double])
- : RidgeRegressionModel =
- {
- val nfeatures: Int = input.first().features.length
- val nexamples: Long = input.count()
-
- // To avoid penalizing the intercept, we center and scale the data.
- val stats = MLUtils.computeStats(input, nfeatures, nexamples)
- yMean = stats._1
- xColMean = stats._2
- xColSd = stats._3
-
- val normalizedData = input.map { point =>
- val yNormalized = point.label - yMean
- val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
- val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
- LabeledPoint(yNormalized, featuresNormalized.toArray)
- }
-
- super.run(normalizedData, initialWeights)
+ override protected def createModel(weights: Vector, intercept: Double) = {
+ new RidgeRegressionModel(weights, intercept)
}
}
@@ -144,9 +110,7 @@ object RidgeRegressionWithSGD {
stepSize: Double,
regParam: Double,
miniBatchFraction: Double,
- initialWeights: Array[Double])
- : RidgeRegressionModel =
- {
+ initialWeights: Vector): RidgeRegressionModel = {
new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(
input, initialWeights)
}
@@ -167,9 +131,7 @@ object RidgeRegressionWithSGD {
numIterations: Int,
stepSize: Double,
regParam: Double,
- miniBatchFraction: Double)
- : RidgeRegressionModel =
- {
+ miniBatchFraction: Double): RidgeRegressionModel = {
new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
}
@@ -188,9 +150,7 @@ object RidgeRegressionWithSGD {
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
- regParam: Double)
- : RidgeRegressionModel =
- {
+ regParam: Double): RidgeRegressionModel = {
train(input, numIterations, stepSize, regParam, 1.0)
}
@@ -205,23 +165,22 @@ object RidgeRegressionWithSGD {
*/
def train(
input: RDD[LabeledPoint],
- numIterations: Int)
- : RidgeRegressionModel =
- {
+ numIterations: Int): RidgeRegressionModel = {
train(input, numIterations, 1.0, 1.0, 1.0)
}
def main(args: Array[String]) {
if (args.length != 5) {
- println("Usage: RidgeRegression <master> <input_dir> <step_size> <regularization_parameter>" +
- " <niters>")
+ println("Usage: RidgeRegression <master> <input_dir> <step_size> " +
+ "<regularization_parameter> <niters>")
System.exit(1)
}
val sc = new SparkContext(args(0), "RidgeRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble,
args(3).toDouble)
- println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+ println("Weights: " + model.weights)
println("Intercept: " + model.intercept)
sc.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
index 33205b919d..dee9594a9d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
@@ -30,6 +30,7 @@ import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Impurity, Variance}
import org.apache.spark.mllib.tree.model._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.random.XORShiftRandom
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
/**
* A class that implements a decision tree algorithm for classification and regression. It
@@ -295,7 +296,7 @@ object DecisionTree extends Serializable with Logging {
val numNodes = scala.math.pow(2, level).toInt
logDebug("numNodes = " + numNodes)
// Find the number of features by looking at the first sample.
- val numFeatures = input.first().features.length
+ val numFeatures = input.first().features.size
logDebug("numFeatures = " + numFeatures)
val numBins = bins(0).length
logDebug("numBins = " + numBins)
@@ -902,7 +903,7 @@ object DecisionTree extends Serializable with Logging {
val count = input.count()
// Find the number of features by looking at the first sample
- val numFeatures = input.take(1)(0).features.length
+ val numFeatures = input.take(1)(0).features.size
val maxBins = strategy.maxBins
val numBins = if (maxBins <= count) maxBins else count.toInt
@@ -1116,7 +1117,7 @@ object DecisionTree extends Serializable with Logging {
sc.textFile(dir).map { line =>
val parts = line.trim().split(",")
val label = parts(0).toDouble
- val features = parts.slice(1,parts.length).map(_.toDouble)
+ val features = Vectors.dense(parts.slice(1,parts.length).map(_.toDouble))
LabeledPoint(label, features)
}
}
@@ -1127,7 +1128,7 @@ object DecisionTree extends Serializable with Logging {
*/
private def accuracyScore(model: DecisionTreeModel, data: RDD[LabeledPoint],
threshold: Double = 0.5): Double = {
- def predictedValue(features: Array[Double]) = {
+ def predictedValue(features: Vector) = {
if (model.predict(features) < threshold) 0.0 else 1.0
}
val correctCount = data.filter(y => predictedValue(y.features) == y.label).count()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index a8bbf21dae..a6dca84a2c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.tree.model
import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
/**
* Model to store the decision tree parameters
@@ -33,7 +34,7 @@ class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable
* @param features array representing a single data point
* @return Double prediction from the trained model
*/
- def predict(features: Array[Double]): Double = {
+ def predict(features: Vector): Double = {
topNode.predictIfLeaf(features)
}
@@ -43,7 +44,7 @@ class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable
* @param features RDD representing data points to be predicted
* @return RDD[Int] where each entry contains the corresponding prediction
*/
- def predict(features: RDD[Array[Double]]): RDD[Double] = {
+ def predict(features: RDD[Vector]): RDD[Double] = {
features.map(x => predict(x))
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
index ea4693c5c2..aac3f9ce30 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.tree.model
import org.apache.spark.Logging
import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.mllib.linalg.Vector
/**
* Node in a decision tree
@@ -54,8 +55,8 @@ class Node (
logDebug("stats = " + stats)
logDebug("predict = " + predict)
if (!isLeaf) {
- val leftNodeIndex = id*2 + 1
- val rightNodeIndex = id*2 + 2
+ val leftNodeIndex = id * 2 + 1
+ val rightNodeIndex = id * 2 + 2
leftNode = Some(nodes(leftNodeIndex))
rightNode = Some(nodes(rightNodeIndex))
leftNode.get.build(nodes)
@@ -68,7 +69,7 @@ class Node (
* @param feature feature value
* @return predicted value
*/
- def predictIfLeaf(feature: Array[Double]) : Double = {
+ def predictIfLeaf(feature: Vector) : Double = {
if (isLeaf) {
predict
} else{
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
index 2e03684e62..81e4eda2a6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
@@ -24,6 +24,7 @@ import org.jblas.DoubleMatrix
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
/**
@@ -74,7 +75,7 @@ object LinearDataGenerator {
val y = x.map { xi =>
new DoubleMatrix(1, xi.length, xi: _*).dot(weightsMat) + intercept + eps * rnd.nextGaussian()
}
- y.zip(x).map(p => LabeledPoint(p._1, p._2))
+ y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
}
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
index 52c4a71d62..61498dcc2b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -22,6 +22,7 @@ import scala.util.Random
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
/**
* Generate test data for LogisticRegression. This class chooses positive labels
@@ -54,7 +55,7 @@ object LogisticRegressionDataGenerator {
val x = Array.fill[Double](nfeatures) {
rnd.nextGaussian() + (y * eps)
}
- LabeledPoint(y, x)
+ LabeledPoint(y, Vectors.dense(x))
}
data
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 08cd9ab055..cb85e433bf 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,15 +17,13 @@
package org.apache.spark.mllib.util
+import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
+ squaredDistance => breezeSquaredDistance}
+
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
-
-import org.jblas.DoubleMatrix
-
import org.apache.spark.mllib.regression.LabeledPoint
-
-import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance}
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
/**
* Helper methods to load, save and pre-process data used in ML Lib.
@@ -41,6 +39,107 @@ object MLUtils {
}
/**
+ * Multiclass label parser, which parses a string into double.
+ */
+ val multiclassLabelParser: String => Double = _.toDouble
+
+ /**
+ * Binary label parser, which outputs 1.0 (positive) if the value is greater than 0.5,
+ * or 0.0 (negative) otherwise.
+ */
+ val binaryLabelParser: String => Double = label => if (label.toDouble > 0.5) 1.0 else 0.0
+
+ /**
+ * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint].
+ * The LIBSVM format is a text-based format used by LIBSVM and LIBLINEAR.
+ * Each line represents a labeled sparse feature vector using the following format:
+ * {{{label index1:value1 index2:value2 ...}}}
+ * where the indices are one-based and in ascending order.
+ * This method parses each line into a [[org.apache.spark.mllib.regression.LabeledPoint]],
+ * where the feature indices are converted to zero-based.
+ *
+ * @param sc Spark context
+ * @param path file or directory path in any Hadoop-supported file system URI
+ * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise
+ * @param numFeatures number of features, which will be determined from the input data if a
+ * negative value is given. The default value is -1.
+ * @param minSplits min number of partitions, default: sc.defaultMinSplits
+ * @return labeled data stored as an RDD[LabeledPoint]
+ */
+ def loadLibSVMData(
+ sc: SparkContext,
+ path: String,
+ labelParser: String => Double,
+ numFeatures: Int,
+ minSplits: Int): RDD[LabeledPoint] = {
+ val parsed = sc.textFile(path, minSplits)
+ .map(_.trim)
+ .filter(!_.isEmpty)
+ .map(_.split(' '))
+ // Determine number of features.
+ val d = if (numFeatures >= 0) {
+ numFeatures
+ } else {
+ parsed.map { items =>
+ if (items.length > 1) {
+ items.last.split(':')(0).toInt
+ } else {
+ 0
+ }
+ }.reduce(math.max)
+ }
+ parsed.map { items =>
+ val label = labelParser(items.head)
+ val (indices, values) = items.tail.map { item =>
+ val indexAndValue = item.split(':')
+ val index = indexAndValue(0).toInt - 1
+ val value = indexAndValue(1).toDouble
+ (index, value)
+ }.unzip
+ LabeledPoint(label, Vectors.sparse(d, indices.toArray, values.toArray))
+ }
+ }
+
+ // Convenient methods for calling from Java.
+
+ /**
+ * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint],
+ * with number of features determined automatically and the default number of partitions.
+ */
+ def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] =
+ loadLibSVMData(sc, path, binaryLabelParser, -1, sc.defaultMinSplits)
+
+ /**
+ * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint],
+ * with number of features specified explicitly and the default number of partitions.
+ */
+ def loadLibSVMData(sc: SparkContext, path: String, numFeatures: Int): RDD[LabeledPoint] =
+ loadLibSVMData(sc, path, binaryLabelParser, numFeatures, sc.defaultMinSplits)
+
+ /**
+ * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
+ * with the given label parser, number of features determined automatically,
+ * and the default number of partitions.
+ */
+ def loadLibSVMData(
+ sc: SparkContext,
+ path: String,
+ labelParser: String => Double): RDD[LabeledPoint] =
+ loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinSplits)
+
+ /**
+ * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
+ * with the given label parser, number of features specified explicitly,
+ * and the default number of partitions.
+ */
+ def loadLibSVMData(
+ sc: SparkContext,
+ path: String,
+ labelParser: String => Double,
+ numFeatures: Int): RDD[LabeledPoint] =
+ loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinSplits)
+
+ /**
* Load labeled data from a file. The data format used here is
* <L>, <f1> <f2> ...
* where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
@@ -54,7 +153,7 @@ object MLUtils {
sc.textFile(dir).map { line =>
val parts = line.split(',')
val label = parts(0).toDouble
- val features = parts(1).trim().split(' ').map(_.toDouble)
+ val features = Vectors.dense(parts(1).trim().split(' ').map(_.toDouble))
LabeledPoint(label, features)
}
}
@@ -68,7 +167,7 @@ object MLUtils {
* @param dir Directory to save the data.
*/
def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
- val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
+ val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" "))
dataStr.saveAsTextFile(dir)
}
@@ -76,44 +175,52 @@ object MLUtils {
* Utility function to compute mean and standard deviation on a given dataset.
*
* @param data - input data set whose statistics are computed
- * @param nfeatures - number of features
- * @param nexamples - number of examples in input dataset
+ * @param numFeatures - number of features
+ * @param numExamples - number of examples in input dataset
*
* @return (yMean, xColMean, xColSd) - Tuple consisting of
* yMean - mean of the labels
* xColMean - Row vector with mean for every column (or feature) of the input data
* xColSd - Row vector standard deviation for every column (or feature) of the input data.
*/
- def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long):
- (Double, DoubleMatrix, DoubleMatrix) = {
- val yMean: Double = data.map { labeledPoint => labeledPoint.label }.reduce(_ + _) / nexamples
-
- // NOTE: We shuffle X by column here to compute column sum and sum of squares.
- val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint =>
- val nCols = labeledPoint.features.length
- // Traverse over every column and emit (col, value, value^2)
- Iterator.tabulate(nCols) { i =>
- (i, (labeledPoint.features(i), labeledPoint.features(i)*labeledPoint.features(i)))
- }
- }.reduceByKey { case(x1, x2) =>
- (x1._1 + x2._1, x1._2 + x2._2)
+ def computeStats(
+ data: RDD[LabeledPoint],
+ numFeatures: Int,
+ numExamples: Long): (Double, Vector, Vector) = {
+ val brzData = data.map { case LabeledPoint(label, features) =>
+ (label, features.toBreeze)
}
- val xColSumsMap = xColSumSq.collectAsMap()
-
- val xColMean = DoubleMatrix.zeros(nfeatures, 1)
- val xColSd = DoubleMatrix.zeros(nfeatures, 1)
-
- // Compute mean and unbiased variance using column sums
- var col = 0
- while (col < nfeatures) {
- xColMean.put(col, xColSumsMap(col)._1 / nexamples)
- val variance =
- (xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) / nexamples
- xColSd.put(col, math.sqrt(variance))
- col += 1
+ val aggStats = brzData.aggregate(
+ (0L, 0.0, BDV.zeros[Double](numFeatures), BDV.zeros[Double](numFeatures))
+ )(
+ seqOp = (c, v) => (c, v) match {
+ case ((n, sumLabel, sum, sumSq), (label, features)) =>
+ features.activeIterator.foreach { case (i, x) =>
+ sumSq(i) += x * x
+ }
+ (n + 1L, sumLabel + label, sum += features, sumSq)
+ },
+ combOp = (c1, c2) => (c1, c2) match {
+ case ((n1, sumLabel1, sum1, sumSq1), (n2, sumLabel2, sum2, sumSq2)) =>
+ (n1 + n2, sumLabel1 + sumLabel2, sum1 += sum2, sumSq1 += sumSq2)
+ }
+ )
+ val (nl, sumLabel, sum, sumSq) = aggStats
+
+ require(nl > 0, "Input data is empty.")
+ require(nl == numExamples)
+
+ val n = nl.toDouble
+ val yMean = sumLabel / n
+ val mean = sum / n
+ val std = new Array[Double](sum.length)
+ var i = 0
+ while (i < numFeatures) {
+ std(i) = sumSq(i) / n - mean(i) * mean(i)
+ i += 1
}
- (yMean, xColMean, xColSd)
+ (yMean, Vectors.fromBreeze(mean), Vectors.dense(std))
}
/**
@@ -144,6 +251,18 @@ object MLUtils {
val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
val normDiff = norm1 - norm2
var sqDist = 0.0
+ /*
+ * The relative error is
+ * <pre>
+ * EPSILON * ( \|a\|_2^2 + \|b\\_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),
+ * </pre>
+ * which is bounded by
+ * <pre>
+ * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).
+ * </pre>
+ * The bound doesn't need the inner product, so we can use it as a sufficient condition to
+ * check quickly whether the inner product approach is accurate.
+ */
val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
if (precisionBound1 < precision) {
sqDist = sumSquaredNorm - 2.0 * v1.dot(v2)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index c96c94f70e..e300c3dbe1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -23,6 +23,7 @@ import org.jblas.DoubleMatrix
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
/**
@@ -58,7 +59,7 @@ object SVMDataGenerator {
}
val yD = new DoubleMatrix(1, x.length, x: _*).dot(trueWeights) + rnd.nextGaussian() * 0.1
val y = if (yD < 0) 0.0 else 1.0
- LabeledPoint(y, x)
+ LabeledPoint(y, Vectors.dense(x))
}
MLUtils.saveLabeledData(data, outputPath)
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
index 073ded6f36..c80b1134ed 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.classification;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.junit.After;
import org.junit.Assert;
@@ -45,12 +46,12 @@ public class JavaNaiveBayesSuite implements Serializable {
}
private static final List<LabeledPoint> POINTS = Arrays.asList(
- new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}),
- new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}),
- new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}),
- new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}),
- new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}),
- new LabeledPoint(2, new double[] {0.0, 0.0, 2.0})
+ new LabeledPoint(0, Vectors.dense(1.0, 0.0, 0.0)),
+ new LabeledPoint(0, Vectors.dense(2.0, 0.0, 0.0)),
+ new LabeledPoint(1, Vectors.dense(0.0, 1.0, 0.0)),
+ new LabeledPoint(1, Vectors.dense(0.0, 2.0, 0.0)),
+ new LabeledPoint(2, Vectors.dense(0.0, 0.0, 1.0)),
+ new LabeledPoint(2, Vectors.dense(0.0, 0.0, 2.0))
);
private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) {
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
index 117e5eaa8b..4701a5e545 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
@@ -17,7 +17,6 @@
package org.apache.spark.mllib.classification;
-
import java.io.Serializable;
import java.util.List;
@@ -28,7 +27,6 @@ import org.junit.Test;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-
import org.apache.spark.mllib.regression.LabeledPoint;
public class JavaSVMSuite implements Serializable {
@@ -94,5 +92,4 @@ public class JavaSVMSuite implements Serializable {
int numAccurate = validatePrediction(validationData, model);
Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
}
-
}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
index 2c4d795f96..c6d8425ffc 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg;
import java.io.Serializable;
-import com.google.common.collect.Lists;
-
import scala.Tuple2;
+import com.google.common.collect.Lists;
+
import org.junit.Test;
import static org.junit.Assert.*;
@@ -36,7 +36,7 @@ public class JavaVectorsSuite implements Serializable {
@Test
public void sparseArrayConstruction() {
- Vector v = Vectors.sparse(3, Lists.newArrayList(
+ Vector v = Vectors.sparse(3, Lists.<Tuple2<Integer, Double>>newArrayList(
new Tuple2<Integer, Double>(0, 2.0),
new Tuple2<Integer, Double>(2, 3.0)));
assertArrayEquals(new double[]{2.0, 0.0, 3.0}, v.toArray(), 0.0);
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
index f44b25cd44..f725924a2d 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
@@ -59,7 +59,7 @@ public class JavaLassoSuite implements Serializable {
@Test
public void runLassoUsingConstructor() {
int nPoints = 10000;
- double A = 2.0;
+ double A = 0.0;
double[] weights = {-1.5, 1.0e-2};
JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,
@@ -80,7 +80,7 @@ public class JavaLassoSuite implements Serializable {
@Test
public void runLassoUsingStaticMethods() {
int nPoints = 10000;
- double A = 2.0;
+ double A = 0.0;
double[] weights = {-1.5, 1.0e-2};
JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
index 2fdd5fc8fd..03714ae7e4 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
@@ -55,30 +55,27 @@ public class JavaRidgeRegressionSuite implements Serializable {
return errorSum / validationData.size();
}
- List<LabeledPoint> generateRidgeData(int numPoints, int nfeatures, double eps) {
+ List<LabeledPoint> generateRidgeData(int numPoints, int numFeatures, double std) {
org.jblas.util.Random.seed(42);
// Pick weights as random values distributed uniformly in [-0.5, 0.5]
- DoubleMatrix w = DoubleMatrix.rand(nfeatures, 1).subi(0.5);
- // Set first two weights to eps
- w.put(0, 0, eps);
- w.put(1, 0, eps);
- return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, numPoints, 42, eps);
+ DoubleMatrix w = DoubleMatrix.rand(numFeatures, 1).subi(0.5);
+ return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, numPoints, 42, std);
}
@Test
public void runRidgeRegressionUsingConstructor() {
- int nexamples = 200;
- int nfeatures = 20;
- double eps = 10.0;
- List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+ int numExamples = 50;
+ int numFeatures = 20;
+ List<LabeledPoint> data = generateRidgeData(2*numExamples, numFeatures, 10.0);
- JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
- List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, numExamples));
+ List<LabeledPoint> validationData = data.subList(numExamples, 2 * numExamples);
RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD();
- ridgeSGDImpl.optimizer().setStepSize(1.0)
- .setRegParam(0.0)
- .setNumIterations(200);
+ ridgeSGDImpl.optimizer()
+ .setStepSize(1.0)
+ .setRegParam(0.0)
+ .setNumIterations(200);
RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd());
double unRegularizedErr = predictionError(validationData, model);
@@ -91,13 +88,12 @@ public class JavaRidgeRegressionSuite implements Serializable {
@Test
public void runRidgeRegressionUsingStaticMethods() {
- int nexamples = 200;
- int nfeatures = 20;
- double eps = 10.0;
- List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+ int numExamples = 50;
+ int numFeatures = 20;
+ List<LabeledPoint> data = generateRidgeData(2 * numExamples, numFeatures, 10.0);
- JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
- List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, numExamples));
+ List<LabeledPoint> validationData = data.subList(numExamples, 2 * numExamples);
RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 200, 1.0, 0.0);
double unRegularizedErr = predictionError(validationData, model);
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
index 05322b024d..1e03c9df82 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -20,11 +20,10 @@ package org.apache.spark.mllib.classification
import scala.util.Random
import scala.collection.JavaConversions._
-import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.LocalSparkContext
@@ -61,7 +60,7 @@ object LogisticRegressionSuite {
if (yVal > 0) 1 else 0
}
- val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
+ val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(Array(x1(i)))))
testData
}
@@ -113,7 +112,7 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Shoul
val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42)
val initialB = -1.0
- val initialWeights = Array(initialB)
+ val initialWeights = Vectors.dense(initialB)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index 9dd6c79ee6..516895d042 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.mllib.classification
import scala.util.Random
-import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
+import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.LocalSparkContext
@@ -54,7 +54,7 @@ object NaiveBayesSuite {
if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
}
- LabeledPoint(y, xi)
+ LabeledPoint(y, Vectors.dense(xi))
}
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index bc7abb568a..dfacbfeee6 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.mllib.classification
import scala.util.Random
import scala.collection.JavaConversions._
-import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.jblas.DoubleMatrix
@@ -28,6 +27,7 @@ import org.jblas.DoubleMatrix
import org.apache.spark.SparkException
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
object SVMSuite {
@@ -54,7 +54,7 @@ object SVMSuite {
intercept + 0.01 * rnd.nextGaussian()
if (yD < 0) 0.0 else 1.0
}
- y.zip(x).map(p => LabeledPoint(p._1, p._2))
+ y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
}
}
@@ -110,7 +110,7 @@ class SVMSuite extends FunSuite with LocalSparkContext {
val initialB = -1.0
val initialC = -1.0
- val initialWeights = Array(initialB,initialC)
+ val initialWeights = Vectors.dense(initialB, initialC)
val testRDD = sc.parallelize(testData, 2)
testRDD.cache()
@@ -150,10 +150,10 @@ class SVMSuite extends FunSuite with LocalSparkContext {
}
intercept[SparkException] {
- val model = SVMWithSGD.train(testRDDInvalid, 100)
+ SVMWithSGD.train(testRDDInvalid, 100)
}
// Turning off data validation should not throw an exception
- val noValidationModel = new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
+ new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
index 631d0e2ad9..c4b433499a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.mllib.optimization
import scala.util.Random
import scala.collection.JavaConversions._
-import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
-import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
object GradientDescentSuite {
@@ -58,8 +57,7 @@ object GradientDescentSuite {
if (yVal > 0) 1 else 0
}
- val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
- testData
+ (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(x1(i))))
}
}
@@ -83,11 +81,11 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMa
// Add a extra variable consisting of all 1.0's for the intercept.
val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42)
val data = testData.map { case LabeledPoint(label, features) =>
- label -> Array(1.0, features: _*)
+ label -> Vectors.dense(1.0, features.toArray: _*)
}
val dataRDD = sc.parallelize(data, 2).cache()
- val initialWeightsWithIntercept = Array(1.0, initialWeights: _*)
+ val initialWeightsWithIntercept = Vectors.dense(1.0, initialWeights: _*)
val (_, loss) = GradientDescent.runMiniBatchSGD(
dataRDD,
@@ -113,13 +111,13 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMa
// Add a extra variable consisting of all 1.0's for the intercept.
val testData = GradientDescentSuite.generateGDInput(2.0, -1.5, 10000, 42)
val data = testData.map { case LabeledPoint(label, features) =>
- label -> Array(1.0, features: _*)
+ label -> Vectors.dense(1.0, features.toArray: _*)
}
val dataRDD = sc.parallelize(data, 2).cache()
// Prepare non-zero weights
- val initialWeightsWithIntercept = Array(1.0, 0.5)
+ val initialWeightsWithIntercept = Vectors.dense(1.0, 0.5)
val regParam0 = 0
val (newWeights0, loss0) = GradientDescent.runMiniBatchSGD(
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
index 2cebac943e..6aad9eb84e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression
import org.scalatest.FunSuite
+import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
class LassoSuite extends FunSuite with LocalSparkContext {
@@ -33,29 +34,33 @@ class LassoSuite extends FunSuite with LocalSparkContext {
}
test("Lasso local random SGD") {
- val nPoints = 10000
+ val nPoints = 1000
val A = 2.0
val B = -1.5
val C = 1.0e-2
- val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
-
- val testRDD = sc.parallelize(testData, 2)
- testRDD.cache()
+ val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
+ .map { case LabeledPoint(label, features) =>
+ LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+ }
+ val testRDD = sc.parallelize(testData, 2).cache()
val ls = new LassoWithSGD()
- ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+ ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
val model = ls.run(testRDD)
-
val weight0 = model.weights(0)
val weight1 = model.weights(1)
- assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
- assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
- assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+ val weight2 = model.weights(2)
+ assert(weight0 >= 1.9 && weight0 <= 2.1, weight0 + " not in [1.9, 2.1]")
+ assert(weight1 >= -1.60 && weight1 <= -1.40, weight1 + " not in [-1.6, -1.4]")
+ assert(weight2 >= -1.0e-3 && weight2 <= 1.0e-3, weight2 + " not in [-0.001, 0.001]")
val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
+ .map { case LabeledPoint(label, features) =>
+ LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+ }
val validationRDD = sc.parallelize(validationData, 2)
// Test prediction on RDD.
@@ -66,33 +71,39 @@ class LassoSuite extends FunSuite with LocalSparkContext {
}
test("Lasso local random SGD with initial weights") {
- val nPoints = 10000
+ val nPoints = 1000
val A = 2.0
val B = -1.5
val C = 1.0e-2
- val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
+ val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
+ .map { case LabeledPoint(label, features) =>
+ LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+ }
+ val initialA = -1.0
val initialB = -1.0
val initialC = -1.0
- val initialWeights = Array(initialB,initialC)
+ val initialWeights = Vectors.dense(initialA, initialB, initialC)
- val testRDD = sc.parallelize(testData, 2)
- testRDD.cache()
+ val testRDD = sc.parallelize(testData, 2).cache()
val ls = new LassoWithSGD()
- ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+ ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
val model = ls.run(testRDD, initialWeights)
-
val weight0 = model.weights(0)
val weight1 = model.weights(1)
- assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
- assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
- assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+ val weight2 = model.weights(2)
+ assert(weight0 >= 1.9 && weight0 <= 2.1, weight0 + " not in [1.9, 2.1]")
+ assert(weight1 >= -1.60 && weight1 <= -1.40, weight1 + " not in [-1.6, -1.4]")
+ assert(weight2 >= -1.0e-3 && weight2 <= 1.0e-3, weight2 + " not in [-0.001, 0.001]")
val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
+ .map { case LabeledPoint(label, features) =>
+ LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+ }
val validationRDD = sc.parallelize(validationData,2)
// Test prediction on RDD.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
index 5d251bcbf3..2f7d30708c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression
import org.scalatest.FunSuite
+import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
class LinearRegressionSuite extends FunSuite with LocalSparkContext {
@@ -40,11 +41,12 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
val model = linReg.run(testRDD)
-
assert(model.intercept >= 2.5 && model.intercept <= 3.5)
- assert(model.weights.length === 2)
- assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
- assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+ val weights = model.weights
+ assert(weights.size === 2)
+ assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+ assert(weights(1) >= 9.0 && weights(1) <= 11.0)
val validationData = LinearDataGenerator.generateLinearInput(
3.0, Array(10.0, 10.0), 100, 17)
@@ -67,9 +69,11 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
val model = linReg.run(testRDD)
assert(model.intercept === 0.0)
- assert(model.weights.length === 2)
- assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
- assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+ val weights = model.weights
+ assert(weights.size === 2)
+ assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+ assert(weights(1) >= 9.0 && weights(1) <= 11.0)
val validationData = LinearDataGenerator.generateLinearInput(
0.0, Array(10.0, 10.0), 100, 17)
@@ -81,4 +85,40 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
// Test prediction on Array.
validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
}
+
+ // Test if we can correctly learn Y = 10*X1 + 10*X10000
+ test("sparse linear regression without intercept") {
+ val denseRDD = sc.parallelize(
+ LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42), 2)
+ val sparseRDD = denseRDD.map { case LabeledPoint(label, v) =>
+ val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+ LabeledPoint(label, sv)
+ }.cache()
+ val linReg = new LinearRegressionWithSGD().setIntercept(false)
+ linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+ val model = linReg.run(sparseRDD)
+
+ assert(model.intercept === 0.0)
+
+ val weights = model.weights
+ assert(weights.size === 10000)
+ assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+ assert(weights(9999) >= 9.0 && weights(9999) <= 11.0)
+
+ val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17)
+ val sparseValidationData = validationData.map { case LabeledPoint(label, v) =>
+ val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+ LabeledPoint(label, sv)
+ }
+ val sparseValidationRDD = sc.parallelize(sparseValidationData, 2)
+
+ // Test prediction on RDD.
+ validatePrediction(
+ model.predict(sparseValidationRDD.map(_.features)).collect(), sparseValidationData)
+
+ // Test prediction on Array.
+ validatePrediction(
+ sparseValidationData.map(row => model.predict(row.features)), sparseValidationData)
+ }
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
index b2044ed0d8..f66fc6ea6c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -17,9 +17,10 @@
package org.apache.spark.mllib.regression
-import org.jblas.DoubleMatrix
import org.scalatest.FunSuite
+import org.jblas.DoubleMatrix
+
import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
@@ -30,22 +31,22 @@ class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
}.reduceLeft(_ + _) / predictions.size
}
- test("regularization with skewed weights") {
- val nexamples = 200
- val nfeatures = 20
- val eps = 10
+ test("ridge regression can help avoid overfitting") {
+
+ // For small number of examples and large variance of error distribution,
+ // ridge regression should give smaller generalization error that linear regression.
+
+ val numExamples = 50
+ val numFeatures = 20
org.jblas.util.Random.seed(42)
// Pick weights as random values distributed uniformly in [-0.5, 0.5]
- val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
- // Set first two weights to eps
- w.put(0, 0, eps)
- w.put(1, 0, eps)
+ val w = DoubleMatrix.rand(numFeatures, 1).subi(0.5)
// Use half of data for training and other half for validation
- val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2*nexamples, 42, eps)
- val testData = data.take(nexamples)
- val validationData = data.takeRight(nexamples)
+ val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2 * numExamples, 42, 10.0)
+ val testData = data.take(numExamples)
+ val validationData = data.takeRight(numExamples)
val testRDD = sc.parallelize(testData, 2).cache()
val validationRDD = sc.parallelize(validationData, 2).cache()
@@ -67,7 +68,7 @@ class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
val ridgeErr = predictionError(
ridgeModel.predict(validationRDD.map(_.features)).collect(), validationData)
- // Ridge CV-error should be lower than linear regression
+ // Ridge validation error should be lower than linear regression.
assert(ridgeErr < linearErr,
"ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")")
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
index 4349c7000a..350130c914 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.mllib.tree.model.Filter
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.mllib.linalg.Vectors
class DecisionTreeSuite extends FunSuite with BeforeAndAfterAll {
@@ -396,7 +397,7 @@ object DecisionTreeSuite {
def generateOrderedLabeledPointsWithLabel0(): Array[LabeledPoint] = {
val arr = new Array[LabeledPoint](1000)
for (i <- 0 until 1000){
- val lp = new LabeledPoint(0.0,Array(i.toDouble,1000.0-i))
+ val lp = new LabeledPoint(0.0, Vectors.dense(i.toDouble, 1000.0 - i))
arr(i) = lp
}
arr
@@ -405,7 +406,7 @@ object DecisionTreeSuite {
def generateOrderedLabeledPointsWithLabel1(): Array[LabeledPoint] = {
val arr = new Array[LabeledPoint](1000)
for (i <- 0 until 1000){
- val lp = new LabeledPoint(1.0,Array(i.toDouble,999.0-i))
+ val lp = new LabeledPoint(1.0, Vectors.dense(i.toDouble, 999.0 - i))
arr(i) = lp
}
arr
@@ -415,9 +416,9 @@ object DecisionTreeSuite {
val arr = new Array[LabeledPoint](1000)
for (i <- 0 until 1000){
if (i < 600){
- arr(i) = new LabeledPoint(1.0,Array(0.0,1.0))
+ arr(i) = new LabeledPoint(1.0, Vectors.dense(0.0, 1.0))
} else {
- arr(i) = new LabeledPoint(0.0,Array(1.0,0.0))
+ arr(i) = new LabeledPoint(0.0, Vectors.dense(1.0, 0.0))
}
}
arr
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 60f053b381..27d41c7869 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -17,14 +17,20 @@
package org.apache.spark.mllib.util
+import java.io.File
+
import org.scalatest.FunSuite
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm,
squaredDistance => breezeSquaredDistance}
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils._
-class MLUtilsSuite extends FunSuite {
+class MLUtilsSuite extends FunSuite with LocalSparkContext {
test("epsilon computation") {
assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
@@ -49,4 +55,55 @@ class MLUtilsSuite extends FunSuite {
assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
}
}
+
+ test("compute stats") {
+ val data = Seq.fill(3)(Seq(
+ LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0)),
+ LabeledPoint(0.0, Vectors.dense(3.0, 4.0, 5.0))
+ )).flatten
+ val rdd = sc.parallelize(data, 2)
+ val (meanLabel, mean, std) = MLUtils.computeStats(rdd, 3, 6)
+ assert(meanLabel === 0.5)
+ assert(mean === Vectors.dense(2.0, 3.0, 4.0))
+ assert(std === Vectors.dense(1.0, 1.0, 1.0))
+ }
+
+ test("loadLibSVMData") {
+ val lines =
+ """
+ |+1 1:1.0 3:2.0 5:3.0
+ |-1
+ |-1 2:4.0 4:5.0 6:6.0
+ """.stripMargin
+ val tempDir = Files.createTempDir()
+ val file = new File(tempDir.getPath, "part-00000")
+ Files.write(lines, file, Charsets.US_ASCII)
+ val path = tempDir.toURI.toString
+
+ val pointsWithNumFeatures = MLUtils.loadLibSVMData(sc, path, 6).collect()
+ val pointsWithoutNumFeatures = MLUtils.loadLibSVMData(sc, path).collect()
+
+ for (points <- Seq(pointsWithNumFeatures, pointsWithoutNumFeatures)) {
+ assert(points.length === 3)
+ assert(points(0).label === 1.0)
+ assert(points(0).features === Vectors.sparse(6, Seq((0, 1.0), (2, 2.0), (4, 3.0))))
+ assert(points(1).label == 0.0)
+ assert(points(1).features == Vectors.sparse(6, Seq()))
+ assert(points(2).label === 0.0)
+ assert(points(2).features === Vectors.sparse(6, Seq((1, 4.0), (3, 5.0), (5, 6.0))))
+ }
+
+ val multiclassPoints = MLUtils.loadLibSVMData(sc, path, MLUtils.multiclassLabelParser).collect()
+ assert(multiclassPoints.length === 3)
+ assert(multiclassPoints(0).label === 1.0)
+ assert(multiclassPoints(1).label === -1.0)
+ assert(multiclassPoints(2).label === -1.0)
+
+ try {
+ file.delete()
+ tempDir.delete()
+ } catch {
+ case t: Throwable =>
+ }
+ }
}
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 19b90dfd6e..d2f9cdb3f4 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -87,18 +87,19 @@ class NaiveBayesModel(object):
>>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
- 0
+ 0.0
>>> model.predict(array([1.0, 0.0]))
- 1
+ 1.0
"""
- def __init__(self, pi, theta):
+ def __init__(self, labels, pi, theta):
+ self.labels = labels
self.pi = pi
self.theta = theta
def predict(self, x):
"""Return the most likely class for a data vector x"""
- return numpy.argmax(self.pi + dot(x, self.theta))
+ return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
class NaiveBayes(object):
@classmethod
@@ -122,7 +123,8 @@ class NaiveBayes(object):
ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
return NaiveBayesModel(
_deserialize_double_vector(ans[0]),
- _deserialize_double_matrix(ans[1]))
+ _deserialize_double_vector(ans[1]),
+ _deserialize_double_matrix(ans[2]))
def _test():