aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/mllib
diff options
context:
space:
mode:
authorMechCoder <manojkumarsivaraj334@gmail.com>2015-06-19 12:23:15 -0700
committerXiangrui Meng <meng@databricks.com>2015-06-19 12:23:15 -0700
commit54976e55e36465108b71b40b8a431be9d6d703ce (patch)
treecacad3ffa4f48e89ce575684272456e7b3931937 /python/pyspark/mllib
parente41e2fd6c61076f870de03b85c5da6c12b8da038 (diff)
downloadspark-54976e55e36465108b71b40b8a431be9d6d703ce.tar.gz
spark-54976e55e36465108b71b40b8a431be9d6d703ce.tar.bz2
spark-54976e55e36465108b71b40b8a431be9d6d703ce.zip
[SPARK-4118] [MLLIB] [PYSPARK] Python bindings for StreamingKMeans
Python bindings for StreamingKMeans Will change status to MRG once docs, tests and examples are updated. Author: MechCoder <manojkumarsivaraj334@gmail.com> Closes #6499 from MechCoder/spark-4118 and squashes the following commits: 7722d16 [MechCoder] minor style fixes 51052d3 [MechCoder] Doc fixes 2061a76 [MechCoder] Add tests for simultaneous training and prediction Minor style fixes 81482fd [MechCoder] minor 5d9fe61 [MechCoder] predictOn should take into account the latest model 8ab9e89 [MechCoder] Fix Python3 error a9817df [MechCoder] Better tests and minor fixes c80e451 [MechCoder] Add ignore_unicode_prefix ee8ce16 [MechCoder] Update tests, doc and examples 4b1481f [MechCoder] Some changes and tests d8b066a [MechCoder] [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans
Diffstat (limited to 'python/pyspark/mllib')
-rw-r--r--python/pyspark/mllib/clustering.py207
-rw-r--r--python/pyspark/mllib/tests.py150
2 files changed, 352 insertions, 5 deletions
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index b55583f822..c38229864d 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -21,16 +21,20 @@ import array as pyarray
if sys.version > '3':
xrange = range
-from numpy import array
+from math import exp, log
+
+from numpy import array, random, tile
-from pyspark import RDD
from pyspark import SparkContext
+from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc
+from pyspark.streaming import DStream
-__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture']
+__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
+ 'StreamingKMeans', 'StreamingKMeansModel']
@inherit_doc
@@ -98,6 +102,9 @@ class KMeansModel(Saveable, Loader):
"""Find the cluster to which x belongs in this model."""
best = 0
best_distance = float("inf")
+ if isinstance(x, RDD):
+ return x.map(self.predict)
+
x = _convert_to_vector(x)
for i in xrange(len(self.centers)):
distance = x.squared_distance(self.centers[i])
@@ -264,6 +271,198 @@ class GaussianMixture(object):
return GaussianMixtureModel(weight, mvg_obj)
+class StreamingKMeansModel(KMeansModel):
+ """
+ .. note:: Experimental
+
+ Clustering model which can perform an online update of the centroids.
+
+ The update formula for each centroid is given by
+
+ * c_t+1 = ((c_t * n_t * a) + (x_t * m_t)) / (n_t + m_t)
+ * n_t+1 = n_t * a + m_t
+
+ where
+
+ * c_t: Centroid at the n_th iteration.
+ * n_t: Number of samples (or) weights associated with the centroid
+ at the n_th iteration.
+ * x_t: Centroid of the new data closest to c_t.
+ * m_t: Number of samples (or) weights of the new data closest to c_t
+ * c_t+1: New centroid.
+ * n_t+1: New number of weights.
+ * a: Decay Factor, which gives the forgetfulness.
+
+ Note that if a is set to 1, it is the weighted mean of the previous
+ and new data. If it set to zero, the old centroids are completely
+ forgotten.
+
+ :param clusterCenters: Initial cluster centers.
+ :param clusterWeights: List of weights assigned to each cluster.
+
+ >>> initCenters = [[0.0, 0.0], [1.0, 1.0]]
+ >>> initWeights = [1.0, 1.0]
+ >>> stkm = StreamingKMeansModel(initCenters, initWeights)
+ >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1],
+ ... [0.9, 0.9], [1.1, 1.1]])
+ >>> stkm = stkm.update(data, 1.0, u"batches")
+ >>> stkm.centers
+ array([[ 0., 0.],
+ [ 1., 1.]])
+ >>> stkm.predict([-0.1, -0.1])
+ 0
+ >>> stkm.predict([0.9, 0.9])
+ 1
+ >>> stkm.clusterWeights
+ [3.0, 3.0]
+ >>> decayFactor = 0.0
+ >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])])
+ >>> stkm = stkm.update(data, 0.0, u"batches")
+ >>> stkm.centers
+ array([[ 0.2, 0.2],
+ [ 1.5, 1.5]])
+ >>> stkm.clusterWeights
+ [1.0, 1.0]
+ >>> stkm.predict([0.2, 0.2])
+ 0
+ >>> stkm.predict([1.5, 1.5])
+ 1
+ """
+ def __init__(self, clusterCenters, clusterWeights):
+ super(StreamingKMeansModel, self).__init__(centers=clusterCenters)
+ self._clusterWeights = list(clusterWeights)
+
+ @property
+ def clusterWeights(self):
+ """Return the cluster weights."""
+ return self._clusterWeights
+
+ @ignore_unicode_prefix
+ def update(self, data, decayFactor, timeUnit):
+ """Update the centroids, according to data
+
+ :param data: Should be a RDD that represents the new data.
+ :param decayFactor: forgetfulness of the previous centroids.
+ :param timeUnit: Can be "batches" or "points". If points, then the
+ decay factor is raised to the power of number of new
+ points and if batches, it is used as it is.
+ """
+ if not isinstance(data, RDD):
+ raise TypeError("Data should be of an RDD, got %s." % type(data))
+ data = data.map(_convert_to_vector)
+ decayFactor = float(decayFactor)
+ if timeUnit not in ["batches", "points"]:
+ raise ValueError(
+ "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
+ vectorCenters = [_convert_to_vector(center) for center in self.centers]
+ updatedModel = callMLlibFunc(
+ "updateStreamingKMeansModel", vectorCenters, self._clusterWeights,
+ data, decayFactor, timeUnit)
+ self.centers = array(updatedModel[0])
+ self._clusterWeights = list(updatedModel[1])
+ return self
+
+
+class StreamingKMeans(object):
+ """
+ .. note:: Experimental
+
+ Provides methods to set k, decayFactor, timeUnit to configure the
+ KMeans algorithm for fitting and predicting on incoming dstreams.
+ More details on how the centroids are updated are provided under the
+ docs of StreamingKMeansModel.
+
+ :param k: int, number of clusters
+ :param decayFactor: float, forgetfulness of the previous centroids.
+ :param timeUnit: can be "batches" or "points". If points, then the
+ decayfactor is raised to the power of no. of new points.
+ """
+ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"):
+ self._k = k
+ self._decayFactor = decayFactor
+ if timeUnit not in ["batches", "points"]:
+ raise ValueError(
+ "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
+ self._timeUnit = timeUnit
+ self._model = None
+
+ def latestModel(self):
+ """Return the latest model"""
+ return self._model
+
+ def _validate(self, dstream):
+ if self._model is None:
+ raise ValueError(
+ "Initial centers should be set either by setInitialCenters "
+ "or setRandomCenters.")
+ if not isinstance(dstream, DStream):
+ raise TypeError(
+ "Expected dstream to be of type DStream, "
+ "got type %s" % type(dstream))
+
+ def setK(self, k):
+ """Set number of clusters."""
+ self._k = k
+ return self
+
+ def setDecayFactor(self, decayFactor):
+ """Set decay factor."""
+ self._decayFactor = decayFactor
+ return self
+
+ def setHalfLife(self, halfLife, timeUnit):
+ """
+ Set number of batches after which the centroids of that
+ particular batch has half the weightage.
+ """
+ self._timeUnit = timeUnit
+ self._decayFactor = exp(log(0.5) / halfLife)
+ return self
+
+ def setInitialCenters(self, centers, weights):
+ """
+ Set initial centers. Should be set before calling trainOn.
+ """
+ self._model = StreamingKMeansModel(centers, weights)
+ return self
+
+ def setRandomCenters(self, dim, weight, seed):
+ """
+ Set the initial centres to be random samples from
+ a gaussian population with constant weights.
+ """
+ rng = random.RandomState(seed)
+ clusterCenters = rng.randn(self._k, dim)
+ clusterWeights = tile(weight, self._k)
+ self._model = StreamingKMeansModel(clusterCenters, clusterWeights)
+ return self
+
+ def trainOn(self, dstream):
+ """Train the model on the incoming dstream."""
+ self._validate(dstream)
+
+ def update(rdd):
+ self._model.update(rdd, self._decayFactor, self._timeUnit)
+
+ dstream.foreachRDD(update)
+
+ def predictOn(self, dstream):
+ """
+ Make predictions on a dstream.
+ Returns a transformed dstream object
+ """
+ self._validate(dstream)
+ return dstream.map(lambda x: self._model.predict(x))
+
+ def predictOnValues(self, dstream):
+ """
+ Make predictions on a keyed dstream.
+ Returns a transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.mapValues(lambda x: self._model.predict(x))
+
+
def _test():
import doctest
globs = globals().copy()
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index c482e6b068..744dc112d9 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -23,8 +23,10 @@ import os
import sys
import tempfile
import array as pyarray
+from time import time, sleep
-from numpy import array, array_equal, zeros, inf
+from numpy import array, array_equal, zeros, inf, all, random
+from numpy import sum as array_sum
from py4j.protocol import Py4JJavaError
if sys.version_info[:2] <= (2, 6):
@@ -38,6 +40,7 @@ else:
from pyspark import SparkContext
from pyspark.mllib.common import _to_java_object_rdd
+from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
from pyspark.mllib.regression import LabeledPoint
@@ -48,6 +51,7 @@ from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.serializers import PickleSerializer
+from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
_have_scipy = False
@@ -67,6 +71,20 @@ class MLlibTestCase(unittest.TestCase):
self.sc = sc
+class MLLibStreamingTestCase(unittest.TestCase):
+ def setUp(self):
+ self.sc = sc
+ self.ssc = StreamingContext(self.sc, 1.0)
+
+ def tearDown(self):
+ self.ssc.stop(False)
+
+ @staticmethod
+ def _ssc_wait(start_time, end_time, sleep_time):
+ while time() - start_time < end_time:
+ sleep(0.01)
+
+
def _squared_distance(a, b):
if isinstance(a, Vector):
return a.squared_distance(b)
@@ -863,6 +881,136 @@ class ElementwiseProductTests(MLlibTestCase):
eprod.transform(sparsevec), SparseVector(3, [0], [3]))
+class StreamingKMeansTest(MLLibStreamingTestCase):
+ def test_model_params(self):
+ """Test that the model params are set correctly"""
+ stkm = StreamingKMeans()
+ stkm.setK(5).setDecayFactor(0.0)
+ self.assertEquals(stkm._k, 5)
+ self.assertEquals(stkm._decayFactor, 0.0)
+
+ # Model not set yet.
+ self.assertIsNone(stkm.latestModel())
+ self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])
+
+ stkm.setInitialCenters(
+ centers=[[0.0, 0.0], [1.0, 1.0]], weights=[1.0, 1.0])
+ self.assertEquals(
+ stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]])
+ self.assertEquals(stkm.latestModel().clusterWeights, [1.0, 1.0])
+
+ def test_accuracy_for_single_center(self):
+ """Test that parameters obtained are correct for a single center."""
+ centers, batches = self.streamingKMeansDataGenerator(
+ batches=5, numPoints=5, k=1, d=5, r=0.1, seed=0)
+ stkm = StreamingKMeans(1)
+ stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.])
+ input_stream = self.ssc.queueStream(
+ [self.sc.parallelize(batch, 1) for batch in batches])
+ stkm.trainOn(input_stream)
+
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 10.0, 0.01)
+ self.assertEquals(stkm.latestModel().clusterWeights, [25.0])
+ realCenters = array_sum(array(centers), axis=0)
+ for i in range(5):
+ modelCenters = stkm.latestModel().centers[0][i]
+ self.assertAlmostEqual(centers[0][i], modelCenters, 1)
+ self.assertAlmostEqual(realCenters[i], modelCenters, 1)
+
+ def streamingKMeansDataGenerator(self, batches, numPoints,
+ k, d, r, seed, centers=None):
+ rng = random.RandomState(seed)
+
+ # Generate centers.
+ centers = [rng.randn(d) for i in range(k)]
+
+ return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d))
+ for j in range(numPoints)]
+ for i in range(batches)]
+
+ def test_trainOn_model(self):
+ """Test the model on toy data with four clusters."""
+ stkm = StreamingKMeans()
+ initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
+ stkm.setInitialCenters(
+ centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])
+
+ # Create a toy dataset by setting a tiny offest for each point.
+ offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
+ batches = []
+ for offset in offsets:
+ batches.append([[offset[0] + center[0], offset[1] + center[1]]
+ for center in initCenters])
+
+ batches = [self.sc.parallelize(batch, 1) for batch in batches]
+ input_stream = self.ssc.queueStream(batches)
+ stkm.trainOn(input_stream)
+ t = time()
+ self.ssc.start()
+
+ # Give enough time to train the model.
+ self._ssc_wait(t, 6.0, 0.01)
+ finalModel = stkm.latestModel()
+ self.assertTrue(all(finalModel.centers == array(initCenters)))
+ self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
+
+ def test_predictOn_model(self):
+ """Test that the model predicts correctly on toy data."""
+ stkm = StreamingKMeans()
+ stkm._model = StreamingKMeansModel(
+ clusterCenters=[[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]],
+ clusterWeights=[1.0, 1.0, 1.0, 1.0])
+
+ predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
+ predict_data = [sc.parallelize(batch, 1) for batch in predict_data]
+ predict_stream = self.ssc.queueStream(predict_data)
+ predict_val = stkm.predictOn(predict_stream)
+
+ result = []
+
+ def update(rdd):
+ rdd_collect = rdd.collect()
+ if rdd_collect:
+ result.append(rdd_collect)
+
+ predict_val.foreachRDD(update)
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 6.0, 0.01)
+ self.assertEquals(result, [[0], [1], [2], [3]])
+
+ def test_trainOn_predictOn(self):
+ """Test that prediction happens on the updated model."""
+ stkm = StreamingKMeans(decayFactor=0.0, k=2)
+ stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
+
+ # Since decay factor is set to zero, once the first batch
+ # is passed the clusterCenters are updated to [-0.5, 0.7]
+ # which causes 0.2 & 0.3 to be classified as 1, even though the
+ # classification based in the initial model would have been 0
+ # proving that the model is updated.
+ batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
+ batches = [sc.parallelize(batch) for batch in batches]
+ input_stream = self.ssc.queueStream(batches)
+ predict_results = []
+
+ def collect(rdd):
+ rdd_collect = rdd.collect()
+ if rdd_collect:
+ predict_results.append(rdd_collect)
+
+ stkm.trainOn(input_stream)
+ predict_stream = stkm.predictOn(input_stream)
+ predict_stream.foreachRDD(collect)
+
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 6.0, 0.01)
+ self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
+
+
if __name__ == "__main__":
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")