aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMechCoder <manojkumarsivaraj334@gmail.com>2015-06-24 14:58:43 -0700
committerXiangrui Meng <meng@databricks.com>2015-06-24 14:58:43 -0700
commitfb32c388985ce65c1083cb435cf1f7479fecbaac (patch)
tree14b7eee9102c6573bbd8923c71977c4d16df9230
parentf04b5672c5a5562f8494df3b0df23235285c9e9e (diff)
downloadspark-fb32c388985ce65c1083cb435cf1f7479fecbaac.tar.gz
spark-fb32c388985ce65c1083cb435cf1f7479fecbaac.tar.bz2
spark-fb32c388985ce65c1083cb435cf1f7479fecbaac.zip
[SPARK-7633] [MLLIB] [PYSPARK] Python bindings for StreamingLogisticRegressionwithSGD
Add Python bindings to StreamingLogisticRegressionwithSGD. No Java wrappers are needed as models are updated directly using train. Author: MechCoder <manojkumarsivaraj334@gmail.com> Closes #6849 from MechCoder/spark-3258 and squashes the following commits: b4376a5 [MechCoder] minor d7e5fc1 [MechCoder] Refactor into StreamingLinearAlgorithm Better docs 9c09d4e [MechCoder] [SPARK-7633] Python bindings for StreamingLogisticRegressionwithSGD
-rw-r--r--python/pyspark/mllib/classification.py96
-rw-r--r--python/pyspark/mllib/tests.py135
2 files changed, 229 insertions, 2 deletions
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 758accf4b4..2698f10d06 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -21,6 +21,7 @@ import numpy
from numpy import array
from pyspark import RDD
+from pyspark.streaming import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
@@ -28,7 +29,8 @@ from pyspark.mllib.util import Saveable, Loader, inherit_doc
__all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'LogisticRegressionWithLBFGS',
- 'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes']
+ 'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes',
+ 'StreamingLogisticRegressionWithSGD']
class LinearClassificationModel(LinearModel):
@@ -583,6 +585,98 @@ class NaiveBayes(object):
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
+class StreamingLinearAlgorithm(object):
+ """
+ Base class that has to be inherited by any StreamingLinearAlgorithm.
+
+ Prevents reimplementation of methods predictOn and predictOnValues.
+ """
+ def __init__(self, model):
+ self._model = model
+
+ def latestModel(self):
+ """
+ Returns the latest model.
+ """
+ return self._model
+
+ def _validate(self, dstream):
+ if not isinstance(dstream, DStream):
+ raise TypeError(
+ "dstream should be a DStream object, got %s" % type(dstream))
+ if not self._model:
+ raise ValueError(
+ "Model must be intialized using setInitialWeights")
+
+ def predictOn(self, dstream):
+ """
+ Make predictions on a dstream.
+
+ :return: Transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.map(lambda x: self._model.predict(x))
+
+ def predictOnValues(self, dstream):
+ """
+ Make predictions on a keyed dstream.
+
+ :return: Transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.mapValues(lambda x: self._model.predict(x))
+
+
+@inherit_doc
+class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
+ """
+ Run LogisticRegression with SGD on a stream of data.
+
+ The weights obtained at the end of training a stream are used as initial
+ weights for the next stream.
+
+ :param stepSize: Step size for each iteration of gradient descent.
+ :param numIterations: Number of iterations run for each batch of data.
+ :param miniBatchFraction: Fraction of data on which SGD is run for each
+ iteration.
+ :param regParam: L2 Regularization parameter.
+ """
+ def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.01):
+ self.stepSize = stepSize
+ self.numIterations = numIterations
+ self.regParam = regParam
+ self.miniBatchFraction = miniBatchFraction
+ self._model = None
+ super(StreamingLogisticRegressionWithSGD, self).__init__(
+ model=self._model)
+
+ def setInitialWeights(self, initialWeights):
+ """
+ Set the initial value of weights.
+
+ This must be set before running trainOn and predictOn.
+ """
+ initialWeights = _convert_to_vector(initialWeights)
+
+ # LogisticRegressionWithSGD does only binary classification.
+ self._model = LogisticRegressionModel(
+ initialWeights, 0, initialWeights.size, 2)
+ return self
+
+ def trainOn(self, dstream):
+ """Train the model on the incoming dstream."""
+ self._validate(dstream)
+
+ def update(rdd):
+ # LogisticRegressionWithSGD.train raises an error for an empty RDD.
+ if not rdd.isEmpty():
+ self._model = LogisticRegressionWithSGD.train(
+ rdd, self.numIterations, self.stepSize,
+ self.miniBatchFraction, self._model.weights)
+
+ dstream.foreachRDD(update)
+
+
def _test():
import doctest
from pyspark import SparkContext
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 509faa11df..cd80c3e07a 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -26,7 +26,8 @@ import array as pyarray
from time import time, sleep
from shutil import rmtree
-from numpy import array, array_equal, zeros, inf, all, random
+from numpy import (
+ array, array_equal, zeros, inf, random, exp, dot, all, mean)
from numpy import sum as array_sum
from py4j.protocol import Py4JJavaError
@@ -45,6 +46,7 @@ from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.stat import Statistics
from pyspark.mllib.feature import Word2Vec
@@ -1037,6 +1039,137 @@ class LinearDataGeneratorTests(MLlibTestCase):
self.assertEqual(len(point.features), 2)
+class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
+
+ @staticmethod
+ def generateLogisticInput(offset, scale, nPoints, seed):
+ """
+ Generate 1 / (1 + exp(-x * scale + offset))
+
+ where,
+ x is randomnly distributed and the threshold
+ and labels for each sample in x is obtained from a random uniform
+ distribution.
+ """
+ rng = random.RandomState(seed)
+ x = rng.randn(nPoints)
+ sigmoid = 1. / (1 + exp(-(dot(x, scale) + offset)))
+ y_p = rng.rand(nPoints)
+ cut_off = y_p <= sigmoid
+ y_p[cut_off] = 1.0
+ y_p[~cut_off] = 0.0
+ return [
+ LabeledPoint(y_p[i], Vectors.dense([x[i]]))
+ for i in range(nPoints)]
+
+ def test_parameter_accuracy(self):
+ """
+ Test that the final value of weights is close to the desired value.
+ """
+ input_batches = [
+ self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
+ for i in range(20)]
+ input_stream = self.ssc.queueStream(input_batches)
+
+ slr = StreamingLogisticRegressionWithSGD(
+ stepSize=0.2, numIterations=25)
+ slr.setInitialWeights([0.0])
+ slr.trainOn(input_stream)
+
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 20.0, 0.01)
+ rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
+ self.assertAlmostEqual(rel, 0.1, 1)
+
+ def test_convergence(self):
+ """
+ Test that weights converge to the required value on toy data.
+ """
+ input_batches = [
+ self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
+ for i in range(20)]
+ input_stream = self.ssc.queueStream(input_batches)
+ models = []
+
+ slr = StreamingLogisticRegressionWithSGD(
+ stepSize=0.2, numIterations=25)
+ slr.setInitialWeights([0.0])
+ slr.trainOn(input_stream)
+ input_stream.foreachRDD(
+ lambda x: models.append(slr.latestModel().weights[0]))
+
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 15.0, 0.01)
+ t_models = array(models)
+ diff = t_models[1:] - t_models[:-1]
+
+ # Test that weights improve with a small tolerance,
+ self.assertTrue(all(diff >= -0.1))
+ self.assertTrue(array_sum(diff > 0) > 1)
+
+ @staticmethod
+ def calculate_accuracy_error(true, predicted):
+ return sum(abs(array(true) - array(predicted))) / len(true)
+
+ def test_predictions(self):
+ """Test predicted values on a toy model."""
+ input_batches = []
+ for i in range(20):
+ batch = self.sc.parallelize(
+ self.generateLogisticInput(0, 1.5, 100, 42 + i))
+ input_batches.append(batch.map(lambda x: (x.label, x.features)))
+ input_stream = self.ssc.queueStream(input_batches)
+
+ slr = StreamingLogisticRegressionWithSGD(
+ stepSize=0.2, numIterations=25)
+ slr.setInitialWeights([1.5])
+ predict_stream = slr.predictOnValues(input_stream)
+ true_predicted = []
+ predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 5.0, 0.01)
+
+ # Test that the accuracy error is no more than 0.4 on each batch.
+ for batch in true_predicted:
+ true, predicted = zip(*batch)
+ self.assertTrue(
+ self.calculate_accuracy_error(true, predicted) < 0.4)
+
+ def test_training_and_prediction(self):
+ """Test that the model improves on toy data with no. of batches"""
+ input_batches = [
+ self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
+ for i in range(20)]
+ predict_batches = [
+ b.map(lambda lp: (lp.label, lp.features)) for b in input_batches]
+
+ slr = StreamingLogisticRegressionWithSGD(
+ stepSize=0.01, numIterations=25)
+ slr.setInitialWeights([-0.1])
+ errors = []
+
+ def collect_errors(rdd):
+ true, predicted = zip(*rdd.collect())
+ errors.append(self.calculate_accuracy_error(true, predicted))
+
+ true_predicted = []
+ input_stream = self.ssc.queueStream(input_batches)
+ predict_stream = self.ssc.queueStream(predict_batches)
+ slr.trainOn(input_stream)
+ ps = slr.predictOnValues(predict_stream)
+ ps.foreachRDD(lambda x: collect_errors(x))
+
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 20.0, 0.01)
+
+ # Test that the improvement in error is atleast 0.3
+ self.assertTrue(errors[1] - errors[-1] > 0.3)
+
+
if __name__ == "__main__":
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")