path: root/python/pyspark/mllib/classification.py
author     MechCoder <manojkumarsivaraj334@gmail.com>  2015-06-30 10:25:59 -0700
committer  Xiangrui Meng <meng@databricks.com>         2015-06-30 10:25:59 -0700
commit     45281664e0d3b22cd63660ca8ad6dd574f10e21f (patch)
tree       15ee52c5ef70165f6492ea1f5246c6a0b5d71d1a /python/pyspark/mllib/classification.py
parent     ada384b785c663392a0b69fad5bfe7a0a0584ee0 (diff)
[SPARK-4127] [MLLIB] [PYSPARK] Python bindings for StreamingLinearRegressionWithSGD
Python bindings for StreamingLinearRegressionWithSGD

Author: MechCoder <manojkumarsivaraj334@gmail.com>

Closes #6744 from MechCoder/spark-4127 and squashes the following commits:

d8f6457 [MechCoder] Moved StreamingLinearAlgorithm to pyspark.mllib.regression
d47cc24 [MechCoder] Inherit from StreamingLinearAlgorithm
1b4ddd6 [MechCoder] minor
4de6c68 [MechCoder] Minor refactor
5e85a3b [MechCoder] Add tests for simultaneous training and prediction
fb27889 [MechCoder] Add example and docs
505380b [MechCoder] Add tests
d42bdae [MechCoder] [SPARK-4127] Python bindings for StreamingLinearRegressionWithSGD
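For orientation, here is a minimal usage sketch of the new binding, assuming a local SparkContext and a queued DStream standing in for a real streaming source; the app name, batch contents, and parameter values are illustrative assumptions, not part of this patch:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD

sc = SparkContext("local[2]", "streaming-lr-sketch")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Queued RDDs stand in for a live source such as a socket or Kafka stream.
train_batches = [sc.parallelize([LabeledPoint(1.0, [1.0]),
                                 LabeledPoint(3.0, [3.0])])]
train_stream = ssc.queueStream(train_batches)

model = StreamingLinearRegressionWithSGD(stepSize=0.1, numIterations=25)
model.setInitialWeights([0.0])  # weights must be initialized before training/predicting

model.trainOn(train_stream)  # update the model as each batch arrives
model.predictOn(train_stream.map(lambda lp: lp.features)).pprint()

ssc.start()
ssc.awaitTermination(10)
ssc.stop(stopSparkContext=True)

trainOn updates the model on every incoming batch, while predictOn transforms a DStream of feature vectors into a DStream of predictions using the latest weights.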
Diffstat (limited to 'python/pyspark/mllib/classification.py')
-rw-r--r--  python/pyspark/mllib/classification.py  |  50
1 file changed, 5 insertions(+), 45 deletions(-)
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 735d45ba03..8f27c446a6 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -24,7 +24,9 @@ from pyspark import RDD
from pyspark.streaming import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
-from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
+from pyspark.mllib.regression import (
+ LabeledPoint, LinearModel, _regression_train_wrapper,
+ StreamingLinearAlgorithm)
from pyspark.mllib.util import Saveable, Loader, inherit_doc
@@ -585,55 +587,13 @@ class NaiveBayes(object):
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
-class StreamingLinearAlgorithm(object):
- """
- Base class that has to be inherited by any StreamingLinearAlgorithm.
-
- Prevents reimplementation of methods predictOn and predictOnValues.
- """
- def __init__(self, model):
- self._model = model
-
- def latestModel(self):
- """
- Returns the latest model.
- """
- return self._model
-
- def _validate(self, dstream):
- if not isinstance(dstream, DStream):
- raise TypeError(
- "dstream should be a DStream object, got %s" % type(dstream))
- if not self._model:
- raise ValueError(
- "Model must be intialized using setInitialWeights")
-
- def predictOn(self, dstream):
- """
- Make predictions on a dstream.
-
- :return: Transformed dstream object.
- """
- self._validate(dstream)
- return dstream.map(lambda x: self._model.predict(x))
-
- def predictOnValues(self, dstream):
- """
- Make predictions on a keyed dstream.
-
- :return: Transformed dstream object.
- """
- self._validate(dstream)
- return dstream.mapValues(lambda x: self._model.predict(x))
-
-
@inherit_doc
class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
"""
- Run LogisticRegression with SGD on a stream of data.
+ Run LogisticRegression with SGD on a batch of data.
The weights obtained at the end of training a stream are used as initial
- weights for the next stream.
+ weights for the next batch.
:param stepSize: Step size for each iteration of gradient descent.
:param numIterations: Number of iterations run for each batch of data.
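The hunk above shows only the docstring tweak for StreamingLogisticRegressionWithSGD; the methods it now inherits (latestModel, predictOn, predictOnValues) are the ones deleted here and moved into pyspark.mllib.regression. A hedged usage sketch of that inherited API, again assuming a local SparkContext and queued DStreams in place of real sources (app name, feature values, and parameters are illustrative):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext("local[2]", "streaming-logreg-sketch")
ssc = StreamingContext(sc, 1)

train_batches = [sc.parallelize([LabeledPoint(0.0, [0.0, 1.0]),
                                 LabeledPoint(1.0, [1.0, 0.0])])]
# (true label, features) pairs so predictOnValues can keep the label as the key
test_batches = [sc.parallelize([(0.0, [0.0, 1.5]), (1.0, [1.5, 0.0])])]

model = StreamingLogisticRegressionWithSGD(stepSize=0.1, numIterations=25)
model.setInitialWeights([0.0, 0.0])  # trainOn/predictOn* require initialized weights

model.trainOn(ssc.queueStream(train_batches))
model.predictOnValues(ssc.queueStream(test_batches)).pprint()

ssc.start()
ssc.awaitTermination(10)
print(model.latestModel().weights)  # weights after the streamed batches
ssc.stop(stopSparkContext=True)

predictOnValues maps only the values of a keyed DStream through the model, which is convenient for carrying the true label (or an identifier) alongside each prediction.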