about summary refs log tree commit diff
path: root/python/pyspark/mllib/classification.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/mllib/classification.py')
-rw-r--r-- python/pyspark/mllib/classification.py | 75
1 files changed, 65 insertions, 10 deletions
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index d2f9cdb3f4..3a23e0801f 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -17,30 +17,55 @@
import numpy
-from numpy import array, dot, shape
+from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- LinearModel, _linear_predictor_typecheck
+ _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log
class LogisticRegressionModel(LinearModel):
"""A linear binary classification model derived from logistic regression.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
>>> lrm.predict(array([1.0])) > 0
True
+ >>> lrm.predict(array([0.0])) <= 0
+ True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+ >>> lrm.predict(array([0.0, 1.0])) > 0
+ True
+ >>> lrm.predict(array([0.0, 0.0])) <= 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
prob = 1/(1 + exp(-margin))
return 1 if prob > 0.5 else 0
+
class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -55,14 +80,30 @@ class LogisticRegressionWithSGD(object):
class SVMModel(LinearModel):
"""A support vector machine.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> svm = SVMWithSGD.train(sc.parallelize(data))
>>> svm.predict(array([1.0])) > 0
True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+ >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> svm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
return 1 if margin >= 0 else 0
class SVMWithSGD(object):
@@ -84,12 +125,26 @@ class NaiveBayesModel(object):
- pi: vector of logs of class priors (dimension C)
- theta: matrix of logs of class conditional probabilities (CxD)
- >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0, 0.0]),
+ ... LabeledPoint(0.0, [0.0, 1.0]),
+ ... LabeledPoint(1.0, [1.0, 0.0]),
+ ... ]
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
0.0
>>> model.predict(array([1.0, 0.0]))
1.0
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
+ ... ]
+ >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
+ >>> model.predict(SparseVector(2, {1: 1.0}))
+ 0.0
+ >>> model.predict(SparseVector(2, {0: 1.0}))
+ 1.0
"""
def __init__(self, labels, pi, theta):
@@ -99,7 +154,7 @@ class NaiveBayesModel(object):
def predict(self, x):
"""Return the most likely class for a data vector x"""
- return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
+ return self.labels[numpy.argmax(self.pi + _dot(x, self.theta))]
class NaiveBayes(object):
@classmethod
@@ -119,7 +174,7 @@ class NaiveBayes(object):
@param lambda_: The smoothing parameter
"""
sc = data.context
- dataBytes = _get_unmangled_double_vector_rdd(data)
+ dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
return NaiveBayesModel(
_deserialize_double_vector(ans[0]),