Diffstat (limited to 'python/pyspark/mllib/regression.py')
-rw-r--r--  python/pyspark/mllib/regression.py | 128
1 file changed, 118 insertions(+), 10 deletions(-)
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 7656db07f6..266b31d3fa 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -15,41 +15,98 @@
# limitations under the License.
#
-from numpy import array, dot
+from numpy import array, ndarray
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _linear_predictor_typecheck
+ _linear_predictor_typecheck, _have_scipy, _scipy_issparse
+from pyspark.mllib.linalg import SparseVector
+
+
+class LabeledPoint(object):
+ """
+ The features and labels of a data point.
+
+ @param label: Label for this data point.
+ @param features: Vector of features for this point (NumPy array, list,
+ pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
+ """
+ def __init__(self, label, features):
+ self.label = label
+ if (type(features) == ndarray or type(features) == SparseVector
+ or (_have_scipy and _scipy_issparse(features))):
+ self.features = features
+ elif type(features) == list:
+ self.features = array(features)
+ else:
+ raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+
class LinearModel(object):
- """Something that has a vector of coefficients and an intercept."""
- def __init__(self, coeff, intercept):
- self._coeff = coeff
+ """A linear model that has a vector of coefficients and an intercept."""
+ def __init__(self, weights, intercept):
+ self._coeff = weights
self._intercept = intercept
+ @property
+ def weights(self):
+ return self._coeff
+
+ @property
+ def intercept(self):
+ return self._intercept
+
+
class LinearRegressionModelBase(LinearModel):
"""A linear regression model.
>>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
>>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
True
+ >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
+ True
"""
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
- return dot(self._coeff, x) + self._intercept
+ return _dot(x, self._coeff) + self._intercept
+
class LinearRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
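Taken together, the changes in this hunk mean training data is now expressed as LabeledPoint records whose features may be dense or sparse, and predict() accepts either kind of vector. A minimal usage sketch, assuming a live SparkContext bound to `sc` as in the doctests (illustrative only, not part of the patch):

    from numpy import array
    from pyspark.mllib.linalg import SparseVector
    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

    # Features may be given as a list (converted to a NumPy array), a NumPy
    # array, a SparseVector, or a scipy.sparse column matrix.
    points = [
        LabeledPoint(0.0, [0.0]),
        LabeledPoint(1.0, [1.0]),
        LabeledPoint(3.0, [2.0]),
        LabeledPoint(2.0, [3.0]),
    ]

    model = LinearRegressionWithSGD.train(sc.parallelize(points),
                                          initialWeights=array([1.0]))

    # predict() accepts dense or sparse vectors interchangeably.
    model.predict(array([1.0]))
    model.predict(SparseVector(1, {0: 1.0}))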
@@ -61,14 +118,39 @@ class LinearRegressionWithSGD(object):
d._jrdd, iterations, step, miniBatchFraction, i),
LinearRegressionModel, data, initialWeights)
+
class LassoModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_1 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LassoWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
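All of these model classes inherit predict() from LinearRegressionModelBase, which now routes the inner product through _dot instead of numpy.dot so that sparse feature vectors work. A rough sketch of that kind of type dispatch, purely as an assumption about what such a helper does (the real pyspark.mllib._common._dot may differ):

    import numpy as np
    from pyspark.mllib.linalg import SparseVector

    def dot_dispatch(vec, weights):
        # Hypothetical helper, not the actual _common._dot: choose a dot
        # product implementation based on the type of the feature vector.
        if isinstance(vec, SparseVector):
            return vec.dot(weights)   # SparseVector provides its own dot()
        return np.dot(vec, weights)   # NumPy arrays and plain lists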
@@ -80,14 +162,39 @@ class LassoWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
LassoModel, data, initialWeights)
+
class RidgeRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_2 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class RidgeRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -99,6 +206,7 @@ class RidgeRegressionWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
RidgeRegressionModel, data, initialWeights)
+
def _test():
import doctest
globs = globals().copy()
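The hunk ends inside _test(). The usual pattern in PySpark MLlib modules is for this runner to bind a local SparkContext to the `sc` name that the doctests above rely on, run doctest.testmod, and stop the context afterwards. A sketch of that pattern (an assumption about the surrounding code, not shown in this diff):

    def _test():
        import doctest
        globs = globals().copy()
        # The doctests above expect a SparkContext named `sc` to exist.
        globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
        (failure_count, test_count) = doctest.testmod(
            globs=globs, optionflags=doctest.ELLIPSIS)
        globs['sc'].stop()
        if failure_count:
            exit(-1)

    if __name__ == "__main__":
        _test()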