From 54976e55e36465108b71b40b8a431be9d6d703ce Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Fri, 19 Jun 2015 12:23:15 -0700
Subject: [SPARK-4118] [MLLIB] [PYSPARK] Python bindings for StreamingKMeans

Python bindings for StreamingKMeans

Will change status to MRG once docs, tests and examples are updated.

Author: MechCoder

Closes #6499 from MechCoder/spark-4118 and squashes the following commits:

7722d16 [MechCoder] minor style fixes
51052d3 [MechCoder] Doc fixes
2061a76 [MechCoder] Add tests for simultaneous training and prediction Minor style fixes
81482fd [MechCoder] minor
5d9fe61 [MechCoder] predictOn should take into account the latest model
8ab9e89 [MechCoder] Fix Python3 error
a9817df [MechCoder] Better tests and minor fixes
c80e451 [MechCoder] Add ignore_unicode_prefix
ee8ce16 [MechCoder] Update tests, doc and examples
4b1481f [MechCoder] Some changes and tests
d8b066a [MechCoder] [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans
---
 python/pyspark/mllib/tests.py | 150 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 149 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index c482e6b068..744dc112d9 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -23,8 +23,10 @@ import os
 import sys
 import tempfile
 import array as pyarray
+from time import time, sleep
 
-from numpy import array, array_equal, zeros, inf
+from numpy import array, array_equal, zeros, inf, all, random
+from numpy import sum as array_sum
 from py4j.protocol import Py4JJavaError
 
 if sys.version_info[:2] <= (2, 6):
@@ -38,6 +40,7 @@ else:
 
 from pyspark import SparkContext
 from pyspark.mllib.common import _to_java_object_rdd
+from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
 from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
     DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
 from pyspark.mllib.regression import LabeledPoint
@@ -48,6 +51,7 @@ from pyspark.mllib.feature import IDF
 from pyspark.mllib.feature import StandardScaler
 from pyspark.mllib.feature import ElementwiseProduct
 from pyspark.serializers import PickleSerializer
+from pyspark.streaming import StreamingContext
 from pyspark.sql import SQLContext
 
 _have_scipy = False
@@ -67,6 +71,20 @@ class MLlibTestCase(unittest.TestCase):
         self.sc = sc
 
 
+class MLLibStreamingTestCase(unittest.TestCase):
+    def setUp(self):
+        self.sc = sc
+        self.ssc = StreamingContext(self.sc, 1.0)
+
+    def tearDown(self):
+        self.ssc.stop(False)
+
+    @staticmethod
+    def _ssc_wait(start_time, end_time, sleep_time):
+        while time() - start_time < end_time:
+            sleep(sleep_time)
+
+
 def _squared_distance(a, b):
     if isinstance(a, Vector):
         return a.squared_distance(b)
@@ -863,6 +881,136 @@ class ElementwiseProductTests(MLlibTestCase):
             eprod.transform(sparsevec), SparseVector(3, [0], [3]))
 
 
+class StreamingKMeansTest(MLLibStreamingTestCase):
+    def test_model_params(self):
+        """Test that the model params are set correctly."""
+        stkm = StreamingKMeans()
+        stkm.setK(5).setDecayFactor(0.0)
+        self.assertEqual(stkm._k, 5)
+        self.assertEqual(stkm._decayFactor, 0.0)
+
+        # Model not set yet.
+        self.assertIsNone(stkm.latestModel())
+        self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])
+
+        stkm.setInitialCenters(
+            centers=[[0.0, 0.0], [1.0, 1.0]], weights=[1.0, 1.0])
+        self.assertEqual(
+            stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]])
+        self.assertEqual(stkm.latestModel().clusterWeights, [1.0, 1.0])
+
+    def test_accuracy_for_single_center(self):
+        """Test that the parameters obtained are correct for a single center."""
+        centers, batches = self.streamingKMeansDataGenerator(
+            batches=5, numPoints=5, k=1, d=5, r=0.1, seed=0)
+        stkm = StreamingKMeans(1)
+        stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.])
+        input_stream = self.ssc.queueStream(
+            [self.sc.parallelize(batch, 1) for batch in batches])
+        stkm.trainOn(input_stream)
+
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 10.0, 0.01)
+        self.assertEqual(stkm.latestModel().clusterWeights, [25.0])
+        realCenters = array_sum(array(centers), axis=0)
+        for i in range(5):
+            modelCenters = stkm.latestModel().centers[0][i]
+            self.assertAlmostEqual(centers[0][i], modelCenters, 1)
+            self.assertAlmostEqual(realCenters[i], modelCenters, 1)
+
+    def streamingKMeansDataGenerator(self, batches, numPoints,
+                                     k, d, r, seed, centers=None):
+        rng = random.RandomState(seed)
+
+        # Generate centers unless the caller supplies them.
+        centers = centers or [rng.randn(d) for i in range(k)]
+
+        return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d))
+                          for j in range(numPoints)]
+                         for i in range(batches)]
+
+    def test_trainOn_model(self):
+        """Test the model on toy data with four clusters."""
+        stkm = StreamingKMeans()
+        initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
+        stkm.setInitialCenters(
+            centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])
+
+        # Create a toy dataset by applying a tiny offset to each point.
+        offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
+        batches = []
+        for offset in offsets:
+            batches.append([[offset[0] + center[0], offset[1] + center[1]]
+                            for center in initCenters])
+
+        batches = [self.sc.parallelize(batch, 1) for batch in batches]
+        input_stream = self.ssc.queueStream(batches)
+        stkm.trainOn(input_stream)
+        t = time()
+        self.ssc.start()
+
+        # Give enough time to train the model.
+        self._ssc_wait(t, 6.0, 0.01)
+        finalModel = stkm.latestModel()
+        self.assertTrue(all(finalModel.centers == array(initCenters)))
+        self.assertEqual(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
+
+    def test_predictOn_model(self):
+        """Test that the model predicts correctly on toy data."""
+        stkm = StreamingKMeans()
+        stkm._model = StreamingKMeansModel(
+            clusterCenters=[[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]],
+            clusterWeights=[1.0, 1.0, 1.0, 1.0])
+
+        predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
+        predict_data = [self.sc.parallelize(batch, 1) for batch in predict_data]
+        predict_stream = self.ssc.queueStream(predict_data)
+        predict_val = stkm.predictOn(predict_stream)
+
+        result = []
+
+        def update(rdd):
+            rdd_collect = rdd.collect()
+            if rdd_collect:
+                result.append(rdd_collect)
+
+        predict_val.foreachRDD(update)
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 6.0, 0.01)
+        self.assertEqual(result, [[0], [1], [2], [3]])
+
+    def test_trainOn_predictOn(self):
+        """Test that prediction happens on the updated model."""
+        stkm = StreamingKMeans(decayFactor=0.0, k=2)
+        stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
+
+        # Since the decay factor is set to zero, once the first batch
+        # has been processed the clusterCenters are updated to [-0.5, 0.7],
+        # which causes 0.2 and 0.3 to be classified as 1, even though a
+        # classification based on the initial model would have been 0,
+        # proving that the model is updated.
+        batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
+        batches = [self.sc.parallelize(batch) for batch in batches]
+        input_stream = self.ssc.queueStream(batches)
+        predict_results = []
+
+        def collect(rdd):
+            rdd_collect = rdd.collect()
+            if rdd_collect:
+                predict_results.append(rdd_collect)
+
+        stkm.trainOn(input_stream)
+        predict_stream = stkm.predictOn(input_stream)
+        predict_stream.foreachRDD(collect)
+
+        t = time()
+        self.ssc.start()
+        self._ssc_wait(t, 6.0, 0.01)
+        self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
+
+
 if __name__ == "__main__":
     if not _have_scipy:
         print("NOTE: Skipping SciPy tests as it does not seem to be installed")
-- 
cgit v1.2.3
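A note on the arithmetic behind test_trainOn_predictOn: streaming k-means updates a
center c of weight n that absorbs m batch points as
c' = (c * n * a + sum of assigned points) / (n * a + m), with new weight n * a + m,
where a is the decay factor. The sketch below replays that rule in plain numpy with no
Spark running; update_centers is an illustrative helper written for this note, not part
of the pyspark API.

    import numpy as np

    def update_centers(centers, weights, batch, decay):
        # One streaming k-means step, mirroring the rule StreamingKMeans uses:
        # c' = (c * n * a + sum(assigned points)) / (n * a + m), n' = n * a + m.
        # Illustrative helper only; not part of the pyspark API.
        centers = np.asarray(centers, dtype=float)
        batch = np.asarray(batch, dtype=float)
        # Label each point with its nearest (pre-update) center.
        labels = np.array([int(np.argmin(np.linalg.norm(centers - p, axis=1)))
                           for p in batch])
        new_centers, new_weights = [], []
        for i, (c, n) in enumerate(zip(centers, weights)):
            pts = batch[labels == i]
            m = len(pts)
            denom = n * decay + m
            if denom == 0:
                new_centers.append(c)  # nothing assigned and weight fully decayed
                new_weights.append(0.0)
            else:
                new_centers.append((c * n * decay + pts.sum(axis=0)) / denom)
                new_weights.append(denom)
        return np.array(new_centers), new_weights, labels

    centers, weights = [[0.0], [1.0]], [1.0, 1.0]
    centers, weights, first = update_centers(centers, weights,
                                             [[-0.5], [0.6], [0.8]], decay=0.0)
    print(list(first))      # [0, 1, 1] -> predictions against the initial model
    print(centers.ravel())  # [-0.5  0.7] -> the centers the test comment cites
    _, _, second = update_centers(centers, weights,
                                  [[0.2], [-0.1], [0.3]], decay=0.0)
    print(list(second))     # [1, 0, 1] -> 0.2 and 0.3 now fall in cluster 1

With the default decay factor of 1.0 nothing is forgotten, which is also why
test_accuracy_for_single_center ends with a cluster weight of 25.0: five batches of
five points each, added to an initial weight of 0.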