aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/mllib/classification.py
diff options
context:
space:
mode:
authorMechCoder <manojkumarsivaraj334@gmail.com>2015-06-24 14:58:43 -0700
committerXiangrui Meng <meng@databricks.com>2015-06-24 14:58:43 -0700
commitfb32c388985ce65c1083cb435cf1f7479fecbaac (patch)
tree14b7eee9102c6573bbd8923c71977c4d16df9230 /python/pyspark/mllib/classification.py
parentf04b5672c5a5562f8494df3b0df23235285c9e9e (diff)
downloadspark-fb32c388985ce65c1083cb435cf1f7479fecbaac.tar.gz
spark-fb32c388985ce65c1083cb435cf1f7479fecbaac.tar.bz2
spark-fb32c388985ce65c1083cb435cf1f7479fecbaac.zip
[SPARK-7633] [MLLIB] [PYSPARK] Python bindings for StreamingLogisticRegressionwithSGD
Add Python bindings to StreamingLogisticRegressionwithSGD. No Java wrappers are needed as models are updated directly using train. Author: MechCoder <manojkumarsivaraj334@gmail.com> Closes #6849 from MechCoder/spark-3258 and squashes the following commits: b4376a5 [MechCoder] minor d7e5fc1 [MechCoder] Refactor into StreamingLinearAlgorithm Better docs 9c09d4e [MechCoder] [SPARK-7633] Python bindings for StreamingLogisticRegressionwithSGD
Diffstat (limited to 'python/pyspark/mllib/classification.py')
-rw-r--r--python/pyspark/mllib/classification.py96
1 files changed, 95 insertions, 1 deletions
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 758accf4b4..2698f10d06 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -21,6 +21,7 @@ import numpy
from numpy import array
from pyspark import RDD
+from pyspark.streaming import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
@@ -28,7 +29,8 @@ from pyspark.mllib.util import Saveable, Loader, inherit_doc
__all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'LogisticRegressionWithLBFGS',
- 'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes']
+ 'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes',
+ 'StreamingLogisticRegressionWithSGD']
class LinearClassificationModel(LinearModel):
@@ -583,6 +585,98 @@ class NaiveBayes(object):
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
+class StreamingLinearAlgorithm(object):
+ """
+ Base class that has to be inherited by any StreamingLinearAlgorithm.
+
+ Prevents reimplementation of methods predictOn and predictOnValues.
+ """
+ def __init__(self, model):
+ self._model = model
+
+ def latestModel(self):
+ """
+ Returns the latest model.
+ """
+ return self._model
+
+ def _validate(self, dstream):
+ if not isinstance(dstream, DStream):
+ raise TypeError(
+ "dstream should be a DStream object, got %s" % type(dstream))
+ if not self._model:
+ raise ValueError(
+ "Model must be intialized using setInitialWeights")
+
+ def predictOn(self, dstream):
+ """
+ Make predictions on a dstream.
+
+ :return: Transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.map(lambda x: self._model.predict(x))
+
+ def predictOnValues(self, dstream):
+ """
+ Make predictions on a keyed dstream.
+
+ :return: Transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.mapValues(lambda x: self._model.predict(x))
+
+
+@inherit_doc
+class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
+ """
+ Run LogisticRegression with SGD on a stream of data.
+
+ The weights obtained at the end of training a stream are used as initial
+ weights for the next stream.
+
+ :param stepSize: Step size for each iteration of gradient descent.
+ :param numIterations: Number of iterations run for each batch of data.
+ :param miniBatchFraction: Fraction of data on which SGD is run for each
+ iteration.
+ :param regParam: L2 Regularization parameter.
+ """
+ def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01):
+ self.stepSize = stepSize
+ self.numIterations = numIterations
+ self.regParam = regParam
+ self.miniBatchFraction = miniBatchFraction
+ self._model = None
+ super(StreamingLogisticRegressionWithSGD, self).__init__(
+ model=self._model)
+
+ def setInitialWeights(self, initialWeights):
+ """
+ Set the initial value of weights.
+
+ This must be set before running trainOn and predictOn.
+ """
+ initialWeights = _convert_to_vector(initialWeights)
+
+ # LogisticRegressionWithSGD does only binary classification.
+ self._model = LogisticRegressionModel(
+ initialWeights, 0, initialWeights.size, 2)
+ return self
+
+ def trainOn(self, dstream):
+ """Train the model on the incoming dstream."""
+ self._validate(dstream)
+
+ def update(rdd):
+ # LogisticRegressionWithSGD.train raises an error for an empty RDD.
+ if not rdd.isEmpty():
+ self._model = LogisticRegressionWithSGD.train(
+ rdd, self.numIterations, self.stepSize,
+ self.miniBatchFraction, self._model.weights)
+
+ dstream.foreachRDD(update)
+
+
def _test():
import doctest
from pyspark import SparkContext