diff options
Diffstat (limited to 'python/pyspark/mllib')
-rw-r--r-- | python/pyspark/mllib/_common.py | 2 | ||||
-rw-r--r-- | python/pyspark/mllib/classification.py | 65 | ||||
-rw-r--r-- | python/pyspark/mllib/clustering.py | 11 | ||||
-rw-r--r-- | python/pyspark/mllib/recommendation.py | 10 | ||||
-rw-r--r-- | python/pyspark/mllib/regression.py | 17 |
5 files changed, 82 insertions, 23 deletions
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index 769d88dfb9..20a0e309d1 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -16,7 +16,7 @@ # from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape -from pyspark import SparkContext +from pyspark import SparkContext, RDD from pyspark.serializers import Serializer import struct diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 70de332d34..03ff5a572e 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -15,6 +15,8 @@ # limitations under the License. # +import numpy + from numpy import array, dot, shape from pyspark import SparkContext from pyspark.mllib._common import \ @@ -29,8 +31,8 @@ class LogisticRegressionModel(LinearModel): """A linear binary classification model derived from logistic regression. >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2) - >>> lrm = LogisticRegressionWithSGD.train(sc, sc.parallelize(data)) - >>> lrm.predict(array([1.0])) != None + >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data)) + >>> lrm.predict(array([1.0])) > 0 True """ def predict(self, x): @@ -41,9 +43,10 @@ class LogisticRegressionModel(LinearModel): class LogisticRegressionWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, + def train(cls, data, iterations=100, step=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a logistic regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd, iterations, step, mini_batch_fraction, i), @@ -53,8 +56,8 @@ class SVMModel(LinearModel): """A support vector machine. >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2) - >>> svm = SVMWithSGD.train(sc, sc.parallelize(data)) - >>> svm.predict(array([1.0])) != None + >>> svm = SVMWithSGD.train(sc.parallelize(data)) + >>> svm.predict(array([1.0])) > 0 True """ def predict(self, x): @@ -64,14 +67,64 @@ class SVMModel(LinearModel): class SVMWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, + def train(cls, data, iterations=100, step=1.0, reg_param=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a support vector machine on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd, iterations, step, reg_param, mini_batch_fraction, i), SVMModel, data, initial_weights) +class NaiveBayesModel(object): + """ + Model for Naive Bayes classifiers. + + Contains two parameters: + - pi: vector of logs of class priors (dimension C) + - theta: matrix of logs of class conditional probabilities (CxD) + + >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3) + >>> model = NaiveBayes.train(sc.parallelize(data)) + >>> model.predict(array([0.0, 1.0])) + 0 + >>> model.predict(array([1.0, 0.0])) + 1 + """ + + def __init__(self, pi, theta): + self.pi = pi + self.theta = theta + + def predict(self, x): + """Return the most likely class for a data vector x""" + return numpy.argmax(self.pi + dot(x, self.theta)) + +class NaiveBayes(object): + @classmethod + def train(cls, data, lambda_=1.0): + """ + Train a Naive Bayes model given an RDD of (label, features) vectors. + + This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can + handle all kinds of discrete data. For example, by converting + documents into TF-IDF vectors, it can be used for document + classification. By making every vector a 0-1 vector, it can also be + used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}). + + @param data: RDD of NumPy vectors, one per element, where the first + coordinate is the label and the rest is the feature vector + (e.g. a count vector). + @param lambda_: The smoothing parameter + """ + sc = data.context + dataBytes = _get_unmangled_double_vector_rdd(data) + ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_) + return NaiveBayesModel( + _deserialize_double_vector(ans[0]), + _deserialize_double_matrix(ans[1])) + + def _test(): import doctest globs = globals().copy() diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 8cf20e591a..30862918c3 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -28,12 +28,12 @@ class KMeansModel(object): """A clustering model derived from the k-means method. >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) - >>> clusters = KMeans.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random") + >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0])) True >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0])) True - >>> clusters = KMeans.train(sc, sc.parallelize(data), 2) + >>> clusters = KMeans.train(sc.parallelize(data), 2) """ def __init__(self, centers_): self.centers = centers_ @@ -52,12 +52,13 @@ class KMeansModel(object): class KMeans(object): @classmethod - def train(cls, sc, data, k, maxIterations=100, runs=1, - initialization_mode="k-means||"): + def train(cls, data, k, maxIterations=100, runs=1, + initializationMode="k-means||"): """Train a k-means clustering model.""" + sc = data.context dataBytes = _get_unmangled_double_vector_rdd(data) ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd, - k, maxIterations, runs, initialization_mode) + k, maxIterations, runs, initializationMode) if len(ans) != 1: raise RuntimeError("JVM call result had unexpected length") elif type(ans[0]) != bytearray: diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 0eeb5bb66b..f4a83f0209 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -32,11 +32,11 @@ class MatrixFactorizationModel(object): >>> r2 = (1, 2, 2.0) >>> r3 = (2, 1, 2.0) >>> ratings = sc.parallelize([r1, r2, r3]) - >>> model = ALS.trainImplicit(sc, ratings, 1) + >>> model = ALS.trainImplicit(ratings, 1) >>> model.predict(2,2) is not None True >>> testset = sc.parallelize([(1, 2), (1, 1)]) - >>> model.predictAll(testset).count == 2 + >>> model.predictAll(testset).count() == 2 True """ @@ -57,14 +57,16 @@ class MatrixFactorizationModel(object): class ALS(object): @classmethod - def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): + def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1): + sc = ratings.context ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating) mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd, rank, iterations, lambda_, blocks) return MatrixFactorizationModel(sc, mod) @classmethod - def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01): + def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01): + sc = ratings.context ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating) mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index a3a68b29e0..e90b72893f 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -47,14 +47,15 @@ class LinearRegressionModel(LinearRegressionModelBase): """A linear regression model derived from a least-squares fit. >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) - >>> lrm = LinearRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0])) + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initial_weights=array([1.0])) """ class LinearRegressionWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, + def train(cls, data, iterations=100, step=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a linear regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD( d._jrdd, iterations, step, mini_batch_fraction, i), @@ -65,14 +66,15 @@ class LassoModel(LinearRegressionModelBase): l_1 penalty term. >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) - >>> lrm = LassoWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0])) + >>> lrm = LassoWithSGD.train(sc.parallelize(data), initial_weights=array([1.0])) """ - + class LassoWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, + def train(cls, data, iterations=100, step=1.0, reg_param=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a Lasso regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd, iterations, step, reg_param, mini_batch_fraction, i), @@ -83,14 +85,15 @@ class RidgeRegressionModel(LinearRegressionModelBase): l_2 penalty term. >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2) - >>> lrm = RidgeRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0])) + >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initial_weights=array([1.0])) """ class RidgeRegressionWithSGD(object): @classmethod - def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0, + def train(cls, data, iterations=100, step=1.0, reg_param=1.0, mini_batch_fraction=1.0, initial_weights=None): """Train a ridge regression model on the given data.""" + sc = data.context return _regression_train_wrapper(sc, lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd, iterations, step, reg_param, mini_batch_fraction, i), |