aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/mllib/classification.py
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-04-15 20:33:24 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-15 20:33:24 -0700
commit63ca581d9c84176549b1ea0a1d8d7c0cca982acc (patch)
tree9dc5f04a355117578e31b4b431d34da075b34ea3 /python/pyspark/mllib/classification.py
parent8517911efb89aade61c8b8c54fee216dae9a4b4f (diff)
downloadspark-63ca581d9c84176549b1ea0a1d8d7c0cca982acc.tar.gz
spark-63ca581d9c84176549b1ea0a1d8d7c0cca982acc.tar.bz2
spark-63ca581d9c84176549b1ea0a1d8d7c0cca982acc.zip
[WIP] SPARK-1430: Support sparse data in Python MLlib
This PR adds a SparseVector class in PySpark and updates all the regression, classification and clustering algorithms and models to support sparse data, similar to MLlib. I chose to add this class because SciPy is quite difficult to install in many environments (more so than NumPy), but I plan to add support for SciPy sparse vectors later too, and make the methods work transparently on objects of either type. On the Scala side, we keep Python sparse vectors sparse and pass them to MLlib. We always return dense vectors from our models. Some to-do items left: - [x] Support SciPy's scipy.sparse matrix objects when SciPy is available. We can easily add a function to convert these to our own SparseVector. - [x] MLlib currently uses a vector with one extra column on the left to represent what we call LabeledPoint in Scala. Do we really want this? It may get annoying once you deal with sparse data since you must add/subtract 1 to each feature index when training. We can remove this API in 1.0 and use tuples for labeling. - [x] Explain how to use these in the Python MLlib docs. CC @mengxr, @joshrosen Author: Matei Zaharia <matei@databricks.com> Closes #341 from mateiz/py-ml-update and squashes the following commits: d52e763 [Matei Zaharia] Remove no-longer-needed slice code and handle review comments ea5a25a [Matei Zaharia] Fix remaining uses of copyto() after merge b9f97a3 [Matei Zaharia] Fix test 1e1bd0f [Matei Zaharia] Add MLlib logistic regression example in Python 88bc01f [Matei Zaharia] Clean up inheritance of LinearModel in Python, and expose its parametrs 37ab747 [Matei Zaharia] Fix some examples and docs due to changes in MLlib API da0f27e [Matei Zaharia] Added a MLlib K-means example and updated docs to discuss sparse data c48e85a [Matei Zaharia] Added some tests for passing lists as input, and added mllib/tests.py to run-tests script. a07ba10 [Matei Zaharia] Fix some typos and calculation of initial weights 74eefe7 [Matei Zaharia] Added LabeledPoint class in Python 889dde8 [Matei Zaharia] Support scipy.sparse matrices in all our algorithms and models ab244d1 [Matei Zaharia] Allow SparseVectors to be initialized using a dict a5d6426 [Matei Zaharia] Add linalg.py to run-tests script 0e7a3d8 [Matei Zaharia] Keep vectors sparse in Java when reading LabeledPoints eaee759 [Matei Zaharia] Update regression, classification and clustering models for sparse data 2abbb44 [Matei Zaharia] Further work to get linear models working with sparse data 154f45d [Matei Zaharia] Update docs, name some magic values 881fef7 [Matei Zaharia] Added a sparse vector in Python and made Java-Python format more compact
Diffstat (limited to 'python/pyspark/mllib/classification.py')
-rw-r--r--python/pyspark/mllib/classification.py75
1 files changed, 65 insertions, 10 deletions
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index d2f9cdb3f4..3a23e0801f 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -17,30 +17,55 @@
import numpy
-from numpy import array, dot, shape
+from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- LinearModel, _linear_predictor_typecheck
+ _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log
class LogisticRegressionModel(LinearModel):
"""A linear binary classification model derived from logistic regression.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
>>> lrm.predict(array([1.0])) > 0
True
+ >>> lrm.predict(array([0.0])) <= 0
+ True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+ >>> lrm.predict(array([0.0, 1.0])) > 0
+ True
+ >>> lrm.predict(array([0.0, 0.0])) <= 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
prob = 1/(1 + exp(-margin))
return 1 if prob > 0.5 else 0
+
class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -55,14 +80,30 @@ class LogisticRegressionWithSGD(object):
class SVMModel(LinearModel):
"""A support vector machine.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> svm = SVMWithSGD.train(sc.parallelize(data))
>>> svm.predict(array([1.0])) > 0
True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+ >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> svm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
return 1 if margin >= 0 else 0
class SVMWithSGD(object):
@@ -84,12 +125,26 @@ class NaiveBayesModel(object):
- pi: vector of logs of class priors (dimension C)
- theta: matrix of logs of class conditional probabilities (CxD)
- >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0, 0.0]),
+ ... LabeledPoint(0.0, [0.0, 1.0]),
+ ... LabeledPoint(1.0, [1.0, 0.0]),
+ ... ]
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
0.0
>>> model.predict(array([1.0, 0.0]))
1.0
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
+ ... ]
+ >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
+ >>> model.predict(SparseVector(2, {1: 1.0}))
+ 0.0
+ >>> model.predict(SparseVector(2, {0: 1.0}))
+ 1.0
"""
def __init__(self, labels, pi, theta):
@@ -99,7 +154,7 @@ class NaiveBayesModel(object):
def predict(self, x):
"""Return the most likely class for a data vector x"""
- return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
+ return self.labels[numpy.argmax(self.pi + _dot(x, self.theta))]
class NaiveBayes(object):
@classmethod
@@ -119,7 +174,7 @@ class NaiveBayes(object):
@param lambda_: The smoothing parameter
"""
sc = data.context
- dataBytes = _get_unmangled_double_vector_rdd(data)
+ dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
return NaiveBayesModel(
_deserialize_double_vector(ans[0]),