aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/mllib/regression.py
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-04-15 20:33:24 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-15 20:33:24 -0700
commit63ca581d9c84176549b1ea0a1d8d7c0cca982acc (patch)
tree9dc5f04a355117578e31b4b431d34da075b34ea3 /python/pyspark/mllib/regression.py
parent8517911efb89aade61c8b8c54fee216dae9a4b4f (diff)
downloadspark-63ca581d9c84176549b1ea0a1d8d7c0cca982acc.tar.gz
spark-63ca581d9c84176549b1ea0a1d8d7c0cca982acc.tar.bz2
spark-63ca581d9c84176549b1ea0a1d8d7c0cca982acc.zip
[WIP] SPARK-1430: Support sparse data in Python MLlib
This PR adds a SparseVector class in PySpark and updates all the regression, classification and clustering algorithms and models to support sparse data, similar to MLlib. I chose to add this class because SciPy is quite difficult to install in many environments (more so than NumPy), but I plan to add support for SciPy sparse vectors later too, and make the methods work transparently on objects of either type. On the Scala side, we keep Python sparse vectors sparse and pass them to MLlib. We always return dense vectors from our models. Some to-do items left: - [x] Support SciPy's scipy.sparse matrix objects when SciPy is available. We can easily add a function to convert these to our own SparseVector. - [x] MLlib currently uses a vector with one extra column on the left to represent what we call LabeledPoint in Scala. Do we really want this? It may get annoying once you deal with sparse data since you must add/subtract 1 to each feature index when training. We can remove this API in 1.0 and use tuples for labeling. - [x] Explain how to use these in the Python MLlib docs. CC @mengxr, @joshrosen Author: Matei Zaharia <matei@databricks.com> Closes #341 from mateiz/py-ml-update and squashes the following commits: d52e763 [Matei Zaharia] Remove no-longer-needed slice code and handle review comments ea5a25a [Matei Zaharia] Fix remaining uses of copyto() after merge b9f97a3 [Matei Zaharia] Fix test 1e1bd0f [Matei Zaharia] Add MLlib logistic regression example in Python 88bc01f [Matei Zaharia] Clean up inheritance of LinearModel in Python, and expose its parametrs 37ab747 [Matei Zaharia] Fix some examples and docs due to changes in MLlib API da0f27e [Matei Zaharia] Added a MLlib K-means example and updated docs to discuss sparse data c48e85a [Matei Zaharia] Added some tests for passing lists as input, and added mllib/tests.py to run-tests script. a07ba10 [Matei Zaharia] Fix some typos and calculation of initial weights 74eefe7 [Matei Zaharia] Added LabeledPoint class in Python 889dde8 [Matei Zaharia] Support scipy.sparse matrices in all our algorithms and models ab244d1 [Matei Zaharia] Allow SparseVectors to be initialized using a dict a5d6426 [Matei Zaharia] Add linalg.py to run-tests script 0e7a3d8 [Matei Zaharia] Keep vectors sparse in Java when reading LabeledPoints eaee759 [Matei Zaharia] Update regression, classification and clustering models for sparse data 2abbb44 [Matei Zaharia] Further work to get linear models working with sparse data 154f45d [Matei Zaharia] Update docs, name some magic values 881fef7 [Matei Zaharia] Added a sparse vector in Python and made Java-Python format more compact
Diffstat (limited to 'python/pyspark/mllib/regression.py')
-rw-r--r--python/pyspark/mllib/regression.py128
1 files changed, 118 insertions, 10 deletions
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 7656db07f6..266b31d3fa 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -15,41 +15,98 @@
# limitations under the License.
#
-from numpy import array, dot
+from numpy import array, ndarray
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _linear_predictor_typecheck
+ _linear_predictor_typecheck, _have_scipy, _scipy_issparse
+from pyspark.mllib.linalg import SparseVector
+
+
+class LabeledPoint(object):
+ """
+ The features and labels of a data point.
+
+ @param label: Label for this data point.
+ @param features: Vector of features for this point (NumPy array, list,
+ pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
+ """
+ def __init__(self, label, features):
+ self.label = label
+ if (type(features) == ndarray or type(features) == SparseVector
+ or (_have_scipy and _scipy_issparse(features))):
+ self.features = features
+ elif type(features) == list:
+ self.features = array(features)
+ else:
+ raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+
class LinearModel(object):
- """Something that has a vector of coefficients and an intercept."""
- def __init__(self, coeff, intercept):
- self._coeff = coeff
+ """A linear model that has a vector of coefficients and an intercept."""
+ def __init__(self, weights, intercept):
+ self._coeff = weights
self._intercept = intercept
+ @property
+ def weights(self):
+ return self._coeff
+
+ @property
+ def intercept(self):
+ return self._intercept
+
+
class LinearRegressionModelBase(LinearModel):
"""A linear regression model.
>>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
>>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
True
+ >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
+ True
"""
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
- return dot(self._coeff, x) + self._intercept
+ return _dot(x, self._coeff) + self._intercept
+
class LinearRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -61,14 +118,39 @@ class LinearRegressionWithSGD(object):
d._jrdd, iterations, step, miniBatchFraction, i),
LinearRegressionModel, data, initialWeights)
+
class LassoModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_1 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LassoWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -80,14 +162,39 @@ class LassoWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
LassoModel, data, initialWeights)
+
class RidgeRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_2 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class RidgeRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -99,6 +206,7 @@ class RidgeRegressionWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
RidgeRegressionModel, data, initialWeights)
+
def _test():
import doctest
globs = globals().copy()