diff options
author | MechCoder <manojkumarsivaraj334@gmail.com> | 2015-06-30 10:25:59 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-06-30 10:25:59 -0700 |
commit | 45281664e0d3b22cd63660ca8ad6dd574f10e21f (patch) | |
tree | 15ee52c5ef70165f6492ea1f5246c6a0b5d71d1a /python/pyspark/mllib/classification.py | |
parent | ada384b785c663392a0b69fad5bfe7a0a0584ee0 (diff) | |
download | spark-45281664e0d3b22cd63660ca8ad6dd574f10e21f.tar.gz spark-45281664e0d3b22cd63660ca8ad6dd574f10e21f.tar.bz2 spark-45281664e0d3b22cd63660ca8ad6dd574f10e21f.zip |
[SPARK-4127] [MLLIB] [PYSPARK] Python bindings for StreamingLinearRegressionWithSGD
Python bindings for StreamingLinearRegressionWithSGD
Author: MechCoder <manojkumarsivaraj334@gmail.com>
Closes #6744 from MechCoder/spark-4127 and squashes the following commits:
d8f6457 [MechCoder] Moved StreamingLinearAlgorithm to pyspark.mllib.regression
d47cc24 [MechCoder] Inherit from StreamingLinearAlgorithm
1b4ddd6 [MechCoder] minor
4de6c68 [MechCoder] Minor refactor
5e85a3b [MechCoder] Add tests for simultaneous training and prediction
fb27889 [MechCoder] Add example and docs
505380b [MechCoder] Add tests
d42bdae [MechCoder] [SPARK-4127] Python bindings for StreamingLinearRegressionWithSGD
Diffstat (limited to 'python/pyspark/mllib/classification.py')
-rw-r--r-- | python/pyspark/mllib/classification.py | 50 |
1 file changed, 5 insertions, 45 deletions
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 735d45ba03..8f27c446a6 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -24,7 +24,9 @@ from pyspark import RDD from pyspark.streaming import DStream from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector -from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper +from pyspark.mllib.regression import ( + LabeledPoint, LinearModel, _regression_train_wrapper, + StreamingLinearAlgorithm) from pyspark.mllib.util import Saveable, Loader, inherit_doc @@ -585,55 +587,13 @@ class NaiveBayes(object): return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta)) -class StreamingLinearAlgorithm(object): - """ - Base class that has to be inherited by any StreamingLinearAlgorithm. - - Prevents reimplementation of methods predictOn and predictOnValues. - """ - def __init__(self, model): - self._model = model - - def latestModel(self): - """ - Returns the latest model. - """ - return self._model - - def _validate(self, dstream): - if not isinstance(dstream, DStream): - raise TypeError( - "dstream should be a DStream object, got %s" % type(dstream)) - if not self._model: - raise ValueError( - "Model must be intialized using setInitialWeights") - - def predictOn(self, dstream): - """ - Make predictions on a dstream. - - :return: Transformed dstream object. - """ - self._validate(dstream) - return dstream.map(lambda x: self._model.predict(x)) - - def predictOnValues(self, dstream): - """ - Make predictions on a keyed dstream. - - :return: Transformed dstream object. - """ - self._validate(dstream) - return dstream.mapValues(lambda x: self._model.predict(x)) - - @inherit_doc class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm): """ - Run LogisticRegression with SGD on a stream of data. 
+ Run LogisticRegression with SGD on a batch of data. The weights obtained at the end of training a stream are used as initial - weights for the next stream. + weights for the next batch. :param stepSize: Step size for each iteration of gradient descent. :param numIterations: Number of iterations run for each batch of data. |