aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-01-09 23:55:06 -0800
committerMatei Zaharia <matei@databricks.com>2014-01-11 22:30:48 -0800
commit9a0dfdf868187fb9a2e1656e4cf5f29d952ce5db (patch)
tree82e54a0b5c7f502893c2f6bdd96aba6f04147707 /python
parent288a878999848adb130041d1e40c14bfc879cec6 (diff)
downloadspark-9a0dfdf868187fb9a2e1656e4cf5f29d952ce5db.tar.gz
spark-9a0dfdf868187fb9a2e1656e4cf5f29d952ce5db.tar.bz2
spark-9a0dfdf868187fb9a2e1656e4cf5f29d952ce5db.zip
Add Naive Bayes to Python MLlib, and some API fixes
- Added a Python wrapper for Naive Bayes - Updated the Scala Naive Bayes to match the style of our other algorithms better and in particular make it easier to call from Java (added builder pattern, removed default value in train method) - Updated Python MLlib functions to not require a SparkContext; we can get that from the RDD the user gives - Added a toString method in LabeledPoint - Made the Python MLlib tests run as part of run-tests as well (before they could only be run individually through each file)
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/mllib/_common.py2
-rw-r--r--python/pyspark/mllib/classification.py65
-rw-r--r--python/pyspark/mllib/clustering.py11
-rw-r--r--python/pyspark/mllib/recommendation.py10
-rw-r--r--python/pyspark/mllib/regression.py17
-rwxr-xr-xpython/run-tests5
6 files changed, 87 insertions, 23 deletions
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index 769d88dfb9..20a0e309d1 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -16,7 +16,7 @@
#
from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape
-from pyspark import SparkContext
+from pyspark import SparkContext, RDD
from pyspark.serializers import Serializer
import struct
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 70de332d34..03ff5a572e 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+import numpy
+
from numpy import array, dot, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
@@ -29,8 +31,8 @@ class LogisticRegressionModel(LinearModel):
"""A linear binary classification model derived from logistic regression.
>>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
- >>> lrm = LogisticRegressionWithSGD.train(sc, sc.parallelize(data))
- >>> lrm.predict(array([1.0])) != None
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
+ >>> lrm.predict(array([1.0])) > 0
True
"""
def predict(self, x):
@@ -41,9 +43,10 @@ class LogisticRegressionModel(LinearModel):
class LogisticRegressionWithSGD(object):
@classmethod
- def train(cls, sc, data, iterations=100, step=1.0,
+ def train(cls, data, iterations=100, step=1.0,
mini_batch_fraction=1.0, initial_weights=None):
"""Train a logistic regression model on the given data."""
+ sc = data.context
return _regression_train_wrapper(sc, lambda d, i:
sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd,
iterations, step, mini_batch_fraction, i),
@@ -53,8 +56,8 @@ class SVMModel(LinearModel):
"""A support vector machine.
>>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
- >>> svm = SVMWithSGD.train(sc, sc.parallelize(data))
- >>> svm.predict(array([1.0])) != None
+ >>> svm = SVMWithSGD.train(sc.parallelize(data))
+ >>> svm.predict(array([1.0])) > 0
True
"""
def predict(self, x):
@@ -64,14 +67,64 @@ class SVMModel(LinearModel):
class SVMWithSGD(object):
@classmethod
- def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+ def train(cls, data, iterations=100, step=1.0, reg_param=1.0,
mini_batch_fraction=1.0, initial_weights=None):
"""Train a support vector machine on the given data."""
+ sc = data.context
return _regression_train_wrapper(sc, lambda d, i:
sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd,
iterations, step, reg_param, mini_batch_fraction, i),
SVMModel, data, initial_weights)
+class NaiveBayesModel(object):
+ """
+ Model for Naive Bayes classifiers.
+
+ Contains two parameters:
+ - pi: vector of logs of class priors (dimension C)
+ - theta: matrix of logs of class conditional probabilities (CxD)
+
+ >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
+ >>> model = NaiveBayes.train(sc.parallelize(data))
+ >>> model.predict(array([0.0, 1.0]))
+ 0
+ >>> model.predict(array([1.0, 0.0]))
+ 1
+ """
+
+ def __init__(self, pi, theta):
+ self.pi = pi
+ self.theta = theta
+
+ def predict(self, x):
+ """Return the most likely class for a data vector x"""
+ return numpy.argmax(self.pi + dot(x, self.theta))
+
+class NaiveBayes(object):
+ @classmethod
+ def train(cls, data, lambda_=1.0):
+ """
+ Train a Naive Bayes model given an RDD of (label, features) vectors.
+
+ This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can
+ handle all kinds of discrete data. For example, by converting
+ documents into TF-IDF vectors, it can be used for document
+ classification. By making every vector a 0-1 vector, it can also be
+ used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
+
+ @param data: RDD of NumPy vectors, one per element, where the first
+ coordinate is the label and the rest is the feature vector
+ (e.g. a count vector).
+ @param lambda_: The smoothing parameter
+ """
+ sc = data.context
+ dataBytes = _get_unmangled_double_vector_rdd(data)
+ ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
+ return NaiveBayesModel(
+ _deserialize_double_vector(ans[0]),
+ _deserialize_double_matrix(ans[1]))
+
+
def _test():
import doctest
globs = globals().copy()
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 8cf20e591a..30862918c3 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -28,12 +28,12 @@ class KMeansModel(object):
"""A clustering model derived from the k-means method.
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
- >>> clusters = KMeans.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
+ >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
>>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
True
>>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
True
- >>> clusters = KMeans.train(sc, sc.parallelize(data), 2)
+ >>> clusters = KMeans.train(sc.parallelize(data), 2)
"""
def __init__(self, centers_):
self.centers = centers_
@@ -52,12 +52,13 @@ class KMeansModel(object):
class KMeans(object):
@classmethod
- def train(cls, sc, data, k, maxIterations=100, runs=1,
- initialization_mode="k-means||"):
+ def train(cls, data, k, maxIterations=100, runs=1,
+ initializationMode="k-means||"):
"""Train a k-means clustering model."""
+ sc = data.context
dataBytes = _get_unmangled_double_vector_rdd(data)
ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
- k, maxIterations, runs, initialization_mode)
+ k, maxIterations, runs, initializationMode)
if len(ans) != 1:
raise RuntimeError("JVM call result had unexpected length")
elif type(ans[0]) != bytearray:
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 0eeb5bb66b..f4a83f0209 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -32,11 +32,11 @@ class MatrixFactorizationModel(object):
>>> r2 = (1, 2, 2.0)
>>> r3 = (2, 1, 2.0)
>>> ratings = sc.parallelize([r1, r2, r3])
- >>> model = ALS.trainImplicit(sc, ratings, 1)
+ >>> model = ALS.trainImplicit(ratings, 1)
>>> model.predict(2,2) is not None
True
>>> testset = sc.parallelize([(1, 2), (1, 1)])
- >>> model.predictAll(testset).count == 2
+ >>> model.predictAll(testset).count() == 2
True
"""
@@ -57,14 +57,16 @@ class MatrixFactorizationModel(object):
class ALS(object):
@classmethod
- def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
+ def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
+ sc = ratings.context
ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
rank, iterations, lambda_, blocks)
return MatrixFactorizationModel(sc, mod)
@classmethod
- def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
+ def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
+ sc = ratings.context
ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
rank, iterations, lambda_, blocks, alpha)
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index a3a68b29e0..e90b72893f 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -47,14 +47,15 @@ class LinearRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit.
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
- >>> lrm = LinearRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initial_weights=array([1.0]))
"""
class LinearRegressionWithSGD(object):
@classmethod
- def train(cls, sc, data, iterations=100, step=1.0,
+ def train(cls, data, iterations=100, step=1.0,
mini_batch_fraction=1.0, initial_weights=None):
"""Train a linear regression model on the given data."""
+ sc = data.context
return _regression_train_wrapper(sc, lambda d, i:
sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
d._jrdd, iterations, step, mini_batch_fraction, i),
@@ -65,14 +66,15 @@ class LassoModel(LinearRegressionModelBase):
l_1 penalty term.
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
- >>> lrm = LassoWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+ >>> lrm = LassoWithSGD.train(sc.parallelize(data), initial_weights=array([1.0]))
"""
-
+
class LassoWithSGD(object):
@classmethod
- def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+ def train(cls, data, iterations=100, step=1.0, reg_param=1.0,
mini_batch_fraction=1.0, initial_weights=None):
"""Train a Lasso regression model on the given data."""
+ sc = data.context
return _regression_train_wrapper(sc, lambda d, i:
sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd,
iterations, step, reg_param, mini_batch_fraction, i),
@@ -83,14 +85,15 @@ class RidgeRegressionModel(LinearRegressionModelBase):
l_2 penalty term.
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
- >>> lrm = RidgeRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+ >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initial_weights=array([1.0]))
"""
class RidgeRegressionWithSGD(object):
@classmethod
- def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+ def train(cls, data, iterations=100, step=1.0, reg_param=1.0,
mini_batch_fraction=1.0, initial_weights=None):
"""Train a ridge regression model on the given data."""
+ sc = data.context
return _regression_train_wrapper(sc, lambda d, i:
sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd,
iterations, step, reg_param, mini_batch_fraction, i),
diff --git a/python/run-tests b/python/run-tests
index feba97cee0..a986ac9380 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -40,6 +40,11 @@ run_test "-m doctest pyspark/broadcast.py"
run_test "-m doctest pyspark/accumulators.py"
run_test "-m doctest pyspark/serializers.py"
run_test "pyspark/tests.py"
+run_test "pyspark/mllib/_common.py"
+run_test "pyspark/mllib/classification.py"
+run_test "pyspark/mllib/clustering.py"
+run_test "pyspark/mllib/recommendation.py"
+run_test "pyspark/mllib/regression.py"
if [[ $FAILED != 0 ]]; then
echo -en "\033[31m" # Red