author     MechCoder <manojkumarsivaraj334@gmail.com>    2015-06-19 12:23:15 -0700
committer  Xiangrui Meng <meng@databricks.com>           2015-06-19 12:23:15 -0700
commit     54976e55e36465108b71b40b8a431be9d6d703ce (patch)
tree       cacad3ffa4f48e89ce575684272456e7b3931937
parent     e41e2fd6c61076f870de03b85c5da6c12b8da038 (diff)
download   spark-54976e55e36465108b71b40b8a431be9d6d703ce.tar.gz
           spark-54976e55e36465108b71b40b8a431be9d6d703ce.tar.bz2
           spark-54976e55e36465108b71b40b8a431be9d6d703ce.zip
[SPARK-4118] [MLLIB] [PYSPARK] Python bindings for StreamingKMeans
Python bindings for StreamingKMeans. Will change status to MRG once docs, tests and examples are updated.

Author: MechCoder <manojkumarsivaraj334@gmail.com>

Closes #6499 from MechCoder/spark-4118 and squashes the following commits:

7722d16 [MechCoder] minor style fixes
51052d3 [MechCoder] Doc fixes
2061a76 [MechCoder] Add tests for simultaneous training and prediction Minor style fixes
81482fd [MechCoder] minor
5d9fe61 [MechCoder] predictOn should take into account the latest model
8ab9e89 [MechCoder] Fix Python3 error
a9817df [MechCoder] Better tests and minor fixes
c80e451 [MechCoder] Add ignore_unicode_prefix
ee8ce16 [MechCoder] Update tests, doc and examples
4b1481f [MechCoder] Some changes and tests
d8b066a [MechCoder] [SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans
-rw-r--r--  docs/mllib-clustering.md                                                       48
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala   15
-rw-r--r--  python/pyspark/mllib/clustering.py                                            207
-rw-r--r--  python/pyspark/mllib/tests.py                                                 150
4 files changed, 411 insertions, 9 deletions
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 1b088969dd..dcaa3784be 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -593,6 +593,50 @@ ssc.start()
ssc.awaitTermination()
{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+First we import the necessary classes.
+
+{% highlight python %}
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.clustering import StreamingKMeans
+{% endhighlight %}
+
+Then we make an input stream of vectors for training, as well as a stream of labeled data
+points for testing. We assume a StreamingContext `ssc` has been created; see the
+[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more information.
+
+{% highlight python %}
+def parse(lp):
+ label = float(lp[lp.find('(') + 1: lp.find(',')])
+ vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
+ return LabeledPoint(label, vec)
+
+trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
+testData = ssc.textFileStream("/testing/data/dir").map(parse)
+{% endhighlight %}
+
+We create a model with random cluster centers and specify the number of clusters to find.
+
+{% highlight python %}
+model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
+{% endhighlight %}
+
+Now register the streams for training and testing and start the job, printing
+the predicted cluster assignments on new data points as they arrive.
+
+{% highlight python %}
+model.trainOn(trainingData)
+model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()
+
+ssc.start()
+ssc.awaitTermination()
+{% endhighlight %}
+</div>
+
+</div>
As you add new text files with data the cluster centers will update. Each training
point should be formatted as `[x1, x2, x3]`, and each test data point
@@ -600,7 +644,3 @@ should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or id
(e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir`
the model will update. Anytime a text file is placed in `/testing/data/dir`
you will see predictions. With new data, the cluster centers will change!
-
-</div>
-
-</div>
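Pulled together, the documented example amounts to the standalone sketch below (illustrative only: it assumes a local Spark installation, and the data directories are the placeholder paths from the text above).

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.clustering import StreamingKMeans

    sc = SparkContext(appName="StreamingKMeansSketch")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    def parse(lp):
        # Test lines look like "(y, [x1, x2, x3])"; training lines are plain "[x1, x2, x3]".
        label = float(lp[lp.find('(') + 1: lp.find(',')])
        vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
        return LabeledPoint(label, vec)

    trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
    testData = ssc.textFileStream("/testing/data/dir").map(parse)

    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
    model.trainOn(trainingData)
    model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()

    ssc.start()
    ssc.awaitTermination()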
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 1812b3ac7c..2897865af6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -964,6 +964,21 @@ private[python] class PythonMLLibAPI extends Serializable {
points.asScala.toArray)
}
+ /**
+ * Java stub for the update method of StreamingKMeansModel.
+ */
+ def updateStreamingKMeansModel(
+ clusterCenters: JList[Vector],
+ clusterWeights: JList[Double],
+ data: JavaRDD[Vector],
+ decayFactor: Double,
+ timeUnit: String): JList[Object] = {
+ val model = new StreamingKMeansModel(
+ clusterCenters.asScala.toArray, clusterWeights.asScala.toArray)
+ .update(data, decayFactor, timeUnit)
+ List[AnyRef](model.clusterCenters, Vectors.dense(model.clusterWeights)).asJava
+ }
+
}
/**
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index b55583f822..c38229864d 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -21,16 +21,20 @@ import array as pyarray
if sys.version > '3':
xrange = range
-from numpy import array
+from math import exp, log
+
+from numpy import array, random, tile
-from pyspark import RDD
from pyspark import SparkContext
+from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc
+from pyspark.streaming import DStream
-__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture']
+__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
+ 'StreamingKMeans', 'StreamingKMeansModel']
@inherit_doc
@@ -98,6 +102,9 @@ class KMeansModel(Saveable, Loader):
"""Find the cluster to which x belongs in this model."""
best = 0
best_distance = float("inf")
+ if isinstance(x, RDD):
+ return x.map(self.predict)
+
x = _convert_to_vector(x)
for i in xrange(len(self.centers)):
distance = x.squared_distance(self.centers[i])
@@ -264,6 +271,198 @@ class GaussianMixture(object):
return GaussianMixtureModel(weight, mvg_obj)
+class StreamingKMeansModel(KMeansModel):
+ """
+ .. note:: Experimental
+
+ Clustering model which can perform an online update of the centroids.
+
+ The update formula for each centroid is given by
+
+ * c_t+1 = ((c_t * n_t * a) + (x_t * m_t)) / (n_t * a + m_t)
+ * n_t+1 = n_t * a + m_t
+
+ where
+
+ * c_t: Centroid at the t_th iteration.
+ * n_t: Number of samples (or weights) associated with the centroid
+ at the t_th iteration.
+ * x_t: Centroid of the new data closest to c_t.
+ * m_t: Number of samples (or weights) of the new data closest to c_t.
+ * c_t+1: New centroid.
+ * n_t+1: New number of weights.
+ * a: Decay factor, which gives the forgetfulness.
+
+ Note that if a is set to 1, the result is the weighted mean of the previous
+ and new data. If it is set to zero, the old centroids are completely
+ forgotten.
+
+ :param clusterCenters: Initial cluster centers.
+ :param clusterWeights: List of weights assigned to each cluster.
+
+ >>> initCenters = [[0.0, 0.0], [1.0, 1.0]]
+ >>> initWeights = [1.0, 1.0]
+ >>> stkm = StreamingKMeansModel(initCenters, initWeights)
+ >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1],
+ ... [0.9, 0.9], [1.1, 1.1]])
+ >>> stkm = stkm.update(data, 1.0, u"batches")
+ >>> stkm.centers
+ array([[ 0., 0.],
+ [ 1., 1.]])
+ >>> stkm.predict([-0.1, -0.1])
+ 0
+ >>> stkm.predict([0.9, 0.9])
+ 1
+ >>> stkm.clusterWeights
+ [3.0, 3.0]
+ >>> decayFactor = 0.0
+ >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])])
+ >>> stkm = stkm.update(data, 0.0, u"batches")
+ >>> stkm.centers
+ array([[ 0.2, 0.2],
+ [ 1.5, 1.5]])
+ >>> stkm.clusterWeights
+ [1.0, 1.0]
+ >>> stkm.predict([0.2, 0.2])
+ 0
+ >>> stkm.predict([1.5, 1.5])
+ 1
+ """
+ def __init__(self, clusterCenters, clusterWeights):
+ super(StreamingKMeansModel, self).__init__(centers=clusterCenters)
+ self._clusterWeights = list(clusterWeights)
+
+ @property
+ def clusterWeights(self):
+ """Return the cluster weights."""
+ return self._clusterWeights
+
+ @ignore_unicode_prefix
+ def update(self, data, decayFactor, timeUnit):
+ """Update the centroids, according to data
+
+ :param data: An RDD of vectors representing the new data.
+ :param decayFactor: Forgetfulness of the previous centroids.
+ :param timeUnit: Can be "batches" or "points". If "points", the
+ decay factor is raised to the power of the number of new
+ points; if "batches", it is used as is.
+ """
+ if not isinstance(data, RDD):
+ raise TypeError("data should be an RDD, got %s." % type(data))
+ data = data.map(_convert_to_vector)
+ decayFactor = float(decayFactor)
+ if timeUnit not in ["batches", "points"]:
+ raise ValueError(
+ "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
+ vectorCenters = [_convert_to_vector(center) for center in self.centers]
+ updatedModel = callMLlibFunc(
+ "updateStreamingKMeansModel", vectorCenters, self._clusterWeights,
+ data, decayFactor, timeUnit)
+ self.centers = array(updatedModel[0])
+ self._clusterWeights = list(updatedModel[1])
+ return self
+
+
+class StreamingKMeans(object):
+ """
+ .. note:: Experimental
+
+ Provides methods to set k, decayFactor, timeUnit to configure the
+ KMeans algorithm for fitting and predicting on incoming dstreams.
+ More details on how the centroids are updated are provided under the
+ docs of StreamingKMeansModel.
+
+ :param k: int, number of clusters
+ :param decayFactor: float, forgetfulness of the previous centroids.
+ :param timeUnit: can be "batches" or "points". If "points", then the
+ decay factor is raised to the power of the number of new points.
+ """
+ def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"):
+ self._k = k
+ self._decayFactor = decayFactor
+ if timeUnit not in ["batches", "points"]:
+ raise ValueError(
+ "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
+ self._timeUnit = timeUnit
+ self._model = None
+
+ def latestModel(self):
+ """Return the latest model"""
+ return self._model
+
+ def _validate(self, dstream):
+ if self._model is None:
+ raise ValueError(
+ "Initial centers should be set either by setInitialCenters "
+ "or setRandomCenters.")
+ if not isinstance(dstream, DStream):
+ raise TypeError(
+ "Expected dstream to be of type DStream, "
+ "got type %s" % type(dstream))
+
+ def setK(self, k):
+ """Set number of clusters."""
+ self._k = k
+ return self
+
+ def setDecayFactor(self, decayFactor):
+ """Set decay factor."""
+ self._decayFactor = decayFactor
+ return self
+
+ def setHalfLife(self, halfLife, timeUnit):
+ """
+ Set the number of batches (or points) after which the data from a
+ given batch decays to half its original weight in the centroids.
+ """
+ self._timeUnit = timeUnit
+ self._decayFactor = exp(log(0.5) / halfLife)
+ return self
+
+ def setInitialCenters(self, centers, weights):
+ """
+ Set initial centers. Should be set before calling trainOn.
+ """
+ self._model = StreamingKMeansModel(centers, weights)
+ return self
+
+ def setRandomCenters(self, dim, weight, seed):
+ """
+ Set the initial centers to be random samples from
+ a Gaussian population with constant weights.
+ """
+ rng = random.RandomState(seed)
+ clusterCenters = rng.randn(self._k, dim)
+ clusterWeights = tile(weight, self._k)
+ self._model = StreamingKMeansModel(clusterCenters, clusterWeights)
+ return self
+
+ def trainOn(self, dstream):
+ """Train the model on the incoming dstream."""
+ self._validate(dstream)
+
+ def update(rdd):
+ self._model.update(rdd, self._decayFactor, self._timeUnit)
+
+ dstream.foreachRDD(update)
+
+ def predictOn(self, dstream):
+ """
+ Make predictions on a dstream.
+ Returns a transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.map(lambda x: self._model.predict(x))
+
+ def predictOnValues(self, dstream):
+ """
+ Make predictions on a keyed dstream.
+ Returns a transformed dstream object.
+ """
+ self._validate(dstream)
+ return dstream.mapValues(lambda x: self._model.predict(x))
+
+
def _test():
import doctest
globs = globals().copy()
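To make the update rule in the StreamingKMeansModel docstring concrete, here is a small standalone check in plain NumPy (not part of the patch) that reproduces the second doctest above, along with the half-life relationship used by setHalfLife.

    import numpy as np

    def update_center(c_t, n_t, x_t, m_t, a):
        # c_{t+1} = (c_t * n_t * a + x_t * m_t) / (n_t * a + m_t); n_{t+1} = n_t * a + m_t
        c_t, x_t = np.asarray(c_t, dtype=float), np.asarray(x_t, dtype=float)
        n_new = n_t * a + m_t
        return (c_t * n_t * a + x_t * m_t) / n_new, n_new

    # Second doctest: each cluster has weight 3.0, decayFactor is 0.0 and one new
    # point lands in each cluster, so the old centers are forgotten entirely.
    center, weight = update_center([0.0, 0.0], 3.0, [0.2, 0.2], 1.0, a=0.0)
    print(center, weight)  # [0.2 0.2] 1.0

    # setHalfLife picks a = exp(log(0.5) / halfLife), so after halfLife time units
    # the weight of the old data has decayed to one half.
    halfLife = 4.0
    a = np.exp(np.log(0.5) / halfLife)
    print(a ** halfLife)   # 0.5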
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index c482e6b068..744dc112d9 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -23,8 +23,10 @@ import os
import sys
import tempfile
import array as pyarray
+from time import time, sleep
-from numpy import array, array_equal, zeros, inf
+from numpy import array, array_equal, zeros, inf, all, random
+from numpy import sum as array_sum
from py4j.protocol import Py4JJavaError
if sys.version_info[:2] <= (2, 6):
@@ -38,6 +40,7 @@ else:
from pyspark import SparkContext
from pyspark.mllib.common import _to_java_object_rdd
+from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
from pyspark.mllib.regression import LabeledPoint
@@ -48,6 +51,7 @@ from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.serializers import PickleSerializer
+from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
_have_scipy = False
@@ -67,6 +71,20 @@ class MLlibTestCase(unittest.TestCase):
self.sc = sc
+class MLLibStreamingTestCase(unittest.TestCase):
+ def setUp(self):
+ self.sc = sc
+ self.ssc = StreamingContext(self.sc, 1.0)
+
+ def tearDown(self):
+ self.ssc.stop(False)
+
+ @staticmethod
+ def _ssc_wait(start_time, timeout, sleep_time):
+ while time() - start_time < timeout:
+ sleep(sleep_time)
+
+
def _squared_distance(a, b):
if isinstance(a, Vector):
return a.squared_distance(b)
@@ -863,6 +881,136 @@ class ElementwiseProductTests(MLlibTestCase):
eprod.transform(sparsevec), SparseVector(3, [0], [3]))
+class StreamingKMeansTest(MLLibStreamingTestCase):
+ def test_model_params(self):
+ """Test that the model params are set correctly"""
+ stkm = StreamingKMeans()
+ stkm.setK(5).setDecayFactor(0.0)
+ self.assertEquals(stkm._k, 5)
+ self.assertEquals(stkm._decayFactor, 0.0)
+
+ # Model not set yet.
+ self.assertIsNone(stkm.latestModel())
+ self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])
+
+ stkm.setInitialCenters(
+ centers=[[0.0, 0.0], [1.0, 1.0]], weights=[1.0, 1.0])
+ self.assertEquals(
+ stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]])
+ self.assertEquals(stkm.latestModel().clusterWeights, [1.0, 1.0])
+
+ def test_accuracy_for_single_center(self):
+ """Test that parameters obtained are correct for a single center."""
+ centers, batches = self.streamingKMeansDataGenerator(
+ batches=5, numPoints=5, k=1, d=5, r=0.1, seed=0)
+ stkm = StreamingKMeans(1)
+ stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.])
+ input_stream = self.ssc.queueStream(
+ [self.sc.parallelize(batch, 1) for batch in batches])
+ stkm.trainOn(input_stream)
+
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 10.0, 0.01)
+ self.assertEquals(stkm.latestModel().clusterWeights, [25.0])
+ realCenters = array_sum(array(centers), axis=0)
+ for i in range(5):
+ modelCenters = stkm.latestModel().centers[0][i]
+ self.assertAlmostEqual(centers[0][i], modelCenters, 1)
+ self.assertAlmostEqual(realCenters[i], modelCenters, 1)
+
+ def streamingKMeansDataGenerator(self, batches, numPoints,
+ k, d, r, seed, centers=None):
+ rng = random.RandomState(seed)
+
+ # Generate random centers unless they were supplied by the caller.
+ centers = centers or [rng.randn(d) for i in range(k)]
+
+ return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d))
+ for j in range(numPoints)]
+ for i in range(batches)]
+
+ def test_trainOn_model(self):
+ """Test the model on toy data with four clusters."""
+ stkm = StreamingKMeans()
+ initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
+ stkm.setInitialCenters(
+ centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])
+
+ # Create a toy dataset by setting a tiny offset for each point.
+ offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
+ batches = []
+ for offset in offsets:
+ batches.append([[offset[0] + center[0], offset[1] + center[1]]
+ for center in initCenters])
+
+ batches = [self.sc.parallelize(batch, 1) for batch in batches]
+ input_stream = self.ssc.queueStream(batches)
+ stkm.trainOn(input_stream)
+ t = time()
+ self.ssc.start()
+
+ # Give enough time to train the model.
+ self._ssc_wait(t, 6.0, 0.01)
+ finalModel = stkm.latestModel()
+ self.assertTrue(all(finalModel.centers == array(initCenters)))
+ self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
+
+ def test_predictOn_model(self):
+ """Test that the model predicts correctly on toy data."""
+ stkm = StreamingKMeans()
+ stkm._model = StreamingKMeansModel(
+ clusterCenters=[[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]],
+ clusterWeights=[1.0, 1.0, 1.0, 1.0])
+
+ predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
+ predict_data = [sc.parallelize(batch, 1) for batch in predict_data]
+ predict_stream = self.ssc.queueStream(predict_data)
+ predict_val = stkm.predictOn(predict_stream)
+
+ result = []
+
+ def update(rdd):
+ rdd_collect = rdd.collect()
+ if rdd_collect:
+ result.append(rdd_collect)
+
+ predict_val.foreachRDD(update)
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 6.0, 0.01)
+ self.assertEquals(result, [[0], [1], [2], [3]])
+
+ def test_trainOn_predictOn(self):
+ """Test that prediction happens on the updated model."""
+ stkm = StreamingKMeans(decayFactor=0.0, k=2)
+ stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
+
+ # Since the decay factor is set to zero, once the first batch
+ # is processed the cluster centers are updated to [-0.5, 0.7],
+ # which causes 0.2 & 0.3 to be classified as 1, even though the
+ # classification based on the initial model would have been 0,
+ # proving that the model is updated.
+ batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
+ batches = [sc.parallelize(batch) for batch in batches]
+ input_stream = self.ssc.queueStream(batches)
+ predict_results = []
+
+ def collect(rdd):
+ rdd_collect = rdd.collect()
+ if rdd_collect:
+ predict_results.append(rdd_collect)
+
+ stkm.trainOn(input_stream)
+ predict_stream = stkm.predictOn(input_stream)
+ predict_stream.foreachRDD(collect)
+
+ t = time()
+ self.ssc.start()
+ self._ssc_wait(t, 6.0, 0.01)
+ self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
+
+
if __name__ == "__main__":
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
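As a side note, the arithmetic behind the comment in test_trainOn_predictOn can be checked by hand (a sketch, not part of the patch).

    # Initial centers [0.0] and [1.0], decayFactor a = 0. In the first batch,
    # -0.5 falls in cluster 0 while 0.6 and 0.8 fall in cluster 1, giving the
    # predictions [0, 1, 1]. With a = 0 the old centers are discarded, so the
    # updated centers are just the per-cluster batch means:
    c0 = -0.5                 # mean of [-0.5]
    c1 = (0.6 + 0.8) / 2      # 0.7, mean of [0.6, 0.8]
    # The second batch [0.2, -0.1, 0.3] is then classified against [-0.5, 0.7]:
    print([0 if abs(x - c0) <= abs(x - c1) else 1 for x in [0.2, -0.1, 0.3]])  # [1, 0, 1]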