diff options
author | Frank Dai <soulmachine@gmail.com> | 2014-01-14 15:29:17 +0800 |
---|---|---|
committer | Frank Dai <soulmachine@gmail.com> | 2014-01-14 15:29:17 +0800 |
commit | a3da468d8b99565a966745e09830eaa768a9c267 (patch) | |
tree | 1b81e437bbc68afbe494170d329f7fa9c43f065e /mllib | |
parent | c2852cf42e0fa851b6708b6886b0d78ac5b697a6 (diff) | |
parent | fdaabdc67387524ffb84354f87985f48bd31cf60 (diff) | |
download | spark-a3da468d8b99565a966745e09830eaa768a9c267.tar.gz spark-a3da468d8b99565a966745e09830eaa768a9c267.tar.bz2 spark-a3da468d8b99565a966745e09830eaa768a9c267.zip |
Merge remote-tracking branch 'upstream/master' into code-style
Diffstat (limited to 'mllib')
10 files changed, 168 insertions, 12 deletions
diff --git a/mllib/data/sample_naive_bayes_data.txt b/mllib/data/sample_naive_bayes_data.txt new file mode 100644 index 0000000000..f874adbaf4 --- /dev/null +++ b/mllib/data/sample_naive_bayes_data.txt @@ -0,0 +1,6 @@ +0, 1 0 0 +0, 2 0 0 +1, 0 1 0 +1, 0 2 0 +2, 0 0 1 +2, 0 0 2 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 8520756a41..f3656f62c7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -182,6 +182,23 @@ class PythonMLLibAPI extends Serializable { } /** + * Java stub for NaiveBayes.train() + */ + def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double) + : java.util.List[java.lang.Object] = + { + val data = dataBytesJRDD.rdd.map(xBytes => { + val x = deserializeDoubleVector(xBytes) + LabeledPoint(x(0), x.slice(1, x.length)) + }) + val model = NaiveBayes.train(data, lambda) + val ret = new java.util.LinkedList[java.lang.Object]() + ret.add(serializeDoubleVector(model.pi)) + ret.add(serializeDoubleMatrix(model.theta)) + ret + } + + /** * Java stub for Python mllib KMeans.train() */ def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int, diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 50aede9c07..a481f52276 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -97,7 +97,7 @@ object LogisticRegressionWithSGD { * @param numIterations Number of iterations of gradient descent to run. * @param stepSize Step size to be used for each iteration of gradient descent. * @param miniBatchFraction Fraction of data to be used per iteration. - * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * @param initialWeights Initial set of weights to be used. Array should be equal in size to * the number of features in the data. */ def train( @@ -183,6 +183,8 @@ object LogisticRegressionWithSGD { val sc = new SparkContext(args(0), "LogisticRegression") val data = MLUtils.loadLabeledData(sc, args(1)) val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 524300d6ae..6539b2f339 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -21,17 +21,18 @@ import scala.collection.mutable import org.jblas.DoubleMatrix -import org.apache.spark.Logging +import org.apache.spark.{SparkContext, Logging} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.util.MLUtils /** * Model for Naive Bayes Classifiers. * * @param pi Log of class priors, whose dimension is C. - * @param theta Log of class conditional probabilities, whose dimension is CXD. + * @param theta Log of class conditional probabilities, whose dimension is CxD. */ -class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) +class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]]) extends ClassificationModel with Serializable { // Create a column vector that can be used for predictions @@ -50,10 +51,21 @@ class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) /** * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. * - * @param lambda The smooth parameter + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector, it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). */ -class NaiveBayes private (val lambda: Double = 1.0) - extends Serializable with Logging { +class NaiveBayes private (var lambda: Double) + extends Serializable with Logging +{ + def this() = this(1.0) + + /** Set the smoothing parameter. Default: 1.0. */ + def setLambda(lambda: Double): NaiveBayes = { + this.lambda = lambda + this + } /** * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. @@ -106,14 +118,49 @@ object NaiveBayes { * * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for - * document classification. By making every vector a 0-1 vector. it can also be used as + * document classification. By making every vector a 0-1 vector, it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). + * + * This version of the method uses a default smoothing parameter of 1.0. + * + * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency + * vector or a count vector. + */ + def train(input: RDD[LabeledPoint]): NaiveBayesModel = { + new NaiveBayes().run(input) + } + + /** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector, it can also be used as * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). * * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency * vector or a count vector. - * @param lambda The smooth parameter + * @param lambda The smoothing parameter */ - def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { + def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { new NaiveBayes(lambda).run(input) } + + def main(args: Array[String]) { + if (args.length != 2 && args.length != 3) { + println("Usage: NaiveBayes <master> <input_dir> [<lambda>]") + System.exit(1) + } + val sc = new SparkContext(args(0), "NaiveBayes") + val data = MLUtils.loadLabeledData(sc, args(1)) + val model = if (args.length == 2) { + NaiveBayes.train(data) + } else { + NaiveBayes.train(data, args(2).toDouble) + } + println("Pi: " + model.pi.mkString("[", ", ", "]")) + println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]")) + + sc.stop() + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala index 831aa7612f..6dff29dfb4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala @@ -181,6 +181,8 @@ object SVMWithSGD { val sc = new SparkContext(args(0), "SVM") val data = MLUtils.loadLabeledData(sc, args(1)) val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 63240e24dc..1a18292fe3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -23,4 +23,8 @@ package org.apache.spark.mllib.regression * @param label Label for this data point. * @param features List of features for this data point. */ -case class LabeledPoint(val label: Double, val features: Array[Double]) +case class LabeledPoint(label: Double, features: Array[Double]) { + override def toString: String = { + "LabeledPoint(%s, %s)".format(label, features.mkString("[", ", ", "]")) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala index d959695325..7c41793722 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala @@ -121,7 +121,7 @@ object LassoWithSGD { * @param stepSize Step size to be used for each iteration of gradient descent. * @param regParam Regularization parameter. * @param miniBatchFraction Fraction of data to be used per iteration. - * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * @param initialWeights Initial set of weights to be used. Array should be equal in size to * the number of features in the data. */ def train( @@ -205,6 +205,8 @@ object LassoWithSGD { val sc = new SparkContext(args(0), "Lasso") val data = MLUtils.loadLabeledData(sc, args(1)) val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala index 6aa63b0fac..df599fde76 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala @@ -162,6 +162,8 @@ object LinearRegressionWithSGD { val sc = new SparkContext(args(0), "LinearRegression") val data = MLUtils.loadLabeledData(sc, args(1)) val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala index 41b80cc3fc..0c0e67fb7b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala @@ -208,6 +208,8 @@ object RidgeRegressionWithSGD { val data = MLUtils.loadLabeledData(sc, args(1)) val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java new file mode 100644 index 0000000000..23ea3548b9 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java @@ -0,0 +1,72 @@ +package org.apache.spark.mllib.classification; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +public class JavaNaiveBayesSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaNaiveBayesSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + private static final List<LabeledPoint> POINTS = Arrays.asList( + new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}), + new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}), + new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}), + new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}), + new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}), + new LabeledPoint(2, new double[] {0.0, 0.0, 2.0}) + ); + + private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) { + int correct = 0; + for (LabeledPoint p: points) { + if (model.predict(p.features()) == p.label()) { + correct += 1; + } + } + return correct; + } + + @Test + public void runUsingConstructor() { + JavaRDD<LabeledPoint> testRDD = sc.parallelize(POINTS, 2).cache(); + + NaiveBayes nb = new NaiveBayes().setLambda(1.0); + NaiveBayesModel model = nb.run(testRDD.rdd()); + + int numAccurate = validatePrediction(POINTS, model); + Assert.assertEquals(POINTS.size(), numAccurate); + } + + @Test + public void runUsingStaticMethods() { + JavaRDD<LabeledPoint> testRDD = sc.parallelize(POINTS, 2).cache(); + + NaiveBayesModel model1 = NaiveBayes.train(testRDD.rdd()); + int numAccurate1 = validatePrediction(POINTS, model1); + Assert.assertEquals(POINTS.size(), numAccurate1); + + NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5); + int numAccurate2 = validatePrediction(POINTS, model2); + Assert.assertEquals(POINTS.size(), numAccurate2); + } +} |