diff options
author | MechCoder <manojkumarsivaraj334@gmail.com> | 2015-06-30 10:25:59 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-06-30 10:25:59 -0700 |
commit | 45281664e0d3b22cd63660ca8ad6dd574f10e21f (patch) | |
tree | 15ee52c5ef70165f6492ea1f5246c6a0b5d71d1a /python/pyspark/mllib/regression.py | |
parent | ada384b785c663392a0b69fad5bfe7a0a0584ee0 (diff) | |
download | spark-45281664e0d3b22cd63660ca8ad6dd574f10e21f.tar.gz spark-45281664e0d3b22cd63660ca8ad6dd574f10e21f.tar.bz2 spark-45281664e0d3b22cd63660ca8ad6dd574f10e21f.zip |
[SPARK-4127] [MLLIB] [PYSPARK] Python bindings for StreamingLinearRegressionWithSGD
Python bindings for StreamingLinearRegressionWithSGD
Author: MechCoder <manojkumarsivaraj334@gmail.com>
Closes #6744 from MechCoder/spark-4127 and squashes the following commits:
d8f6457 [MechCoder] Moved StreamingLinearAlgorithm to pyspark.mllib.regression
d47cc24 [MechCoder] Inherit from StreamingLinearAlgorithm
1b4ddd6 [MechCoder] minor
4de6c68 [MechCoder] Minor refactor
5e85a3b [MechCoder] Add tests for simultaneous training and prediction
fb27889 [MechCoder] Add example and docs
505380b [MechCoder] Add tests
d42bdae [MechCoder] [SPARK-4127] Python bindings for StreamingLinearRegressionWithSGD
Diffstat (limited to 'python/pyspark/mllib/regression.py')
-rw-r--r-- | python/pyspark/mllib/regression.py | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 5ddbbee4ba..8e90adee5f 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -19,6 +19,7 @@ import numpy as np from numpy import array from pyspark import RDD +from pyspark.streaming.dstream import DStream from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector from pyspark.mllib.util import Saveable, Loader @@ -570,6 +571,95 @@ class IsotonicRegression(object): return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic) +class StreamingLinearAlgorithm(object): + """ + Base class that has to be inherited by any StreamingLinearAlgorithm. + + Prevents reimplementation of methods predictOn and predictOnValues. + """ + def __init__(self, model): + self._model = model + + def latestModel(self): + """ + Returns the latest model. + """ + return self._model + + def _validate(self, dstream): + if not isinstance(dstream, DStream): + raise TypeError( + "dstream should be a DStream object, got %s" % type(dstream)) + if not self._model: + raise ValueError( + "Model must be intialized using setInitialWeights") + + def predictOn(self, dstream): + """ + Make predictions on a dstream. + + :return: Transformed dstream object. + """ + self._validate(dstream) + return dstream.map(lambda x: self._model.predict(x)) + + def predictOnValues(self, dstream): + """ + Make predictions on a keyed dstream. + + :return: Transformed dstream object. + """ + self._validate(dstream) + return dstream.mapValues(lambda x: self._model.predict(x)) + + +@inherit_doc +class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm): + """ + Run LinearRegression with SGD on a batch of data. + + The problem minimized is (1 / n_samples) * (y - weights'X)**2. + After training on a batch of data, the weights obtained at the end of + training are used as initial weights for the next batch. + + :param: stepSize Step size for each iteration of gradient descent. + :param: numIterations Total number of iterations run. + :param: miniBatchFraction Fraction of data on which SGD is run for each + iteration. + """ + def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0): + self.stepSize = stepSize + self.numIterations = numIterations + self.miniBatchFraction = miniBatchFraction + self._model = None + super(StreamingLinearRegressionWithSGD, self).__init__( + model=self._model) + + def setInitialWeights(self, initialWeights): + """ + Set the initial value of weights. + + This must be set before running trainOn and predictOn + """ + initialWeights = _convert_to_vector(initialWeights) + self._model = LinearRegressionModel(initialWeights, 0) + return self + + def trainOn(self, dstream): + """Train the model on the incoming dstream.""" + self._validate(dstream) + + def update(rdd): + # LinearRegressionWithSGD.train raises an error for an empty RDD. + if not rdd.isEmpty(): + self._model = LinearRegressionWithSGD.train( + rdd, self.numIterations, self.stepSize, + self.miniBatchFraction, self._model.weights, + self._model.intercept) + + dstream.foreachRDD(update) + + def _test(): import doctest from pyspark import SparkContext |