path: root/python/pyspark/mllib/tests.py
author    MechCoder <manojkumarsivaraj334@gmail.com>  2015-06-19 12:23:15 -0700
committer Xiangrui Meng <meng@databricks.com>  2015-06-19 12:23:15 -0700
commit    54976e55e36465108b71b40b8a431be9d6d703ce (patch)
tree      cacad3ffa4f48e89ce575684272456e7b3931937 /python/pyspark/mllib/tests.py
parent    e41e2fd6c61076f870de03b85c5da6c12b8da038 (diff)
download  spark-54976e55e36465108b71b40b8a431be9d6d703ce.tar.gz
          spark-54976e55e36465108b71b40b8a431be9d6d703ce.tar.bz2
          spark-54976e55e36465108b71b40b8a431be9d6d703ce.zip
[SPARK-4118] [MLLIB] [PYSPARK] Python bindings for StreamingKMeans
Python bindings for StreamingKMeans. Will change status to MRG once docs, tests and examples are updated.

Author: MechCoder <manojkumarsivaraj334@gmail.com>

Closes #6499 from MechCoder/spark-4118 and squashes the following commits:

7722d16 [MechCoder] minor style fixes
51052d3 [MechCoder] Doc fixes
2061a76 [MechCoder] Add tests for simultaneous training and prediction Minor style fixes
81482fd [MechCoder] minor
5d9fe61 [MechCoder] predictOn should take into account the latest model
8ab9e89 [MechCoder] Fix Python3 error
a9817df [MechCoder] Better tests and minor fixes
c80e451 [MechCoder] Add ignore_unicode_prefix
ee8ce16 [MechCoder] Update tests, doc and examples
4b1481f [MechCoder] Some changes and tests
d8b066a [MechCoder] [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans
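For orientation, the tests below exercise the new binding end to end. A minimal sketch of the intended usage, assuming trainingStream and testStream are DStreams of feature vectors (placeholder names, not part of this patch):

    from pyspark.mllib.clustering import StreamingKMeans

    # k clusters; decayFactor controls forgetfulness (1.0 keeps all
    # history, 0.0 uses only the most recent batch).
    stkm = StreamingKMeans(k=2, decayFactor=0.5)
    stkm.setInitialCenters(centers=[[0.0, 0.0], [1.0, 1.0]],
                           weights=[1.0, 1.0])

    stkm.trainOn(trainingStream)              # update the model per batch
    predictions = stkm.predictOn(testStream)  # DStream of cluster indices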
Diffstat (limited to 'python/pyspark/mllib/tests.py')
-rw-r--r--  python/pyspark/mllib/tests.py | 150
1 file changed, 149 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index c482e6b068..744dc112d9 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -23,8 +23,10 @@ import os
import sys
import tempfile
import array as pyarray
+from time import time, sleep
-from numpy import array, array_equal, zeros, inf
+from numpy import array, array_equal, zeros, inf, all, random
+from numpy import sum as array_sum
from py4j.protocol import Py4JJavaError
if sys.version_info[:2] <= (2, 6):
@@ -38,6 +40,7 @@ else:
from pyspark import SparkContext
from pyspark.mllib.common import _to_java_object_rdd
+from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
from pyspark.mllib.regression import LabeledPoint
@@ -48,6 +51,7 @@ from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.serializers import PickleSerializer
+from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
_have_scipy = False
@@ -67,6 +71,20 @@ class MLlibTestCase(unittest.TestCase):
self.sc = sc
+class MLLibStreamingTestCase(unittest.TestCase):
+ def setUp(self):
+ self.sc = sc
+ self.ssc = StreamingContext(self.sc, 1.0)
+
+ def tearDown(self):
+ self.ssc.stop(False)
+
+ @staticmethod
+ def _ssc_wait(start_time, end_time, sleep_time):
+ # Poll until end_time seconds have elapsed since start_time.
+ while time() - start_time < end_time:
+ sleep(sleep_time)
+
+
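All of the streaming tests below follow the same harness pattern: queue the batches as an input DStream, start the context, and poll with _ssc_wait until the batches have been consumed. A condensed sketch, assuming stkm and batches are set up as in the tests (the 10-second budget is just the value the first test uses):

    input_stream = self.ssc.queueStream(
        [self.sc.parallelize(batch, 1) for batch in batches])
    stkm.trainOn(input_stream)

    t = time()
    self.ssc.start()
    self._ssc_wait(t, 10.0, 0.01)  # wait ~10s, polling every 10ms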
def _squared_distance(a, b):
if isinstance(a, Vector):
return a.squared_distance(b)
@@ -863,6 +881,136 @@ class ElementwiseProductTests(MLlibTestCase):
eprod.transform(sparsevec), SparseVector(3, [0], [3]))
+class StreamingKMeansTest(MLLibStreamingTestCase):
+ def test_model_params(self):
+ """Test that the model params are set correctly"""
+ stkm = StreamingKMeans()
+ stkm.setK(5).setDecayFactor(0.0)
+ self.assertEqual(stkm._k, 5)
+ self.assertEqual(stkm._decayFactor, 0.0)
+
+ # Model not set yet.
+ self.assertIsNone(stkm.latestModel())
+ self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])
+
+ stkm.setInitialCenters(
+ centers=[[0.0, 0.0], [1.0, 1.0]], weights=[1.0, 1.0])
+ self.assertEqual(
+ stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]])
+ self.assertEqual(stkm.latestModel().clusterWeights, [1.0, 1.0])
+
+ def test_accuracy_for_single_center(self):
+ """Test that parameters obtained are correct for a single center."""
+ centers, batches = self.streamingKMeansDataGenerator(
+ batches=5, numPoints=5, k=1, d=5, r=0.1, seed=0)
+ stkm = StreamingKMeans(1)
+ stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.])
+ input_stream = self.ssc.queueStream(
+ [self.sc.parallelize(batch, 1) for batch in batches])
+ stkm.trainOn(input_stream)
+
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 10.0, 0.01)
+ self.assertEqual(stkm.latestModel().clusterWeights, [25.0])
+ realCenters = array_sum(array(centers), axis=0)
+ for i in range(5):
+ modelCenters = stkm.latestModel().centers[0][i]
+ self.assertAlmostEqual(centers[0][i], modelCenters, 1)
+ self.assertAlmostEqual(realCenters[i], modelCenters, 1)
+
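The expected weight of 25.0 follows directly from the batch sizes: with the default decayFactor of 1.0 nothing is forgotten, so the single cluster accumulates every point. A quick check of that arithmetic:

    # 5 batches x 5 points each, all assigned to the one cluster,
    # starting from an initial weight of 0.0:
    expected_weight = 0.0 + 5 * 5   # = 25.0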
+ def streamingKMeansDataGenerator(self, batches, numPoints,
+ k, d, r, seed, centers=None):
+ rng = random.RandomState(seed)
+
+ # Generate random centers unless the caller supplied them.
+ if centers is None:
+ centers = [rng.randn(d) for i in range(k)]
+
+ return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d))
+ for j in range(numPoints)]
+ for i in range(batches)]
+
+ def test_trainOn_model(self):
+ """Test the model on toy data with four clusters."""
+ stkm = StreamingKMeans()
+ initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
+ stkm.setInitialCenters(
+ centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])
+
+ # Create a toy dataset by setting a tiny offset for each point.
+ offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
+ batches = []
+ for offset in offsets:
+ batches.append([[offset[0] + center[0], offset[1] + center[1]]
+ for center in initCenters])
+
+ batches = [self.sc.parallelize(batch, 1) for batch in batches]
+ input_stream = self.ssc.queueStream(batches)
+ stkm.trainOn(input_stream)
+ t = time()
+ self.ssc.start()
+
+ # Give enough time to train the model.
+ self._ssc_wait(t, 6.0, 0.01)
+ finalModel = stkm.latestModel()
+ self.assertTrue(all(finalModel.centers == array(initCenters)))
+ self.assertEqual(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
+
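The assertion that the centers come back unchanged relies on the offsets being symmetric. With the default decayFactor of 1.0 each center is a running weighted mean, so after the four batches (one point per cluster per batch) each center moves by the sum of the offsets divided by the total weight of 5.0, and the four offsets cancel. A sketch of that arithmetic:

    # c_final = c + (o1 + o2 + o3 + o4) / 5, and the offsets sum to zero:
    offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
    assert [sum(o[i] for o in offsets) for i in range(2)] == [0, 0]
    # Each cluster's weight: initial 1.0 plus one point per batch = 5.0.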
+ def test_predictOn_model(self):
+ """Test that the model predicts correctly on toy data."""
+ stkm = StreamingKMeans()
+ stkm._model = StreamingKMeansModel(
+ clusterCenters=[[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]],
+ clusterWeights=[1.0, 1.0, 1.0, 1.0])
+
+ predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
+ predict_data = [self.sc.parallelize(batch, 1) for batch in predict_data]
+ predict_stream = self.ssc.queueStream(predict_data)
+ predict_val = stkm.predictOn(predict_stream)
+
+ result = []
+
+ def update(rdd):
+ rdd_collect = rdd.collect()
+ if rdd_collect:
+ result.append(rdd_collect)
+
+ predict_val.foreachRDD(update)
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 6.0, 0.01)
+ self.assertEqual(result, [[0], [1], [2], [3]])
+
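Besides predictOn, the binding added by this patch also exposes predictOnValues for keyed streams, which these tests do not cover. A minimal sketch, with hypothetical integer labels as keys:

    # (label, vector) pairs in; (label, cluster index) pairs out.
    keyed = self.ssc.queueStream(
        [self.sc.parallelize([(0, [1.5, 1.5]), (1, [-1.5, 1.5])], 1)])
    labeled_predictions = stkm.predictOnValues(keyed)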
+ def test_trainOn_predictOn(self):
+ """Test that prediction happens on the updated model."""
+ stkm = StreamingKMeans(decayFactor=0.0, k=2)
+ stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
+
+ # Since the decay factor is set to zero, the model is replaced
+ # by the latest batch alone: after the first batch the cluster
+ # centers are updated to [-0.5] and [0.7], which causes 0.2 and
+ # 0.3 to be classified as cluster 1, even though classification
+ # based on the initial model would have assigned them to cluster
+ # 0, proving that the model is updated.
+ batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
+ batches = [self.sc.parallelize(batch) for batch in batches]
+ input_stream = self.ssc.queueStream(batches)
+ predict_results = []
+
+ def collect(rdd):
+ rdd_collect = rdd.collect()
+ if rdd_collect:
+ predict_results.append(rdd_collect)
+
+ stkm.trainOn(input_stream)
+ predict_stream = stkm.predictOn(input_stream)
+ predict_stream.foreachRDD(collect)
+
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 6.0, 0.01)
+ self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
+
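The expected predictions can be verified by hand. The documented update for each center is c' = (c * n * a + x * m) / (n * a + m), where a is the decay factor, x the mean of the points assigned to the center in the batch, and m their count; with a = 0 this collapses to c' = x. A worked check under that rule:

    # First batch against centers [0.0] and [1.0]:
    #   -0.5 -> cluster 0; 0.6, 0.8 -> cluster 1   => [0, 1, 1]
    c0 = -0.5              # mean of [-0.5]
    c1 = (0.6 + 0.8) / 2   # = 0.7, mean of [0.6, 0.8]
    # Second batch against the updated centers [-0.5] and [0.7]:
    #   0.2:  |0.2 + 0.5| = 0.7 vs |0.2 - 0.7| = 0.5  -> cluster 1
    #   -0.1: 0.4 vs 0.8                              -> cluster 0
    #   0.3:  0.8 vs 0.4                              -> cluster 1
    # => [1, 0, 1], matching the assertion above.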
+
if __name__ == "__main__":
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")