diff options
author | MechCoder <manojkumarsivaraj334@gmail.com> | 2015-06-30 10:25:59 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-06-30 10:25:59 -0700 |
commit | 45281664e0d3b22cd63660ca8ad6dd574f10e21f (patch) | |
tree | 15ee52c5ef70165f6492ea1f5246c6a0b5d71d1a /python/pyspark/mllib/tests.py | |
parent | ada384b785c663392a0b69fad5bfe7a0a0584ee0 (diff) | |
download | spark-45281664e0d3b22cd63660ca8ad6dd574f10e21f.tar.gz spark-45281664e0d3b22cd63660ca8ad6dd574f10e21f.tar.bz2 spark-45281664e0d3b22cd63660ca8ad6dd574f10e21f.zip |
[SPARK-4127] [MLLIB] [PYSPARK] Python bindings for StreamingLinearRegressionWithSGD
Python bindings for StreamingLinearRegressionWithSGD
Author: MechCoder <manojkumarsivaraj334@gmail.com>
Closes #6744 from MechCoder/spark-4127 and squashes the following commits:
d8f6457 [MechCoder] Moved StreamingLinearAlgorithm to pyspark.mllib.regression
d47cc24 [MechCoder] Inherit from StreamingLinearAlgorithm
1b4ddd6 [MechCoder] minor
4de6c68 [MechCoder] Minor refactor
5e85a3b [MechCoder] Add tests for simultaneous training and prediction
fb27889 [MechCoder] Add example and docs
505380b [MechCoder] Add tests
d42bdae [MechCoder] [SPARK-4127] Python bindings for StreamingLinearRegressionWithSGD
Diffstat (limited to 'python/pyspark/mllib/tests.py')
-rw-r--r-- | python/pyspark/mllib/tests.py | 124 |
1 files changed, 122 insertions, 2 deletions
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index cd80c3e07a..f0091d6fac 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -27,8 +27,9 @@ from time import time, sleep from shutil import rmtree from numpy import ( - array, array_equal, zeros, inf, random, exp, dot, all, mean) + array, array_equal, zeros, inf, random, exp, dot, all, mean, abs) from numpy import sum as array_sum + from py4j.protocol import Py4JJavaError if sys.version_info[:2] <= (2, 6): @@ -45,8 +46,8 @@ from pyspark.mllib.common import _to_java_object_rdd from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\ DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT -from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD +from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD from pyspark.mllib.random import RandomRDDs from pyspark.mllib.stat import Statistics from pyspark.mllib.feature import Word2Vec @@ -56,6 +57,7 @@ from pyspark.mllib.util import LinearDataGenerator from pyspark.serializers import PickleSerializer from pyspark.streaming import StreamingContext from pyspark.sql import SQLContext +from pyspark.streaming import StreamingContext _have_scipy = False try: @@ -1170,6 +1172,124 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase): self.assertTrue(errors[1] - errors[-1] > 0.3) +class StreamingLinearRegressionWithTests(MLLibStreamingTestCase): + + def assertArrayAlmostEqual(self, array1, array2, dec): + for i, j in array1, array2: + self.assertAlmostEqual(i, j, dec) + + def test_parameter_accuracy(self): + """Test that coefs are predicted accurately by fitting on toy data.""" + + # Test that fitting (10*X1 + 10*X2), (X1, X2) gives coefficients + # (10, 10) + slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25) + slr.setInitialWeights([0.0, 0.0]) + xMean = [0.0, 0.0] + xVariance = [1.0 / 3.0, 1.0 / 3.0] + + # Create ten batches with 100 sample points in each. + batches = [] + for i in range(10): + batch = LinearDataGenerator.generateLinearInput( + 0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1) + batches.append(sc.parallelize(batch)) + + input_stream = self.ssc.queueStream(batches) + t = time() + slr.trainOn(input_stream) + self.ssc.start() + self._ssc_wait(t, 10, 0.01) + self.assertArrayAlmostEqual( + slr.latestModel().weights.array, [10., 10.], 1) + self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1) + + def test_parameter_convergence(self): + """Test that the model parameters improve with streaming data.""" + slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25) + slr.setInitialWeights([0.0]) + + # Create ten batches with 100 sample points in each. + batches = [] + for i in range(10): + batch = LinearDataGenerator.generateLinearInput( + 0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1) + batches.append(sc.parallelize(batch)) + + model_weights = [] + input_stream = self.ssc.queueStream(batches) + input_stream.foreachRDD( + lambda x: model_weights.append(slr.latestModel().weights[0])) + t = time() + slr.trainOn(input_stream) + self.ssc.start() + self._ssc_wait(t, 10, 0.01) + + model_weights = array(model_weights) + diff = model_weights[1:] - model_weights[:-1] + self.assertTrue(all(diff >= -0.1)) + + def test_prediction(self): + """Test prediction on a model with weights already set.""" + # Create a model with initial Weights equal to coefs + slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25) + slr.setInitialWeights([10.0, 10.0]) + + # Create ten batches with 100 sample points in each. + batches = [] + for i in range(10): + batch = LinearDataGenerator.generateLinearInput( + 0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0], + 100, 42 + i, 0.1) + batches.append( + sc.parallelize(batch).map(lambda lp: (lp.label, lp.features))) + + input_stream = self.ssc.queueStream(batches) + t = time() + output_stream = slr.predictOnValues(input_stream) + samples = [] + output_stream.foreachRDD(lambda x: samples.append(x.collect())) + + self.ssc.start() + self._ssc_wait(t, 5, 0.01) + + # Test that mean absolute error on each batch is less than 0.1 + for batch in samples: + true, predicted = zip(*batch) + self.assertTrue(mean(abs(array(true) - array(predicted))) < 0.1) + + def test_train_prediction(self): + """Test that error on test data improves as model is trained.""" + slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25) + slr.setInitialWeights([0.0]) + + # Create ten batches with 100 sample points in each. + batches = [] + for i in range(10): + batch = LinearDataGenerator.generateLinearInput( + 0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1) + batches.append(sc.parallelize(batch)) + + predict_batches = [ + b.map(lambda lp: (lp.label, lp.features)) for b in batches] + mean_absolute_errors = [] + + def func(rdd): + true, predicted = zip(*rdd.collect()) + mean_absolute_errors.append(mean(abs(true) - abs(predicted))) + + model_weights = [] + input_stream = self.ssc.queueStream(batches) + output_stream = self.ssc.queueStream(predict_batches) + t = time() + slr.trainOn(input_stream) + output_stream = slr.predictOnValues(output_stream) + output_stream.foreachRDD(func) + self.ssc.start() + self._ssc_wait(t, 10, 0.01) + self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2) + + if __name__ == "__main__": if not _have_scipy: print("NOTE: Skipping SciPy tests as it does not seem to be installed") |