20 files changed, 297 insertions, 58 deletions
diff --git a/docs/_config.yml b/docs/_config.yml index 11d18f0ac2..ce0fdf5fb4 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -5,6 +5,6 @@ markdown: kramdown # of Spark, Scala, and Mesos. SPARK_VERSION: 0.9.0-incubating-SNAPSHOT SPARK_VERSION_SHORT: 0.9.0 -SCALA_VERSION: 2.10 +SCALA_VERSION: "2.10" MESOS_VERSION: 0.13.0 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 45ee166688..1a5c640d10 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -21,6 +21,8 @@ depends on native Fortran routines. You may need to install the if it is not already present on your nodes. MLlib will throw a linking error if it cannot detect these libraries automatically. +To use MLlib in Python, you will also need [NumPy](http://www.numpy.org) version 1.7 or newer. + # Binary Classification Binary classification is a supervised learning problem in which we want to @@ -316,6 +318,13 @@ other signals), you can use the trainImplicit method to get better results. val model = ALS.trainImplicit(ratings, 1, 20, 0.01) {% endhighlight %} +# Using MLLib in Java + +All of MLlib's methods use Java-friendly types, so you can import and call them there the same +way you do in Scala. The only caveat is that the methods take Scala RDD objects, while the +Spark Java API uses a separate `JavaRDD` class. You can convert a Java RDD to a Scala one by +calling `.rdd()` on your `JavaRDD` object. + # Using MLLib in Python Following examples can be tested in the PySpark shell. @@ -330,7 +339,7 @@ from numpy import array # Load and parse the data data = sc.textFile("mllib/data/sample_svm_data.txt") parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) -model = LogisticRegressionWithSGD.train(sc, parsedData) +model = LogisticRegressionWithSGD.train(parsedData) # Build the model labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)), @@ -356,7 +365,7 @@ data = sc.textFile("mllib/data/ridge-data/lpsa.data") parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')])) # Build the model -model = LinearRegressionWithSGD.train(sc, parsedData) +model = LinearRegressionWithSGD.train(parsedData) # Evaluate the model on training data valuesAndPreds = parsedData.map(lambda point: (point.item(0), @@ -382,7 +391,7 @@ data = sc.textFile("kmeans_data.txt") parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) # Build the model (cluster the data) -clusters = KMeans.train(sc, parsedData, 2, maxIterations=10, +clusters = KMeans.train(parsedData, 2, maxIterations=10, runs=30, initialization_mode="random") # Evaluate clustering by computing Within Set Sum of Squared Errors @@ -411,7 +420,7 @@ data = sc.textFile("mllib/data/als/test.data") ratings = data.map(lambda line: array([float(x) for x in line.split(',')])) # Build the recommendation model using Alternating Least Squares -model = ALS.train(sc, ratings, 1, 20) +model = ALS.train(ratings, 1, 20) # Evaluate the model on training data testdata = ratings.map(lambda p: (int(p[0]), int(p[1]))) @@ -426,5 +435,5 @@ signals), you can use the trainImplicit method to get better results. 
{% highlight python %} # Build the recommendation model using Alternating Least Squares based on implicit ratings -model = ALS.trainImplicit(sc, ratings, 1, 20) +model = ALS.trainImplicit(ratings, 1, 20) {% endhighlight %} diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md index c4236f8312..b07899c2e1 100644 --- a/docs/python-programming-guide.md +++ b/docs/python-programming-guide.md @@ -52,7 +52,7 @@ In addition, PySpark fully supports interactive use---simply run `./bin/pyspark` # Installing and Configuring PySpark -PySpark requires Python 2.6 or higher. +PySpark requires Python 2.7 or higher. PySpark applications are executed using a standard CPython interpreter in order to support Python modules that use C extensions. We have not tested PySpark with Python 3 or with alternative Python interpreters, such as [PyPy](http://pypy.org/) or [Jython](http://www.jython.org/). @@ -149,6 +149,12 @@ sc = SparkContext(conf = conf) [API documentation](api/pyspark/index.html) for PySpark is available as Epydoc. Many of the methods also contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples. +# Libraries + +[MLlib](mllib-guide.html) is also available in PySpark. To use it, you'll need +[NumPy](http://www.numpy.org) version 1.7 or newer. The [MLlib guide](mllib-guide.html) contains +some example applications. + # Where to Go from Here PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples). diff --git a/mllib/data/sample_naive_bayes_data.txt b/mllib/data/sample_naive_bayes_data.txt new file mode 100644 index 0000000000..f874adbaf4 --- /dev/null +++ b/mllib/data/sample_naive_bayes_data.txt @@ -0,0 +1,6 @@ +0, 1 0 0 +0, 2 0 0 +1, 0 1 0 +1, 0 2 0 +2, 0 0 1 +2, 0 0 2 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index c972a71349..3fec1a909d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -184,6 +184,23 @@ class PythonMLLibAPI extends Serializable { } /** + * Java stub for NaiveBayes.train() + */ + def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double) + : java.util.List[java.lang.Object] = + { + val data = dataBytesJRDD.rdd.map(xBytes => { + val x = deserializeDoubleVector(xBytes) + LabeledPoint(x(0), x.slice(1, x.length)) + }) + val model = NaiveBayes.train(data, lambda) + val ret = new java.util.LinkedList[java.lang.Object]() + ret.add(serializeDoubleVector(model.pi)) + ret.add(serializeDoubleMatrix(model.theta)) + ret + } + + /** * Java stub for Python mllib KMeans.train() */ def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int, diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 50aede9c07..a481f52276 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -97,7 +97,7 @@ object LogisticRegressionWithSGD { * @param numIterations Number of iterations of gradient descent to run. * @param stepSize Step size to be used for each iteration of gradient descent. 
* @param miniBatchFraction Fraction of data to be used per iteration. - * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * @param initialWeights Initial set of weights to be used. Array should be equal in size to * the number of features in the data. */ def train( @@ -183,6 +183,8 @@ object LogisticRegressionWithSGD { val sc = new SparkContext(args(0), "LogisticRegression") val data = MLUtils.loadLabeledData(sc, args(1)) val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 524300d6ae..6539b2f339 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -21,17 +21,18 @@ import scala.collection.mutable import org.jblas.DoubleMatrix -import org.apache.spark.Logging +import org.apache.spark.{SparkContext, Logging} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.util.MLUtils /** * Model for Naive Bayes Classifiers. * * @param pi Log of class priors, whose dimension is C. - * @param theta Log of class conditional probabilities, whose dimension is CXD. + * @param theta Log of class conditional probabilities, whose dimension is CxD. */ -class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) +class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]]) extends ClassificationModel with Serializable { // Create a column vector that can be used for predictions @@ -50,10 +51,21 @@ class NaiveBayesModel(pi: Array[Double], theta: Array[Array[Double]]) /** * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. * - * @param lambda The smooth parameter + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector, it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). */ -class NaiveBayes private (val lambda: Double = 1.0) - extends Serializable with Logging { +class NaiveBayes private (var lambda: Double) + extends Serializable with Logging +{ + def this() = this(1.0) + + /** Set the smoothing parameter. Default: 1.0. */ + def setLambda(lambda: Double): NaiveBayes = { + this.lambda = lambda + this + } /** * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries. @@ -106,14 +118,49 @@ object NaiveBayes { * * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for - * document classification. By making every vector a 0-1 vector. it can also be used as + * document classification. By making every vector a 0-1 vector, it can also be used as + * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). + * + * This version of the method uses a default smoothing parameter of 1.0. + * + * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency + * vector or a count vector. 
+ */ + def train(input: RDD[LabeledPoint]): NaiveBayesModel = { + new NaiveBayes().run(input) + } + + /** + * Trains a Naive Bayes model given an RDD of `(label, features)` pairs. + * + * This is the Multinomial NB ([[http://tinyurl.com/lsdw6p]]) which can handle all kinds of + * discrete data. For example, by converting documents into TF-IDF vectors, it can be used for + * document classification. By making every vector a 0-1 vector, it can also be used as * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]). * * @param input RDD of `(label, array of features)` pairs. Every vector should be a frequency * vector or a count vector. - * @param lambda The smooth parameter + * @param lambda The smoothing parameter */ - def train(input: RDD[LabeledPoint], lambda: Double = 1.0): NaiveBayesModel = { + def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { new NaiveBayes(lambda).run(input) } + + def main(args: Array[String]) { + if (args.length != 2 && args.length != 3) { + println("Usage: NaiveBayes <master> <input_dir> [<lambda>]") + System.exit(1) + } + val sc = new SparkContext(args(0), "NaiveBayes") + val data = MLUtils.loadLabeledData(sc, args(1)) + val model = if (args.length == 2) { + NaiveBayes.train(data) + } else { + NaiveBayes.train(data, args(2).toDouble) + } + println("Pi: " + model.pi.mkString("[", ", ", "]")) + println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]")) + + sc.stop() + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala index 3b8f8550d0..f2964ea446 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala @@ -183,6 +183,8 @@ object SVMWithSGD { val sc = new SparkContext(args(0), "SVM") val data = MLUtils.loadLabeledData(sc, args(1)) val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 63240e24dc..1a18292fe3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -23,4 +23,8 @@ package org.apache.spark.mllib.regression * @param label Label for this data point. * @param features List of features for this data point. */ -case class LabeledPoint(val label: Double, val features: Array[Double]) +case class LabeledPoint(label: Double, features: Array[Double]) { + override def toString: String = { + "LabeledPoint(%s, %s)".format(label, features.mkString("[", ", ", "]")) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala index d959695325..7c41793722 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala @@ -121,7 +121,7 @@ object LassoWithSGD { * @param stepSize Step size to be used for each iteration of gradient descent. * @param regParam Regularization parameter. * @param miniBatchFraction Fraction of data to be used per iteration. - * @param initialWeights Initial set of weights to be used. 
Array should be equal in size to + * @param initialWeights Initial set of weights to be used. Array should be equal in size to * the number of features in the data. */ def train( @@ -205,6 +205,8 @@ object LassoWithSGD { val sc = new SparkContext(args(0), "Lasso") val data = MLUtils.loadLabeledData(sc, args(1)) val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala index 597d55e0bb..fe5cce064b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala @@ -162,6 +162,8 @@ object LinearRegressionWithSGD { val sc = new SparkContext(args(0), "LinearRegression") val data = MLUtils.loadLabeledData(sc, args(1)) val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala index b29508d2b9..c125c6797a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala @@ -122,7 +122,7 @@ object RidgeRegressionWithSGD { * @param stepSize Step size to be used for each iteration of gradient descent. * @param regParam Regularization parameter. * @param miniBatchFraction Fraction of data to be used per iteration. - * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * @param initialWeights Initial set of weights to be used. Array should be equal in size to * the number of features in the data. 
*/ def train( @@ -208,6 +208,8 @@ object RidgeRegressionWithSGD { val data = MLUtils.loadLabeledData(sc, args(1)) val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) + println("Weights: " + model.weights.mkString("[", ", ", "]")) + println("Intercept: " + model.intercept) sc.stop() } diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java new file mode 100644 index 0000000000..23ea3548b9 --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java @@ -0,0 +1,72 @@ +package org.apache.spark.mllib.classification; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +public class JavaNaiveBayesSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaNaiveBayesSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + System.clearProperty("spark.driver.port"); + } + + private static final List<LabeledPoint> POINTS = Arrays.asList( + new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}), + new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}), + new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}), + new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}), + new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}), + new LabeledPoint(2, new double[] {0.0, 0.0, 2.0}) + ); + + private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) { + int correct = 0; + for (LabeledPoint p: points) { + if (model.predict(p.features()) == p.label()) { + correct += 1; + } + } + return correct; + } + + @Test + public void runUsingConstructor() { + JavaRDD<LabeledPoint> testRDD = sc.parallelize(POINTS, 2).cache(); + + NaiveBayes nb = new NaiveBayes().setLambda(1.0); + NaiveBayesModel model = nb.run(testRDD.rdd()); + + int numAccurate = validatePrediction(POINTS, model); + Assert.assertEquals(POINTS.size(), numAccurate); + } + + @Test + public void runUsingStaticMethods() { + JavaRDD<LabeledPoint> testRDD = sc.parallelize(POINTS, 2).cache(); + + NaiveBayesModel model1 = NaiveBayes.train(testRDD.rdd()); + int numAccurate1 = validatePrediction(POINTS, model1); + Assert.assertEquals(POINTS.size(), numAccurate1); + + NaiveBayesModel model2 = NaiveBayes.train(testRDD.rdd(), 0.5); + int numAccurate2 = validatePrediction(POINTS, model2); + Assert.assertEquals(POINTS.size(), numAccurate2); + } +} diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index 769d88dfb9..20a0e309d1 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -16,7 +16,7 @@ # from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape -from pyspark import SparkContext +from pyspark import SparkContext, RDD from pyspark.serializers import Serializer import struct diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 70de332d34..19b90dfd6e 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -15,6 +15,8 @@ # limitations under the License. 
# +import numpy + from numpy import array, dot, shape from pyspark import SparkContext from pyspark.mllib._common import \ @@ -29,8 +31,8 @@ class LogisticRegressionModel(LinearModel): """A linear binary classification model derived from logistic regression. >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2) - >>> lrm = LogisticRegressionWithSGD.train(sc, sc.parallelize(data)) - >>> lrm.predict(array([1.0])) != None + >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data)) + >>> lrm.predict(array([1.0])) > 0 True """ def predict(self, x): @@ -41,20 +43,21 @@ class LogisticRegressionModel(LinearModel): class LogisticRegressionWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, - mini_batch_fraction=1.0, initial_weights=None): + def train(cls, data, iterations=100, step=1.0, + miniBatchFraction=1.0, initialWeights=None): """Train a logistic regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd, - iterations, step, mini_batch_fraction, i), - LogisticRegressionModel, data, initial_weights) + iterations, step, miniBatchFraction, i), + LogisticRegressionModel, data, initialWeights) class SVMModel(LinearModel): """A support vector machine. >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2) - >>> svm = SVMWithSGD.train(sc, sc.parallelize(data)) - >>> svm.predict(array([1.0])) != None + >>> svm = SVMWithSGD.train(sc.parallelize(data)) + >>> svm.predict(array([1.0])) > 0 True """ def predict(self, x): @@ -64,13 +67,63 @@ class SVMModel(LinearModel): class SVMWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, - mini_batch_fraction=1.0, initial_weights=None): + def train(cls, data, iterations=100, step=1.0, regParam=1.0, + miniBatchFraction=1.0, initialWeights=None): """Train a support vector machine on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd, - iterations, step, reg_param, mini_batch_fraction, i), - SVMModel, data, initial_weights) + iterations, step, regParam, miniBatchFraction, i), + SVMModel, data, initialWeights) + +class NaiveBayesModel(object): + """ + Model for Naive Bayes classifiers. + + Contains two parameters: + - pi: vector of logs of class priors (dimension C) + - theta: matrix of logs of class conditional probabilities (CxD) + + >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3) + >>> model = NaiveBayes.train(sc.parallelize(data)) + >>> model.predict(array([0.0, 1.0])) + 0 + >>> model.predict(array([1.0, 0.0])) + 1 + """ + + def __init__(self, pi, theta): + self.pi = pi + self.theta = theta + + def predict(self, x): + """Return the most likely class for a data vector x""" + return numpy.argmax(self.pi + dot(x, self.theta)) + +class NaiveBayes(object): + @classmethod + def train(cls, data, lambda_=1.0): + """ + Train a Naive Bayes model given an RDD of (label, features) vectors. + + This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can + handle all kinds of discrete data. For example, by converting + documents into TF-IDF vectors, it can be used for document + classification. By making every vector a 0-1 vector, it can also be + used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}). 
+ + @param data: RDD of NumPy vectors, one per element, where the first + coordinate is the label and the rest is the feature vector + (e.g. a count vector). + @param lambda_: The smoothing parameter + """ + sc = data.context + dataBytes = _get_unmangled_double_vector_rdd(data) + ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_) + return NaiveBayesModel( + _deserialize_double_vector(ans[0]), + _deserialize_double_matrix(ans[1])) + def _test(): import doctest diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 8cf20e591a..30862918c3 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -28,12 +28,12 @@ class KMeansModel(object): """A clustering model derived from the k-means method. >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) - >>> clusters = KMeans.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random") + >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0])) True >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0])) True - >>> clusters = KMeans.train(sc, sc.parallelize(data), 2) + >>> clusters = KMeans.train(sc.parallelize(data), 2) """ def __init__(self, centers_): self.centers = centers_ @@ -52,12 +52,13 @@ class KMeansModel(object): class KMeans(object): @classmethod - def train(cls, sc, data, k, maxIterations=100, runs=1, - initialization_mode="k-means||"): + def train(cls, data, k, maxIterations=100, runs=1, + initializationMode="k-means||"): """Train a k-means clustering model.""" + sc = data.context dataBytes = _get_unmangled_double_vector_rdd(data) ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd, - k, maxIterations, runs, initialization_mode) + k, maxIterations, runs, initializationMode) if len(ans) != 1: raise RuntimeError("JVM call result had unexpected length") elif type(ans[0]) != bytearray: diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 0eeb5bb66b..f4a83f0209 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -32,11 +32,11 @@ class MatrixFactorizationModel(object): >>> r2 = (1, 2, 2.0) >>> r3 = (2, 1, 2.0) >>> ratings = sc.parallelize([r1, r2, r3]) - >>> model = ALS.trainImplicit(sc, ratings, 1) + >>> model = ALS.trainImplicit(ratings, 1) >>> model.predict(2,2) is not None True >>> testset = sc.parallelize([(1, 2), (1, 1)]) - >>> model.predictAll(testset).count == 2 + >>> model.predictAll(testset).count() == 2 True """ @@ -57,14 +57,16 @@ class MatrixFactorizationModel(object): class ALS(object): @classmethod - def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): + def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): + sc = ratings.context ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating) mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd, rank, iterations, lambda_, blocks) return MatrixFactorizationModel(sc, mod) @classmethod - def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01): + def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01): + sc = ratings.context ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating) mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd, rank, iterations, 
lambda_, blocks, alpha) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index a3a68b29e0..7656db07f6 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -47,54 +47,57 @@ class LinearRegressionModel(LinearRegressionModelBase): """A linear regression model derived from a least-squares fit. >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) - >>> lrm = LinearRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0])) + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) """ class LinearRegressionWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, - mini_batch_fraction=1.0, initial_weights=None): + def train(cls, data, iterations=100, step=1.0, + miniBatchFraction=1.0, initialWeights=None): """Train a linear regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD( - d._jrdd, iterations, step, mini_batch_fraction, i), - LinearRegressionModel, data, initial_weights) + d._jrdd, iterations, step, miniBatchFraction, i), + LinearRegressionModel, data, initialWeights) class LassoModel(LinearRegressionModelBase): """A linear regression model derived from a least-squares fit with an l_1 penalty term. >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) - >>> lrm = LassoWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0])) + >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) """ - + class LassoWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, - mini_batch_fraction=1.0, initial_weights=None): + def train(cls, data, iterations=100, step=1.0, regParam=1.0, + miniBatchFraction=1.0, initialWeights=None): """Train a Lasso regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd, - iterations, step, reg_param, mini_batch_fraction, i), - LassoModel, data, initial_weights) + iterations, step, regParam, miniBatchFraction, i), + LassoModel, data, initialWeights) class RidgeRegressionModel(LinearRegressionModelBase): """A linear regression model derived from a least-squares fit with an l_2 penalty term. 
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) - >>> lrm = RidgeRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0])) + >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) """ class RidgeRegressionWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, - mini_batch_fraction=1.0, initial_weights=None): + def train(cls, data, iterations=100, step=1.0, regParam=1.0, + miniBatchFraction=1.0, initialWeights=None): """Train a ridge regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd, - iterations, step, reg_param, mini_batch_fraction, i), - RidgeRegressionModel, data, initial_weights) + iterations, step, regParam, miniBatchFraction, i), + RidgeRegressionModel, data, initialWeights) def _test(): import doctest diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index f2b3f3c142..d77981f61f 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -76,6 +76,10 @@ def main(infile, outfile): iterator = deserializer.load_stream(infile) serializer.dump_stream(func(split_index, iterator), outfile) except Exception as e: + # Write the error to stderr in addition to trying to pass it back to + # Java, in case it happened while serializing a record + print >> sys.stderr, "PySpark worker failed with exception:" + print >> sys.stderr, traceback.format_exc() write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile) write_with_length(traceback.format_exc(), outfile) sys.exit(-1) diff --git a/python/run-tests b/python/run-tests index feba97cee0..2005f610b4 100755 --- a/python/run-tests +++ b/python/run-tests @@ -40,6 +40,11 @@ run_test "-m doctest pyspark/broadcast.py" run_test "-m doctest pyspark/accumulators.py" run_test "-m doctest pyspark/serializers.py" run_test "pyspark/tests.py" +#run_test "pyspark/mllib/_common.py" +#run_test "pyspark/mllib/classification.py" +#run_test "pyspark/mllib/clustering.py" +#run_test "pyspark/mllib/recommendation.py" +#run_test "pyspark/mllib/regression.py" if [[ $FAILED != 0 ]]; then echo -en "\033[31m" # Red
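
The new "Using MLLib in Java" section added to mllib-guide.md in this diff describes converting a `JavaRDD` to a Scala `RDD` with `.rdd()` before calling MLlib, but gives no example. The following sketch (not part of the commit) illustrates that usage with the new Naive Bayes API; it reuses the data points and method calls from the `JavaNaiveBayesSuite` added above, and the class name `JavaNaiveBayesExample` is invented for illustration.

{% highlight java %}
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.regression.LabeledPoint;

public class JavaNaiveBayesExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "JavaNaiveBayesExample");

    // The same toy data as mllib/data/sample_naive_bayes_data.txt and JavaNaiveBayesSuite.
    List<LabeledPoint> points = Arrays.asList(
      new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}),
      new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}),
      new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}),
      new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}),
      new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}),
      new LabeledPoint(2, new double[] {0.0, 0.0, 2.0}));
    JavaRDD<LabeledPoint> data = sc.parallelize(points);

    // MLlib methods take Scala RDDs, so convert the JavaRDD with .rdd() before training.
    NaiveBayesModel model = NaiveBayes.train(data.rdd(), 1.0);

    // Predict the class of a new count vector.
    System.out.println("Predicted class: " + model.predict(new double[] {0.0, 2.0, 0.0}));

    sc.stop();
  }
}
{% endhighlight %}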
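
For reference (not spelled out in the commit itself), the prediction implemented by the new Python `NaiveBayesModel.predict` — `numpy.argmax(self.pi + dot(x, self.theta))` — is the standard multinomial Naive Bayes decision rule in log space, for C classes and D features:

\hat{y} = \arg\max_{c \in \{1,\dots,C\}} \Big( \pi_c + \sum_{d=1}^{D} x_d \, \theta_{c,d} \Big)

where \pi_c is the log prior of class c and \theta_{c,d} is the log conditional probability of feature d given class c, matching the C-dimensional `pi` vector and CxD `theta` matrix described in the updated docstrings.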