aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorFrank Dai <soulmachine@gmail.com>2014-01-14 15:29:17 +0800
committerFrank Dai <soulmachine@gmail.com>2014-01-14 15:29:17 +0800
commita3da468d8b99565a966745e09830eaa768a9c267 (patch)
tree1b81e437bbc68afbe494170d329f7fa9c43f065e /mllib
parentc2852cf42e0fa851b6708b6886b0d78ac5b697a6 (diff)
parentfdaabdc67387524ffb84354f87985f48bd31cf60 (diff)
downloadspark-a3da468d8b99565a966745e09830eaa768a9c267.tar.gz
spark-a3da468d8b99565a966745e09830eaa768a9c267.tar.bz2
spark-a3da468d8b99565a966745e09830eaa768a9c267.zip
Merge remote-tracking branch 'upstream/master' into code-style
Diffstat (limited to 'mllib')
-rw-r--r--mllib/data/sample_naive_bayes_data.txt6
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala17
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala4
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala65
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala4
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala2
-rw-r--r--mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java72
10 files changed, 168 insertions, 12 deletions
diff --git a/mllib/data/sample_naive_bayes_data.txt b/mllib/data/sample_naive_bayes_data.txt
new file mode 100644
index 0000000000..f874adbaf4
--- /dev/null
+++ b/mllib/data/sample_naive_bayes_data.txt
@@ -0,0 +1,6 @@
+0, 1 0 0
+0, 2 0 0
+1, 0 1 0
+1, 0 2 0
+2, 0 0 1
+2, 0 0 2
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 8520756a41..f3656f62c7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -182,6 +182,23 @@ class PythonMLLibAPI extends Serializable {
}
/**
+ * Java stub for NaiveBayes.train()
+ */
+ def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double)
+ : java.util.List[java.lang.Object] =
+ {
+ val data = dataBytesJRDD.rdd.map(xBytes => {
+ val x = deserializeDoubleVector(xBytes)
+ LabeledPoint(x(0), x.slice(1, x.length))
+ })
+ val model = NaiveBayes.train(data, lambda)
+ val ret = new java.util.LinkedList[java.lang.Object]()
+ ret.add(serializeDoubleVector(model.pi))
+ ret.add(serializeDoubleMatrix(model.theta))
+ ret
+ }
+
+ /**
* Java stub for Python mllib KMeans.train()
*/
def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index 50aede9c07..a481f52276 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -97,7 +97,7 @@ object LogisticRegressionWithSGD {
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param miniBatchFraction Fraction of data to be used per iteration.
- * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
@@ -183,6 +183,8 @@ object LogisticRegressionWithSGD {
val sc = new SparkContext(args(0), "LogisticRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
+ println("Weights: " + model.weights.mkString("[", ", ", "]"))
+ println("Intercept: " + model.intercept)
sc.stop()
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 524300d6ae..6539b2f339 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -21,17 +21,18 @@ import scala.collection.mutable
import org.jblas.DoubleMatrix
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.util.MLUtils
/**
* Model for Naive Bayes Classifiers.
*
* @param pi Log of class priors, whose dimension is C.
- * @param theta Log of class conditional probabilities, whose dimension is CXD.
+ * @param theta Log of class conditional probabilities, whose dimension is CxD.
*/
-class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]])
+class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
extends ClassificationModel with Serializable {
// Create a column vector that can be used for predictions
@@ -50,10 +51,21 @@ class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]])
/**
* Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
*
- * @param lambda The smooth parameter
+ * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
+ * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
+ * document classification. By making every vector a 0-1 vector, it can also be used as
+ * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*/
-class NaiveBayes private (val lambda: Double = 1.0)
- extends Serializable with Logging {
+class NaiveBayes private (var lambda: Double)
+ extends Serializable with Logging
+{
+ def this() = this(1.0)
+
+ /** Set the smoothing parameter. Default: 1.0. */
+ def setLambda(lambda: Double): NaiveBayes = {
+ this.lambda = lambda
+ this
+ }
/**
* Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
@@ -106,14 +118,49 @@ object NaiveBayes {
*
* This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
* discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
- * document classification. By making every vector a 0-1 vector. it can also be used as
+ * document classification. By making every vector a 0-1 vector, it can also be used as
+ * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
+ *
+ * This version of the method uses a default smoothing parameter of 1.0.
+ *
+ * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
+ * vector or a count vector.
+ */
+ def train(input: RDD[LabeledPoint]): NaiveBayesModel = {
+ new NaiveBayes().run(input)
+ }
+
+ /**
+ * Trains a Naive Bayes model given an RDD of `(label, features)` pairs.
+ *
+ * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of
+ * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for
+ * document classification. By making every vector a 0-1 vector, it can also be used as
* Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
*
* @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency
* vector or a count vector.
- * @param lambda The smooth parameter
+ * @param lambda The smoothing parameter
*/
- def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = {
+ def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {
new NaiveBayes(lambda).run(input)
}
+
+ def main(args: Array[String]) {
+ if (args.length != 2 && args.length != 3) {
+ println("Usage: NaiveBayes <master> <input_dir> [<lambda>]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "NaiveBayes")
+ val data = MLUtils.loadLabeledData(sc, args(1))
+ val model = if (args.length == 2) {
+ NaiveBayes.train(data)
+ } else {
+ NaiveBayes.train(data, args(2).toDouble)
+ }
+ println("Pi: " + model.pi.mkString("[", ", ", "]"))
+ println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]"))
+
+ sc.stop()
+ }
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index 831aa7612f..6dff29dfb4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -181,6 +181,8 @@ object SVMWithSGD {
val sc = new SparkContext(args(0), "SVM")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ println("Weights: " + model.weights.mkString("[", ", ", "]"))
+ println("Intercept: " + model.intercept)
sc.stop()
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 63240e24dc..1a18292fe3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -23,4 +23,8 @@ package org.apache.spark.mllib.regression
* @param label Label for this data point.
* @param features List of features for this data point.
*/
-case class LabeledPoint(val label: Double, val features: Array[Double])
+case class LabeledPoint(label: Double, features: Array[Double]) {
+ override def toString: String = {
+ "LabeledPoint(%s, %s)".format(label, features.mkString("[", ", ", "]"))
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
index d959695325..7c41793722 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
@@ -121,7 +121,7 @@ object LassoWithSGD {
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param regParam Regularization parameter.
* @param miniBatchFraction Fraction of data to be used per iteration.
- * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+ * @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
@@ -205,6 +205,8 @@ object LassoWithSGD {
val sc = new SparkContext(args(0), "Lasso")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ println("Weights: " + model.weights.mkString("[", ", ", "]"))
+ println("Intercept: " + model.intercept)
sc.stop()
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
index 6aa63b0fac..df599fde76 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
@@ -162,6 +162,8 @@ object LinearRegressionWithSGD {
val sc = new SparkContext(args(0), "LinearRegression")
val data = MLUtils.loadLabeledData(sc, args(1))
val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
+ println("Weights: " + model.weights.mkString("[", ", ", "]"))
+ println("Intercept: " + model.intercept)
sc.stop()
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
index 41b80cc3fc..0c0e67fb7b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
@@ -208,6 +208,8 @@ object RidgeRegressionWithSGD {
val data = MLUtils.loadLabeledData(sc, args(1))
val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble,
args(3).toDouble)
+ println("Weights: " + model.weights.mkString("[", ", ", "]"))
+ println("Intercept: " + model.intercept)
sc.stop()
}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
new file mode 100644
index 0000000000..23ea3548b9
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
@@ -0,0 +1,72 @@
+package org.apache.spark.mllib.classification;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.regression.LabeledPoint;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+public class JavaNaiveBayesSuite implements Serializable {
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaNaiveBayesSuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ System.clearProperty("spark.driver.port");
+ }
+
+ private static final List<LabeledPoint> POINTS = Arrays.asList(
+ new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}),
+ new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}),
+ new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}),
+ new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}),
+ new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}),
+ new LabeledPoint(2, new double[] {0.0, 0.0, 2.0})
+ );
+
+ private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) {
+ int correct = 0;
+ for (LabeledPoint p: points) {
+ if (model.predict(p.features()) == p.label()) {
+ correct += 1;
+ }
+ }
+ return correct;
+ }
+
+ @Test
+ public void runUsingConstructor() {
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(POINTS, 2).cache();
+
+ NaiveBayes nb = new NaiveBayes().setLambda(1.0);
+ NaiveBayesModel model = nb.run(testRDD.rdd());
+
+ int numAccurate = validatePrediction(POINTS, model);
+ Assert.assertEquals(POINTS.size(), numAccurate);
+ }
+
+ @Test
+ public void runUsingStaticMethods() {
+ JavaRDD<LabeledPoint> testRDD = sc.parallelize(POINTS, 2).cache();
+
+ NaiveBayesModel model1 = NaiveBayes.train(testRDD.rdd());
+ int numAccurate1 = validatePrediction(POINTS, model1);
+ Assert.assertEquals(POINTS.size(), numAccurate1);
+
+ NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5);
+ int numAccurate2 = validatePrediction(POINTS, model2);
+ Assert.assertEquals(POINTS.size(), numAccurate2);
+ }
+}