author     MechCoder <manojkumarsivaraj334@gmail.com>  2015-06-30 10:25:59 -0700
committer  Xiangrui Meng <meng@databricks.com>  2015-06-30 10:25:59 -0700
commit     45281664e0d3b22cd63660ca8ad6dd574f10e21f
tree       15ee52c5ef70165f6492ea1f5246c6a0b5d71d1a
parent     ada384b785c663392a0b69fad5bfe7a0a0584ee0
[SPARK-4127] [MLLIB] [PYSPARK] Python bindings for StreamingLinearRegressionWithSGD
Python bindings for StreamingLinearRegressionWithSGD

Author: MechCoder <manojkumarsivaraj334@gmail.com>

Closes #6744 from MechCoder/spark-4127 and squashes the following commits:

d8f6457 [MechCoder] Moved StreamingLinearAlgorithm to pyspark.mllib.regression
d47cc24 [MechCoder] Inherit from StreamingLinearAlgorithm
1b4ddd6 [MechCoder] minor
4de6c68 [MechCoder] Minor refactor
5e85a3b [MechCoder] Add tests for simultaneous training and prediction
fb27889 [MechCoder] Add example and docs
505380b [MechCoder] Add tests
d42bdae [MechCoder] [SPARK-4127] Python bindings for StreamingLinearRegressionWithSGD
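A minimal, self-contained usage sketch of the new binding (the toy data, app name, and stream names below are illustrative, not part of the patch):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.mllib.regression import (
        LabeledPoint, StreamingLinearRegressionWithSGD)

    sc = SparkContext(appName="streaming-lr-sketch")
    ssc = StreamingContext(sc, batchDuration=1)

    # One queued batch of toy training data; real code would use a live stream.
    batches = [sc.parallelize([LabeledPoint(1.0, [1.0, 0.0]),
                               LabeledPoint(2.0, [0.0, 2.0])])]
    trainingStream = ssc.queueStream(batches)

    model = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
    model.setInitialWeights([0.0, 0.0])   # required before trainOn/predictOn
    model.trainOn(trainingStream)         # refit on every incoming batch
    model.predictOnValues(
        trainingStream.map(lambda lp: (lp.label, lp.features))).pprint()

    ssc.start()
    ssc.awaitTermination(timeout=10)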
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/mllib/classification.py |  50
-rw-r--r--  python/pyspark/mllib/regression.py     |  90
-rw-r--r--  python/pyspark/mllib/tests.py          | 124
3 files changed, 217 insertions(+), 47 deletions(-)
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 735d45ba03..8f27c446a6 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -24,7 +24,9 @@ from pyspark import RDD
from pyspark.streaming import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
-from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
+from pyspark.mllib.regression import (
+ LabeledPoint, LinearModel, _regression_train_wrapper,
+ StreamingLinearAlgorithm)
from pyspark.mllib.util import Saveable, Loader, inherit_doc
@@ -585,55 +587,13 @@ class NaiveBayes(object):
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
-class StreamingLinearAlgorithm(object):
- """
- Base class that has to be inherited by any StreamingLinearAlgorithm.
-
- Prevents reimplementation of methods predictOn and predictOnValues.
- """
- def __init__(self, model):
- self._model = model
-
- def latestModel(self):
- """
- Returns the latest model.
- """
- return self._model
-
- def _validate(self, dstream):
- if not isinstance(dstream, DStream):
- raise TypeError(
- "dstream should be a DStream object, got %s" % type(dstream))
- if not self._model:
- raise ValueError(
- "Model must be intialized using setInitialWeights")
-
- def predictOn(self, dstream):
- """
- Make predictions on a dstream.
-
- :return: Transformed dstream object.
- """
- self._validate(dstream)
- return dstream.map(lambda x: self._model.predict(x))
-
- def predictOnValues(self, dstream):
- """
- Make predictions on a keyed dstream.
-
- :return: Transformed dstream object.
- """
- self._validate(dstream)
- return dstream.mapValues(lambda x: self._model.predict(x))
-
-
@inherit_doc
class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
"""
- Run LogisticRegression with SGD on a stream of data.
+ Run LogisticRegression with SGD on a batch of data.
The weights obtained at the end of training a stream are used as initial
- weights for the next stream.
+ weights for the next batch.
:param stepSize: Step size for each iteration of gradient descent.
:param numIterations: Number of iterations run for each batch of data.
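Since StreamingLogisticRegressionWithSGD now inherits predictOn and predictOnValues from the relocated base class, its call pattern matches the regression variant. A brief sketch (labeledStream and featureStream are assumed, illustrative DStreams; a StreamingContext must already be set up as in the first sketch above):

    from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD

    clf = StreamingLogisticRegressionWithSGD(stepSize=0.1, numIterations=25)
    clf.setInitialWeights([0.0, 0.0])      # weights must be set before use
    clf.trainOn(labeledStream)             # DStream of LabeledPoint
    labels = clf.predictOn(featureStream)  # DStream of predicted 0/1 labels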
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 5ddbbee4ba..8e90adee5f 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -19,6 +19,7 @@ import numpy as np
from numpy import array
from pyspark import RDD
+from pyspark.streaming.dstream import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
from pyspark.mllib.util import Saveable, Loader
@@ -570,6 +571,95 @@ class IsotonicRegression(object):
return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)
+class StreamingLinearAlgorithm(object):
+ """
+ Base class that has to be inherited by any StreamingLinearAlgorithm.
+
+ Prevents reimplementation of methods predictOn and predictOnValues.
+ """
+ def __init__(self, model):
+ self._model = model
+
+ def latestModel(self):
+ """
+ Returns the latest model.
+ """
+ return self._model
+
+ def _validate(self, dstream):
+ if not isinstance(dstream, DStream):
+ raise TypeError(
+ "dstream should be a DStream object, got %s" % type(dstream))
+ if not self._model:
+ raise ValueError(
+ "Model must be intialized using setInitialWeights")
+
+ def predictOn(self, dstream):
+ """
+ Make predictions on a dstream.
+
+ :return: Transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.map(lambda x: self._model.predict(x))
+
+ def predictOnValues(self, dstream):
+ """
+ Make predictions on a keyed dstream.
+
+ :return: Transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.mapValues(lambda x: self._model.predict(x))
+
+
+@inherit_doc
+class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
+ """
+ Run LinearRegression with SGD on a batch of data.
+
+ The problem minimized is (1 / n_samples) * (y - weights'X)**2.
+ After training on a batch of data, the weights obtained at the end of
+ training are used as initial weights for the next batch.
+
+ :param stepSize: Step size for each iteration of gradient descent.
+ :param numIterations: Total number of iterations run.
+ :param miniBatchFraction: Fraction of data on which SGD is run for each
+ iteration.
+ """
+ def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0):
+ self.stepSize = stepSize
+ self.numIterations = numIterations
+ self.miniBatchFraction = miniBatchFraction
+ self._model = None
+ super(StreamingLinearRegressionWithSGD, self).__init__(
+ model=self._model)
+
+ def setInitialWeights(self, initialWeights):
+ """
+ Set the initial value of weights.
+
+ This must be set before running trainOn and predictOn
+ """
+ initialWeights = _convert_to_vector(initialWeights)
+ self._model = LinearRegressionModel(initialWeights, 0)
+ return self
+
+ def trainOn(self, dstream):
+ """Train the model on the incoming dstream."""
+ self._validate(dstream)
+
+ def update(rdd):
+ # LinearRegressionWithSGD.train raises an error for an empty RDD.
+ if not rdd.isEmpty():
+ self._model = LinearRegressionWithSGD.train(
+ rdd, self.numIterations, self.stepSize,
+ self.miniBatchFraction, self._model.weights,
+ self._model.intercept)
+
+ dstream.foreachRDD(update)
+
+
def _test():
import doctest
from pyspark import SparkContext
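The core of the trainOn hunk above is that each foreachRDD update seeds SGD with the weights produced by the previous batch. A minimal sketch of that batch-to-batch handoff, expressed with the non-streaming API (assumes a SparkContext sc and two RDDs of LabeledPoint named batch1 and batch2; all names are illustrative):

    from pyspark.mllib.regression import LinearRegressionWithSGD

    # First batch: start from explicit initial weights.
    m = LinearRegressionWithSGD.train(
        batch1, iterations=25, step=0.2, initialWeights=[0.0, 0.0])
    # Second batch: resume from the weights the first batch produced,
    # mirroring what trainOn does inside foreachRDD for each new RDD.
    m = LinearRegressionWithSGD.train(
        batch2, iterations=25, step=0.2, initialWeights=m.weights)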
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index cd80c3e07a..f0091d6fac 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -27,8 +27,9 @@ from time import time, sleep
from shutil import rmtree
from numpy import (
- array, array_equal, zeros, inf, random, exp, dot, all, mean)
+ array, array_equal, zeros, inf, random, exp, dot, all, mean, abs)
from numpy import sum as array_sum
+
from py4j.protocol import Py4JJavaError
if sys.version_info[:2] <= (2, 6):
@@ -45,8 +46,8 @@ from pyspark.mllib.common import _to_java_object_rdd
from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
-from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
+from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.stat import Statistics
from pyspark.mllib.feature import Word2Vec
@@ -56,6 +57,7 @@ from pyspark.mllib.util import LinearDataGenerator
from pyspark.serializers import PickleSerializer
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
_have_scipy = False
try:
@@ -1170,6 +1172,124 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
self.assertTrue(errors[1] - errors[-1] > 0.3)
+class StreamingLinearRegressionWithSGDTests(MLLibStreamingTestCase):
+
+ def assertArrayAlmostEqual(self, array1, array2, dec):
+ for i, j in zip(array1, array2):
+ self.assertAlmostEqual(i, j, dec)
+
+ def test_parameter_accuracy(self):
+ """Test that coefs are predicted accurately by fitting on toy data."""
+
+ # Test that fitting (10*X1 + 10*X2), (X1, X2) gives coefficients
+ # (10, 10)
+ slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
+ slr.setInitialWeights([0.0, 0.0])
+ xMean = [0.0, 0.0]
+ xVariance = [1.0 / 3.0, 1.0 / 3.0]
+
+ # Create ten batches with 100 sample points in each.
+ batches = []
+ for i in range(10):
+ batch = LinearDataGenerator.generateLinearInput(
+ 0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1)
+ batches.append(sc.parallelize(batch))
+
+ input_stream = self.ssc.queueStream(batches)
+ t = time()
+ slr.trainOn(input_stream)
+ self.ssc.start()
+ self._ssc_wait(t, 10, 0.01)
+ self.assertArrayAlmostEqual(
+ slr.latestModel().weights.array, [10., 10.], 1)
+ self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
+
+ def test_parameter_convergence(self):
+ """Test that the model parameters improve with streaming data."""
+ slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
+ slr.setInitialWeights([0.0])
+
+ # Create ten batches with 100 sample points in each.
+ batches = []
+ for i in range(10):
+ batch = LinearDataGenerator.generateLinearInput(
+ 0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
+ batches.append(sc.parallelize(batch))
+
+ model_weights = []
+ input_stream = self.ssc.queueStream(batches)
+ input_stream.foreachRDD(
+ lambda x: model_weights.append(slr.latestModel().weights[0]))
+ t = time()
+ slr.trainOn(input_stream)
+ self.ssc.start()
+ self._ssc_wait(t, 10, 0.01)
+
+ model_weights = array(model_weights)
+ diff = model_weights[1:] - model_weights[:-1]
+ self.assertTrue(all(diff >= -0.1))
+
+ def test_prediction(self):
+ """Test prediction on a model with weights already set."""
+ # Create a model with initial weights equal to the true coefficients
+ slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
+ slr.setInitialWeights([10.0, 10.0])
+
+ # Create ten batches with 100 sample points in each.
+ batches = []
+ for i in range(10):
+ batch = LinearDataGenerator.generateLinearInput(
+ 0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0],
+ 100, 42 + i, 0.1)
+ batches.append(
+ sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))
+
+ input_stream = self.ssc.queueStream(batches)
+ t = time()
+ output_stream = slr.predictOnValues(input_stream)
+ samples = []
+ output_stream.foreachRDD(lambda x: samples.append(x.collect()))
+
+ self.ssc.start()
+ self._ssc_wait(t, 5, 0.01)
+
+ # Test that mean absolute error on each batch is less than 0.1
+ for batch in samples:
+ true, predicted = zip(*batch)
+ self.assertTrue(mean(abs(array(true) - array(predicted))) < 0.1)
+
+ def test_train_prediction(self):
+ """Test that error on test data improves as model is trained."""
+ slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
+ slr.setInitialWeights([0.0])
+
+ # Create ten batches with 100 sample points in each.
+ batches = []
+ for i in range(10):
+ batch = LinearDataGenerator.generateLinearInput(
+ 0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
+ batches.append(sc.parallelize(batch))
+
+ predict_batches = [
+ b.map(lambda lp: (lp.label, lp.features)) for b in batches]
+ mean_absolute_errors = []
+
+ def func(rdd):
+ true, predicted = zip(*rdd.collect())
+ mean_absolute_errors.append(mean(abs(array(true) - array(predicted))))
+
+ model_weights = []
+ input_stream = self.ssc.queueStream(batches)
+ output_stream = self.ssc.queueStream(predict_batches)
+ t = time()
+ slr.trainOn(input_stream)
+ output_stream = slr.predictOnValues(output_stream)
+ output_stream.foreachRDD(func)
+ self.ssc.start()
+ self._ssc_wait(t, 10, 0.01)
+ self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)
+
+
if __name__ == "__main__":
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")